HBASE-7305. ZK based Read/Write locks for table operations

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1448867 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2013-02-22 00:15:52 +00:00
parent e3046bfe11
commit 9bb04c741a
34 changed files with 3286 additions and 214 deletions

View File

@ -4902,6 +4902,807 @@ public final class ZooKeeperProtos {
// @@protoc_insertion_point(class_scope:ReplicationLock)
}
public interface TableLockOrBuilder
extends com.google.protobuf.MessageOrBuilder {
// required bytes tableName = 1;
boolean hasTableName();
com.google.protobuf.ByteString getTableName();
// required .ServerName lockOwner = 2;
boolean hasLockOwner();
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner();
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder();
// required int64 threadId = 3;
boolean hasThreadId();
long getThreadId();
// required bool isShared = 4;
boolean hasIsShared();
boolean getIsShared();
// optional string purpose = 5;
boolean hasPurpose();
String getPurpose();
}
public static final class TableLock extends
com.google.protobuf.GeneratedMessage
implements TableLockOrBuilder {
// Use TableLock.newBuilder() to construct.
private TableLock(Builder builder) {
super(builder);
}
private TableLock(boolean noInit) {}
private static final TableLock defaultInstance;
public static TableLock getDefaultInstance() {
return defaultInstance;
}
public TableLock getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_fieldAccessorTable;
}
private int bitField0_;
// required bytes tableName = 1;
public static final int TABLENAME_FIELD_NUMBER = 1;
private com.google.protobuf.ByteString tableName_;
public boolean hasTableName() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public com.google.protobuf.ByteString getTableName() {
return tableName_;
}
// required .ServerName lockOwner = 2;
public static final int LOCKOWNER_FIELD_NUMBER = 2;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName lockOwner_;
public boolean hasLockOwner() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner() {
return lockOwner_;
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder() {
return lockOwner_;
}
// required int64 threadId = 3;
public static final int THREADID_FIELD_NUMBER = 3;
private long threadId_;
public boolean hasThreadId() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public long getThreadId() {
return threadId_;
}
// required bool isShared = 4;
public static final int ISSHARED_FIELD_NUMBER = 4;
private boolean isShared_;
public boolean hasIsShared() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public boolean getIsShared() {
return isShared_;
}
// optional string purpose = 5;
public static final int PURPOSE_FIELD_NUMBER = 5;
private java.lang.Object purpose_;
public boolean hasPurpose() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
public String getPurpose() {
java.lang.Object ref = purpose_;
if (ref instanceof String) {
return (String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
String s = bs.toStringUtf8();
if (com.google.protobuf.Internal.isValidUtf8(bs)) {
purpose_ = s;
}
return s;
}
}
private com.google.protobuf.ByteString getPurposeBytes() {
java.lang.Object ref = purpose_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8((String) ref);
purpose_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private void initFields() {
tableName_ = com.google.protobuf.ByteString.EMPTY;
lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
threadId_ = 0L;
isShared_ = false;
purpose_ = "";
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasTableName()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasLockOwner()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasThreadId()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasIsShared()) {
memoizedIsInitialized = 0;
return false;
}
if (!getLockOwner().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(1, tableName_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeMessage(2, lockOwner_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeInt64(3, threadId_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBool(4, isShared_);
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeBytes(5, getPurposeBytes());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(1, tableName_);
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(2, lockOwner_);
}
if (((bitField0_ & 0x00000004) == 0x00000004)) {
size += com.google.protobuf.CodedOutputStream
.computeInt64Size(3, threadId_);
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
.computeBoolSize(4, isShared_);
}
if (((bitField0_ & 0x00000010) == 0x00000010)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(5, getPurposeBytes());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock)) {
return super.equals(obj);
}
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock other = (org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock) obj;
boolean result = true;
result = result && (hasTableName() == other.hasTableName());
if (hasTableName()) {
result = result && getTableName()
.equals(other.getTableName());
}
result = result && (hasLockOwner() == other.hasLockOwner());
if (hasLockOwner()) {
result = result && getLockOwner()
.equals(other.getLockOwner());
}
result = result && (hasThreadId() == other.hasThreadId());
if (hasThreadId()) {
result = result && (getThreadId()
== other.getThreadId());
}
result = result && (hasIsShared() == other.hasIsShared());
if (hasIsShared()) {
result = result && (getIsShared()
== other.getIsShared());
}
result = result && (hasPurpose() == other.hasPurpose());
if (hasPurpose()) {
result = result && getPurpose()
.equals(other.getPurpose());
}
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
}
@java.lang.Override
public int hashCode() {
int hash = 41;
hash = (19 * hash) + getDescriptorForType().hashCode();
if (hasTableName()) {
hash = (37 * hash) + TABLENAME_FIELD_NUMBER;
hash = (53 * hash) + getTableName().hashCode();
}
if (hasLockOwner()) {
hash = (37 * hash) + LOCKOWNER_FIELD_NUMBER;
hash = (53 * hash) + getLockOwner().hashCode();
}
if (hasThreadId()) {
hash = (37 * hash) + THREADID_FIELD_NUMBER;
hash = (53 * hash) + hashLong(getThreadId());
}
if (hasIsShared()) {
hash = (37 * hash) + ISSHARED_FIELD_NUMBER;
hash = (53 * hash) + hashBoolean(getIsShared());
}
if (hasPurpose()) {
hash = (37 * hash) + PURPOSE_FIELD_NUMBER;
hash = (53 * hash) + getPurpose().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode();
return hash;
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
com.google.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder>
implements org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLockOrBuilder {
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.internal_static_TableLock_fieldAccessorTable;
}
// Construct using org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
getLockOwnerFieldBuilder();
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
tableName_ = com.google.protobuf.ByteString.EMPTY;
bitField0_ = (bitField0_ & ~0x00000001);
if (lockOwnerBuilder_ == null) {
lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
} else {
lockOwnerBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
threadId_ = 0L;
bitField0_ = (bitField0_ & ~0x00000004);
isShared_ = false;
bitField0_ = (bitField0_ & ~0x00000008);
purpose_ = "";
bitField0_ = (bitField0_ & ~0x00000010);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDescriptor();
}
public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock getDefaultInstanceForType() {
return org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDefaultInstance();
}
public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock build() {
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
private org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return result;
}
public org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock buildPartial() {
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock result = new org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.tableName_ = tableName_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
if (lockOwnerBuilder_ == null) {
result.lockOwner_ = lockOwner_;
} else {
result.lockOwner_ = lockOwnerBuilder_.build();
}
if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
to_bitField0_ |= 0x00000004;
}
result.threadId_ = threadId_;
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
result.isShared_ = isShared_;
if (((from_bitField0_ & 0x00000010) == 0x00000010)) {
to_bitField0_ |= 0x00000010;
}
result.purpose_ = purpose_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock) {
return mergeFrom((org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock other) {
if (other == org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.getDefaultInstance()) return this;
if (other.hasTableName()) {
setTableName(other.getTableName());
}
if (other.hasLockOwner()) {
mergeLockOwner(other.getLockOwner());
}
if (other.hasThreadId()) {
setThreadId(other.getThreadId());
}
if (other.hasIsShared()) {
setIsShared(other.getIsShared());
}
if (other.hasPurpose()) {
setPurpose(other.getPurpose());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasTableName()) {
return false;
}
if (!hasLockOwner()) {
return false;
}
if (!hasThreadId()) {
return false;
}
if (!hasIsShared()) {
return false;
}
if (!getLockOwner().isInitialized()) {
return false;
}
return true;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
onChanged();
return this;
}
break;
}
case 10: {
bitField0_ |= 0x00000001;
tableName_ = input.readBytes();
break;
}
case 18: {
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder subBuilder = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder();
if (hasLockOwner()) {
subBuilder.mergeFrom(getLockOwner());
}
input.readMessage(subBuilder, extensionRegistry);
setLockOwner(subBuilder.buildPartial());
break;
}
case 24: {
bitField0_ |= 0x00000004;
threadId_ = input.readInt64();
break;
}
case 32: {
bitField0_ |= 0x00000008;
isShared_ = input.readBool();
break;
}
case 42: {
bitField0_ |= 0x00000010;
purpose_ = input.readBytes();
break;
}
}
}
}
private int bitField0_;
// required bytes tableName = 1;
private com.google.protobuf.ByteString tableName_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasTableName() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
public com.google.protobuf.ByteString getTableName() {
return tableName_;
}
public Builder setTableName(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
tableName_ = value;
onChanged();
return this;
}
public Builder clearTableName() {
bitField0_ = (bitField0_ & ~0x00000001);
tableName_ = getDefaultInstance().getTableName();
onChanged();
return this;
}
// required .ServerName lockOwner = 2;
private org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder> lockOwnerBuilder_;
public boolean hasLockOwner() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName getLockOwner() {
if (lockOwnerBuilder_ == null) {
return lockOwner_;
} else {
return lockOwnerBuilder_.getMessage();
}
}
public Builder setLockOwner(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
if (lockOwnerBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
lockOwner_ = value;
onChanged();
} else {
lockOwnerBuilder_.setMessage(value);
}
bitField0_ |= 0x00000002;
return this;
}
public Builder setLockOwner(
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder builderForValue) {
if (lockOwnerBuilder_ == null) {
lockOwner_ = builderForValue.build();
onChanged();
} else {
lockOwnerBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000002;
return this;
}
public Builder mergeLockOwner(org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName value) {
if (lockOwnerBuilder_ == null) {
if (((bitField0_ & 0x00000002) == 0x00000002) &&
lockOwner_ != org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance()) {
lockOwner_ =
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.newBuilder(lockOwner_).mergeFrom(value).buildPartial();
} else {
lockOwner_ = value;
}
onChanged();
} else {
lockOwnerBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000002;
return this;
}
public Builder clearLockOwner() {
if (lockOwnerBuilder_ == null) {
lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
onChanged();
} else {
lockOwnerBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder getLockOwnerBuilder() {
bitField0_ |= 0x00000002;
onChanged();
return getLockOwnerFieldBuilder().getBuilder();
}
public org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder getLockOwnerOrBuilder() {
if (lockOwnerBuilder_ != null) {
return lockOwnerBuilder_.getMessageOrBuilder();
} else {
return lockOwner_;
}
}
private com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>
getLockOwnerFieldBuilder() {
if (lockOwnerBuilder_ == null) {
lockOwnerBuilder_ = new com.google.protobuf.SingleFieldBuilder<
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.Builder, org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerNameOrBuilder>(
lockOwner_,
getParentForChildren(),
isClean());
lockOwner_ = null;
}
return lockOwnerBuilder_;
}
// required int64 threadId = 3;
private long threadId_ ;
public boolean hasThreadId() {
return ((bitField0_ & 0x00000004) == 0x00000004);
}
public long getThreadId() {
return threadId_;
}
public Builder setThreadId(long value) {
bitField0_ |= 0x00000004;
threadId_ = value;
onChanged();
return this;
}
public Builder clearThreadId() {
bitField0_ = (bitField0_ & ~0x00000004);
threadId_ = 0L;
onChanged();
return this;
}
// required bool isShared = 4;
private boolean isShared_ ;
public boolean hasIsShared() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
public boolean getIsShared() {
return isShared_;
}
public Builder setIsShared(boolean value) {
bitField0_ |= 0x00000008;
isShared_ = value;
onChanged();
return this;
}
public Builder clearIsShared() {
bitField0_ = (bitField0_ & ~0x00000008);
isShared_ = false;
onChanged();
return this;
}
// optional string purpose = 5;
private java.lang.Object purpose_ = "";
public boolean hasPurpose() {
return ((bitField0_ & 0x00000010) == 0x00000010);
}
public String getPurpose() {
java.lang.Object ref = purpose_;
if (!(ref instanceof String)) {
String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
purpose_ = s;
return s;
} else {
return (String) ref;
}
}
public Builder setPurpose(String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000010;
purpose_ = value;
onChanged();
return this;
}
public Builder clearPurpose() {
bitField0_ = (bitField0_ & ~0x00000010);
purpose_ = getDefaultInstance().getPurpose();
onChanged();
return this;
}
void setPurpose(com.google.protobuf.ByteString value) {
bitField0_ |= 0x00000010;
purpose_ = value;
onChanged();
}
// @@protoc_insertion_point(builder_scope:TableLock)
}
static {
defaultInstance = new TableLock(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:TableLock)
}
private static com.google.protobuf.Descriptors.Descriptor
internal_static_RootRegionServer_descriptor;
private static
@ -4952,6 +5753,11 @@ public final class ZooKeeperProtos {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_ReplicationLock_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_TableLock_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_TableLock_fieldAccessorTable;
public static com.google.protobuf.Descriptors.FileDescriptor
getDescriptor() {
@ -4980,8 +5786,11 @@ public final class ZooKeeperProtos {
"tate.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
"BLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010pos" +
"ition\030\001 \002(\003\"$\n\017ReplicationLock\022\021\n\tlockOw" +
"ner\030\001 \002(\tBE\n*org.apache.hadoop.hbase.pro",
"tobuf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
"ner\030\001 \002(\t\"s\n\tTableLock\022\021\n\ttableName\030\001 \002(",
"\014\022\036\n\tlockOwner\030\002 \002(\0132\013.ServerName\022\020\n\010thr" +
"eadId\030\003 \002(\003\022\020\n\010isShared\030\004 \002(\010\022\017\n\007purpose" +
"\030\005 \001(\tBE\n*org.apache.hadoop.hbase.protob" +
"uf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -5068,6 +5877,14 @@ public final class ZooKeeperProtos {
new java.lang.String[] { "LockOwner", },
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationLock.class,
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.ReplicationLock.Builder.class);
internal_static_TableLock_descriptor =
getDescriptor().getMessageTypes().get(10);
internal_static_TableLock_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_TableLock_descriptor,
new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", },
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.class,
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.Builder.class);
return null;
}
};

View File

@ -133,3 +133,14 @@ message ReplicationHLogPosition {
message ReplicationLock {
required string lockOwner = 1;
}
/**
* Metadata associated with a table lock in zookeeper
*/
message TableLock {
optional bytes tableName = 1;
optional ServerName lockOwner = 2;
optional int64 threadId = 3;
optional bool isShared = 4;
optional string purpose = 5;
}

View File

@ -0,0 +1,86 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* An interface for an application-specific lock.
*/
@InterfaceAudience.Private
public interface InterProcessLock {
/**
* Acquire the lock, waiting indefinitely until the lock is released or
* the thread is interrupted.
* @throws IOException If there is an unrecoverable error releasing the lock
* @throws InterruptedException If current thread is interrupted while
* waiting for the lock
*/
public void acquire() throws IOException, InterruptedException;
/**
* Acquire the lock within a wait time.
* @param timeoutMs The maximum time (in milliseconds) to wait for the lock,
* -1 to wait indefinitely
* @return True if the lock was acquired, false if waiting time elapsed
* before the lock was acquired
* @throws IOException If there is an unrecoverable error talking talking
* (e.g., when talking to a lock service) when acquiring
* the lock
* @throws InterruptedException If the thread is interrupted while waiting to
* acquire the lock
*/
public boolean tryAcquire(long timeoutMs)
throws IOException, InterruptedException;
/**
* Release the lock.
* @throws IOException If there is an unrecoverable error releasing the lock
* @throws InterruptedException If the thread is interrupted while releasing
* the lock
*/
public void release() throws IOException, InterruptedException;
/**
* If supported, attempts to reap all the locks of this type by forcefully
* deleting the locks. Lock reaping is different than coordinated lock revocation
* in that, there is no coordination, and the behavior is undefined if the
* lock holder is still alive.
* @throws IOException If there is an unrecoverable error reaping the locks
*/
public void reapAllLocks() throws IOException;
/**
* An interface for objects that process lock metadata.
*/
public static interface MetadataHandler {
/**
* Called after lock metadata is successfully read from a distributed
* lock service. This method may contain any procedures for, e.g.,
* printing the metadata in a humanly-readable format.
* @param metadata The metadata
*/
public void handleMetadata(byte[] metadata);
}
}

View File

@ -0,0 +1,45 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* An interface for a distributed reader-writer lock.
*/
@InterfaceAudience.Private
public interface InterProcessReadWriteLock {
/**
* Obtain a reader lock containing given metadata.
* @param metadata Serialized lock metadata (this may contain information
* such as the process owning the lock or the purpose for
* which the lock was acquired). Must not be null.
* @return An instantiated InterProcessReadWriteLock instance
*/
public InterProcessLock readLock(byte[] metadata);
/**
* Obtain a writer lock containing given metadata.
* @param metadata See documentation of metadata parameter in readLock()
* @return An instantiated InterProcessReadWriteLock instance
*/
public InterProcessLock writeLock(byte[] metadata);
}

View File

@ -0,0 +1,37 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
public class LockTimeoutException extends IOException {
private static final long serialVersionUID = -1770764924258999825L;
/** Default constructor */
public LockTimeoutException() {
super();
}
public LockTimeoutException(String s) {
super(s);
}
}

View File

@ -33,10 +33,10 @@ import org.cloudera.htrace.Trace;
/**
* Abstract base class for all HBase event handlers. Subclasses should
* implement the {@link #process()} method. Subclasses should also do all
* necessary checks up in their constructor if possible -- check table exists,
* is disabled, etc. -- so they fail fast rather than later when process is
* running. Do it this way because process be invoked directly but event
* implement the {@link #process()} and {@link #prepare()} methods. Subclasses
* should also do all necessary checks up in their prepare() if possible -- check
* table exists, is disabled, etc. -- so they fail fast rather than later when process
* is running. Do it this way because process be invoked directly but event
* handlers are also
* run in an executor context -- i.e. asynchronously -- and in this case,
* exceptions thrown at process time will not be seen by the invoker, not till
@ -102,7 +102,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
* originated and then where its destined -- e.g. RS2ZK_ prefix means the
* event came from a regionserver destined for zookeeper -- and then what
* the even is; e.g. REGION_OPENING.
*
*
* <p>We give the enums indices so we can add types later and keep them
* grouped together rather than have to add them always to the end as we
* would have to if we used raw enum ordinals.
@ -202,6 +202,19 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
}
}
/**
* Event handlers should do all the necessary checks in this method (rather than
* in the constructor, or in process()) so that the caller, which is mostly executed
* in the ipc context can fail fast. Process is executed async from the client ipc,
* so this method gives a quick chance to do some basic checks.
* Should be called after constructing the EventHandler, and before process().
* @return the instance of this class
* @throws Exception when something goes wrong
*/
public EventHandler prepare() throws Exception {
return this;
}
public void run() {
Span chunk = Trace.startSpan(Thread.currentThread().getName(), parent,
Sampler.ALWAYS);
@ -282,7 +295,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
public synchronized void setListener(EventHandlerListener listener) {
this.listener = listener;
}
@Override
public String toString() {
return "Event #" + getSeqid() +

View File

@ -112,6 +112,8 @@ public class AssignmentManager extends ZooKeeperListener {
private LoadBalancer balancer;
private final TableLockManager tableLockManager;
final private KeyLocker<String> locker = new KeyLocker<String>();
/**
@ -192,7 +194,8 @@ public class AssignmentManager extends ZooKeeperListener {
*/
public AssignmentManager(Server server, ServerManager serverManager,
CatalogTracker catalogTracker, final LoadBalancer balancer,
final ExecutorService service, MetricsMaster metricsMaster) throws KeeperException, IOException {
final ExecutorService service, MetricsMaster metricsMaster,
final TableLockManager tableLockManager) throws KeeperException, IOException {
super(server.getZooKeeper());
this.server = server;
this.serverManager = serverManager;
@ -228,6 +231,7 @@ public class AssignmentManager extends ZooKeeperListener {
ThreadFactory threadFactory = Threads.newDaemonThreadFactory("hbase-am-zkevent-worker");
zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
TimeUnit.SECONDS, threadFactory);
this.tableLockManager = tableLockManager;
}
void startTimeOutMonitor() {
@ -301,7 +305,7 @@ public class AssignmentManager extends ZooKeeperListener {
public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
throws IOException {
List <HRegionInfo> hris =
MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName);
MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName, true);
Integer pending = 0;
for (HRegionInfo hri : hris) {
String name = hri.getEncodedName();
@ -1258,7 +1262,7 @@ public class AssignmentManager extends ZooKeeperListener {
*/
public void regionOffline(final HRegionInfo regionInfo) {
regionStates.regionOffline(regionInfo);
removeClosedRegion(regionInfo);
// remove the region plan as well just in case.
clearRegionPlan(regionInfo);
}
@ -2408,8 +2412,8 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.info("The table " + tableName
+ " is in DISABLING state. Hence recovering by moving the table"
+ " to DISABLED state.");
new DisableTableHandler(this.server, tableName.getBytes(),
catalogTracker, this, true).process();
new DisableTableHandler(this.server, tableName.getBytes(), catalogTracker,
this, tableLockManager, true).prepare().process();
}
}
}
@ -2434,7 +2438,7 @@ public class AssignmentManager extends ZooKeeperListener {
// enableTable in sync way during master startup,
// no need to invoke coprocessor
new EnableTableHandler(this.server, tableName.getBytes(),
catalogTracker, this, true).process();
catalogTracker, this, tableLockManager, true).prepare().process();
}
}
}

View File

@ -316,6 +316,9 @@ Server {
private TableDescriptors tableDescriptors;
// Table level lock manager for schema changes
private TableLockManager tableLockManager;
// Time stamps for when a hmaster was started and when it became active
private long masterStartTime;
private long masterActiveTime;
@ -566,7 +569,8 @@ Server {
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
this.assignmentManager = new AssignmentManager(this, serverManager,
this.catalogTracker, this.balancer, this.executorService, this.metricsMaster);
this.catalogTracker, this.balancer, this.executorService, this.metricsMaster,
this.tableLockManager);
zooKeeper.registerListenerFirst(assignmentManager);
this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
@ -703,6 +707,13 @@ Server {
startServiceThreads();
}
//Initialize table lock manager, and ensure that all write locks held previously
//are invalidated
this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
if (!masterRecovery) {
this.tableLockManager.reapAllTableWriteLocks();
}
// Wait for region servers to report in.
this.serverManager.waitForRegionServers(status);
// Check zk for region servers that are up but didn't register
@ -1508,7 +1519,7 @@ Server {
this.executorService.submit(new CreateTableHandler(this,
this.fileSystemManager, hTableDescriptor, conf,
newRegions, catalogTracker, assignmentManager));
newRegions, this).prepare());
if (cpHost != null) {
cpHost.postCreateTable(hTableDescriptor, newRegions);
}
@ -1575,7 +1586,7 @@ Server {
if (cpHost != null) {
cpHost.preDeleteTable(tableName);
}
this.executorService.submit(new DeleteTableHandler(tableName, this, this));
this.executorService.submit(new DeleteTableHandler(tableName, this, this).prepare());
if (cpHost != null) {
cpHost.postDeleteTable(tableName);
}
@ -1629,7 +1640,9 @@ Server {
return;
}
}
new TableAddFamilyHandler(tableName, column, this, this).process();
//TODO: we should process this (and some others) in an executor
new TableAddFamilyHandler(tableName, column, this, this)
.prepare().process();
if (cpHost != null) {
cpHost.postAddColumn(tableName, column);
}
@ -1657,7 +1670,8 @@ Server {
return;
}
}
new TableModifyFamilyHandler(tableName, descriptor, this, this).process();
new TableModifyFamilyHandler(tableName, descriptor, this, this)
.prepare().process();
if (cpHost != null) {
cpHost.postModifyColumn(tableName, descriptor);
}
@ -1684,7 +1698,7 @@ Server {
return;
}
}
new TableDeleteFamilyHandler(tableName, columnName, this, this).process();
new TableDeleteFamilyHandler(tableName, columnName, this, this).prepare().process();
if (cpHost != null) {
cpHost.postDeleteColumn(tableName, columnName);
}
@ -1708,7 +1722,7 @@ Server {
cpHost.preEnableTable(tableName);
}
this.executorService.submit(new EnableTableHandler(this, tableName,
catalogTracker, assignmentManager, false));
catalogTracker, assignmentManager, tableLockManager, false).prepare());
if (cpHost != null) {
cpHost.postEnableTable(tableName);
}
@ -1732,7 +1746,7 @@ Server {
cpHost.preDisableTable(tableName);
}
this.executorService.submit(new DisableTableHandler(this, tableName,
catalogTracker, assignmentManager, false));
catalogTracker, assignmentManager, tableLockManager, false).prepare());
if (cpHost != null) {
cpHost.postDisableTable(tableName);
}
@ -1792,8 +1806,7 @@ Server {
if (cpHost != null) {
cpHost.preModifyTable(tableName, descriptor);
}
new ModifyTableHandler(tableName, descriptor, this, this).process();
new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
if (cpHost != null) {
cpHost.postModifyTable(tableName, descriptor);
}
@ -2056,12 +2069,17 @@ Server {
return this.assignmentManager;
}
@Override
public TableLockManager getTableLockManager() {
return this.tableLockManager;
}
public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
return rsFatals;
}
public void shutdown() throws IOException {
if (spanReceiverHost != null) {
if (spanReceiverHost != null) {
spanReceiverHost.closeReceivers();
}
if (cpHost != null) {

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import com.google.protobuf.Service;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -30,6 +29,8 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.executor.ExecutorService;
import com.google.protobuf.Service;
/**
* Services Master supplies
*/
@ -55,6 +56,11 @@ public interface MasterServices extends Server {
*/
public ExecutorService getExecutorService();
/**
* @return Master's instance of {@link TableLockManager}
*/
public TableLockManager getTableLockManager();
/**
* @return Master's instance of {@link MasterCoprocessorHost}
*/

View File

@ -0,0 +1,387 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.InterProcessLock;
import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
import org.apache.hadoop.hbase.InterProcessReadWriteLock;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.LockTimeoutException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* A manager for distributed table level locks.
*/
@InterfaceAudience.Private
public abstract class TableLockManager {
private static final Log LOG = LogFactory.getLog(TableLockManager.class);
/** Configuration key for enabling table-level locks for schema changes */
public static final String TABLE_LOCK_ENABLE =
"hbase.table.lock.enable";
/** by default we should enable table-level locks for schema changes */
private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
/** Configuration key for time out for trying to acquire table locks */
protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
"hbase.table.write.lock.timeout.ms";
/** Configuration key for time out for trying to acquire table locks */
protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
"hbase.table.read.lock.timeout.ms";
protected static final int DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
600 * 1000; //10 min default
protected static final int DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
600 * 1000; //10 min default
/**
* A distributed lock for a table.
*/
@InterfaceAudience.Private
public static interface TableLock {
/**
* Acquire the lock, with the configured lock timeout.
* @throws LockTimeoutException If unable to acquire a lock within a specified
* time period (if any)
* @throws IOException If unrecoverable error occurs
*/
public void acquire() throws IOException;
/**
* Release the lock already held.
* @throws IOException If there is an unrecoverable error releasing the lock
*/
public void release() throws IOException;
}
/**
* Returns a TableLock for locking the table for exclusive access
* @param tableName Table to lock
* @param purpose Human readable reason for locking the table
* @return A new TableLock object for acquiring a write lock
*/
public abstract TableLock writeLock(byte[] tableName, String purpose);
/**
* Returns a TableLock for locking the table for shared access among read-lock holders
* @param tableName Table to lock
* @param purpose Human readable reason for locking the table
* @return A new TableLock object for acquiring a read lock
*/
public abstract TableLock readLock(byte[] tableName, String purpose);
/**
* Force releases all table write locks and lock attempts even if this thread does
* not own the lock. The behavior of the lock holders still thinking that they
* have the lock is undefined. This should be used carefully and only when
* we can ensure that all write-lock holders have died. For example if only
* the master can hold write locks, then we can reap it's locks when the backup
* master starts.
*/
public abstract void reapAllTableWriteLocks() throws IOException;
/**
* Called after a table has been deleted, and after the table lock is released.
* TableLockManager should do cleanup for the table state.
* @param tableName name of the table
* @throws IOException If there is an unrecoverable error releasing the lock
*/
public abstract void tableDeleted(byte[] tableName)
throws IOException;
/**
* Creates and returns a TableLockManager according to the configuration
*/
public static TableLockManager createTableLockManager(Configuration conf,
ZooKeeperWatcher zkWatcher, ServerName serverName) {
// Initialize table level lock manager for schema changes, if enabled.
if (conf.getBoolean(TABLE_LOCK_ENABLE,
DEFAULT_TABLE_LOCK_ENABLE)) {
int writeLockTimeoutMs = conf.getInt(TABLE_WRITE_LOCK_TIMEOUT_MS,
DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
int readLockTimeoutMs = conf.getInt(TABLE_READ_LOCK_TIMEOUT_MS,
DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs);
}
return new NullTableLockManager();
}
/**
* A null implementation
*/
@InterfaceAudience.Private
static class NullTableLockManager extends TableLockManager {
static class NullTableLock implements TableLock {
@Override
public void acquire() throws IOException {
}
@Override
public void release() throws IOException {
}
}
@Override
public TableLock writeLock(byte[] tableName, String purpose) {
return new NullTableLock();
}
@Override
public TableLock readLock(byte[] tableName, String purpose) {
return new NullTableLock();
}
@Override
public void reapAllTableWriteLocks() throws IOException {
}
@Override
public void tableDeleted(byte[] tableName) throws IOException {
}
}
/**
* ZooKeeper based TableLockManager
*/
@InterfaceAudience.Private
private static class ZKTableLockManager extends TableLockManager {
private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
@Override
public void handleMetadata(byte[] ownerMetadata) {
if (!LOG.isDebugEnabled()) {
return;
}
ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
if (data == null) {
return;
}
LOG.debug("Table is locked by: " +
String.format("[tableName=%s, lockOwner=%s, threadId=%s, " +
"purpose=%s, isShared=%s]", Bytes.toString(data.getTableName().toByteArray()),
ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
data.getPurpose(), data.getIsShared()));
}
};
private static class TableLockImpl implements TableLock {
long lockTimeoutMs;
byte[] tableName;
String tableNameStr;
InterProcessLock lock;
boolean isShared;
ZooKeeperWatcher zkWatcher;
ServerName serverName;
String purpose;
public TableLockImpl(byte[] tableName, ZooKeeperWatcher zkWatcher,
ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
this.tableName = tableName;
tableNameStr = Bytes.toString(tableName);
this.zkWatcher = zkWatcher;
this.serverName = serverName;
this.lockTimeoutMs = lockTimeoutMs;
this.isShared = isShared;
this.purpose = purpose;
}
@Override
public void acquire() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Attempt to acquire table " + (isShared ? "read" : "write")
+ " lock on :" + tableNameStr + " for:" + purpose);
}
lock = createTableLock();
try {
if (lockTimeoutMs == -1) {
// Wait indefinitely
lock.acquire();
} else {
if (!lock.tryAcquire(lockTimeoutMs)) {
throw new LockTimeoutException("Timed out acquiring " +
(isShared ? "read" : "write") + "lock for table:" + tableNameStr +
"for:" + purpose + " after " + lockTimeoutMs + " ms.");
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted acquiring a lock for " + tableNameStr, e);
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted acquiring a lock");
}
LOG.debug("Acquired table " + (isShared ? "read" : "write")
+ " lock on :" + tableNameStr + " for:" + purpose);
}
@Override
public void release() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Attempt to release table " + (isShared ? "read" : "write")
+ " lock on :" + tableNameStr);
}
if (lock == null) {
throw new IllegalStateException("Table " + tableNameStr +
" is not locked!");
}
try {
lock.release();
} catch (InterruptedException e) {
LOG.warn("Interrupted while releasing a lock for " + tableNameStr);
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Released table lock on :" + tableNameStr);
}
}
private InterProcessLock createTableLock() {
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
.setTableName(ByteString.copyFrom(tableName))
.setLockOwner(ProtobufUtil.toServerName(serverName))
.setThreadId(Thread.currentThread().getId())
.setPurpose(purpose)
.setIsShared(isShared).build();
byte[] lockMetadata = toBytes(data);
InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
METADATA_HANDLER);
return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
}
}
private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
return ProtobufUtil.prependPBMagic(data.toByteArray());
}
private static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
int pblen = ProtobufUtil.lengthOfPBMagic();
if (bytes == null || bytes.length < pblen) {
return null;
}
try {
ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom(
bytes, pblen, bytes.length - pblen).build();
return data;
} catch (InvalidProtocolBufferException ex) {
LOG.warn("Exception in deserialization", ex);
}
return null;
}
private final ServerName serverName;
private final ZooKeeperWatcher zkWatcher;
private final long writeLockTimeoutMs;
private final long readLockTimeoutMs;
/**
* Initialize a new manager for table-level locks.
* @param zkWatcher
* @param serverName Address of the server responsible for acquiring and
* releasing the table-level locks
* @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a
* given table, or -1 for no timeout
* @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a
* given table, or -1 for no timeout
*/
public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs) {
this.zkWatcher = zkWatcher;
this.serverName = serverName;
this.writeLockTimeoutMs = writeLockTimeoutMs;
this.readLockTimeoutMs = readLockTimeoutMs;
}
@Override
public TableLock writeLock(byte[] tableName, String purpose) {
return new TableLockImpl(tableName, zkWatcher,
serverName, writeLockTimeoutMs, false, purpose);
}
public TableLock readLock(byte[] tableName, String purpose) {
return new TableLockImpl(tableName, zkWatcher,
serverName, readLockTimeoutMs, true, purpose);
}
@Override
public void reapAllTableWriteLocks() throws IOException {
//get the table names
try {
List<String> tableNames;
try {
tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
} catch (KeeperException e) {
LOG.error("Unexpected ZooKeeper error when listing children", e);
throw new IOException("Unexpected ZooKeeper exception", e);
}
for (String tableName : tableNames) {
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
zkWatcher, tableLockZNode, null);
lock.writeLock(null).reapAllLocks();
}
} catch (IOException ex) {
throw ex;
} catch (Exception ex) {
LOG.warn("Caught exception while reaping table write locks", ex);
}
}
@Override
public void tableDeleted(byte[] tableName) throws IOException {
//table write lock from DeleteHandler is already released, just delete the parent znode
String tableNameStr = Bytes.toString(tableName);
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
try {
ZKUtil.deleteNode(zkWatcher, tableLockZNode);
} catch (KeeperException ex) {
if (ex.code() == KeeperException.Code.NOTEMPTY) {
//we might get this in rare occasions where a CREATE table or some other table operation
//is waiting to acquire the lock. In this case, parent znode won't be deleted.
LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
+ tableLockZNode);
return;
}
throw new IOException(ex);
}
}
}
}

View File

@ -50,6 +50,9 @@ import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.Threads;
@ -66,21 +69,29 @@ public class CreateTableHandler extends EventHandler {
protected final Configuration conf;
private final AssignmentManager assignmentManager;
private final CatalogTracker catalogTracker;
private final TableLockManager tableLockManager;
private final HRegionInfo [] newRegions;
private final TableLock tableLock;
public CreateTableHandler(Server server, MasterFileSystem fileSystemManager,
HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions,
CatalogTracker catalogTracker, AssignmentManager assignmentManager)
throws NotAllMetaRegionsOnlineException, TableExistsException, IOException {
MasterServices masterServices) {
super(server, EventType.C_M_CREATE_TABLE);
this.fileSystemManager = fileSystemManager;
this.hTableDescriptor = hTableDescriptor;
this.conf = conf;
this.newRegions = newRegions;
this.catalogTracker = catalogTracker;
this.assignmentManager = assignmentManager;
this.catalogTracker = masterServices.getCatalogTracker();
this.assignmentManager = masterServices.getAssignmentManager();
this.tableLockManager = masterServices.getTableLockManager();
this.tableLock = this.tableLockManager.writeLock(this.hTableDescriptor.getName()
, EventType.C_M_CREATE_TABLE.toString());
}
public CreateTableHandler prepare()
throws NotAllMetaRegionsOnlineException, TableExistsException, IOException {
int timeout = conf.getInt("hbase.client.catalog.timeout", 10000);
// Need META availability to create a table
try {
@ -94,27 +105,39 @@ public class CreateTableHandler extends EventHandler {
throw ie;
}
String tableName = this.hTableDescriptor.getNameAsString();
if (MetaReader.tableExists(catalogTracker, tableName)) {
throw new TableExistsException(tableName);
}
// If we have multiple client threads trying to create the table at the
// same time, given the async nature of the operation, the table
// could be in a state where .META. table hasn't been updated yet in
// the process() function.
// Use enabling state to tell if there is already a request for the same
// table in progress. This will introduce a new zookeeper call. Given
// createTable isn't a frequent operation, that should be ok.
//acquire the table write lock, blocking. Make sure that it is released.
this.tableLock.acquire();
boolean success = false;
try {
if (!this.assignmentManager.getZKTable().checkAndSetEnablingTable(tableName))
String tableName = this.hTableDescriptor.getNameAsString();
if (MetaReader.tableExists(catalogTracker, tableName)) {
throw new TableExistsException(tableName);
} catch (KeeperException e) {
throw new IOException("Unable to ensure that the table will be" +
" enabling because of a ZooKeeper issue", e);
}
}
}
// If we have multiple client threads trying to create the table at the
// same time, given the async nature of the operation, the table
// could be in a state where .META. table hasn't been updated yet in
// the process() function.
// Use enabling state to tell if there is already a request for the same
// table in progress. This will introduce a new zookeeper call. Given
// createTable isn't a frequent operation, that should be ok.
//TODO: now that we have table locks, re-evaluate above
try {
if (!this.assignmentManager.getZKTable().checkAndSetEnablingTable(tableName)) {
throw new TableExistsException(tableName);
}
} catch (KeeperException e) {
throw new IOException("Unable to ensure that the table will be" +
" enabling because of a ZooKeeper issue", e);
}
success = true;
} finally {
if (!success) {
releaseTableLock();
}
}
return this;
}
@Override
public String toString() {
@ -129,8 +152,9 @@ public class CreateTableHandler extends EventHandler {
@Override
public void process() {
String tableName = this.hTableDescriptor.getNameAsString();
LOG.info("Attempting to create the table " + tableName);
try {
LOG.info("Attempting to create the table " + tableName);
MasterCoprocessorHost cpHost = ((HMaster) this.server).getCoprocessorHost();
if (cpHost != null) {
cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions);
@ -207,6 +231,18 @@ public class CreateTableHandler extends EventHandler {
} catch (KeeperException e) {
throw new IOException("Unable to ensure that " + tableName + " will be" +
" enabled because of a ZooKeeper issue", e);
} finally {
releaseTableLock();
}
}
private void releaseTableLock() {
if (this.tableLock != null) {
try {
this.tableLock.release();
} catch (IOException ex) {
LOG.warn("Could not release the table lock", ex);
}
}
}

View File

@ -44,9 +44,12 @@ public class DeleteTableHandler extends TableEventHandler {
private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class);
public DeleteTableHandler(byte [] tableName, Server server,
final MasterServices masterServices)
throws IOException {
final MasterServices masterServices) {
super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices);
}
@Override
protected void prepareWithTableLock() throws IOException {
// The next call fails if no such table.
getTableDescriptor();
}
@ -113,6 +116,16 @@ public class DeleteTableHandler extends TableEventHandler {
}
}
@Override
protected void releaseTableLock() {
super.releaseTableLock();
try {
masterServices.getTableLockManager().tableDeleted(tableName);
} catch (IOException ex) {
LOG.warn("Received exception from TableLockManager.tableDeleted:", ex); //not critical
}
}
@Override
public String toString() {
String name = "UnknownServerName";

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.master.BulkAssigner;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.cloudera.htrace.Trace;
@ -50,40 +52,62 @@ public class DisableTableHandler extends EventHandler {
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
private final TableLockManager tableLockManager;
private final CatalogTracker catalogTracker;
private final boolean skipTableStateCheck;
private TableLock tableLock;
public DisableTableHandler(Server server, byte [] tableName,
CatalogTracker catalogTracker, AssignmentManager assignmentManager,
boolean skipTableStateCheck)
throws TableNotFoundException, TableNotEnabledException, IOException {
TableLockManager tableLockManager, boolean skipTableStateCheck) {
super(server, EventType.C_M_DISABLE_TABLE);
this.tableName = tableName;
this.tableNameStr = Bytes.toString(this.tableName);
this.assignmentManager = assignmentManager;
// Check if table exists
// TODO: do we want to keep this in-memory as well? i guess this is
// part of old master rewrite, schema to zk to check for table
// existence and such
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
throw new TableNotFoundException(this.tableNameStr);
}
this.catalogTracker = catalogTracker;
this.tableLockManager = tableLockManager;
this.skipTableStateCheck = skipTableStateCheck;
}
// There could be multiple client requests trying to disable or enable
// the table at the same time. Ensure only the first request is honored
// After that, no other requests can be accepted until the table reaches
// DISABLED or ENABLED.
if (!skipTableStateCheck)
{
try {
if (!this.assignmentManager.getZKTable().checkEnabledAndSetDisablingTable
(this.tableNameStr)) {
LOG.info("Table " + tableNameStr + " isn't enabled; skipping disable");
throw new TableNotEnabledException(this.tableNameStr);
public DisableTableHandler prepare()
throws TableNotFoundException, TableNotEnabledException, IOException {
//acquire the table write lock, blocking
this.tableLock = this.tableLockManager.writeLock(tableName,
EventType.C_M_DISABLE_TABLE.toString());
this.tableLock.acquire();
boolean success = false;
try {
// Check if table exists
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
throw new TableNotFoundException(this.tableNameStr);
}
// There could be multiple client requests trying to disable or enable
// the table at the same time. Ensure only the first request is honored
// After that, no other requests can be accepted until the table reaches
// DISABLED or ENABLED.
//TODO: reevaluate this since we have table locks now
if (!skipTableStateCheck) {
try {
if (!this.assignmentManager.getZKTable().checkEnabledAndSetDisablingTable
(this.tableNameStr)) {
LOG.info("Table " + tableNameStr + " isn't enabled; skipping disable");
throw new TableNotEnabledException(this.tableNameStr);
}
} catch (KeeperException e) {
throw new IOException("Unable to ensure that the table will be" +
" disabling because of a ZooKeeper issue", e);
}
} catch (KeeperException e) {
throw new IOException("Unable to ensure that the table will be" +
" disabling because of a ZooKeeper issue", e);
}
success = true;
} finally {
if (!success) {
releaseTableLock();
}
}
return this;
}
@Override
@ -113,6 +137,18 @@ public class DisableTableHandler extends EventHandler {
LOG.error("Error trying to disable table " + this.tableNameStr, e);
} catch (KeeperException e) {
LOG.error("Error trying to disable table " + this.tableNameStr, e);
} finally {
releaseTableLock();
}
}
private void releaseTableLock() {
if (this.tableLock != null) {
try {
this.tableLock.release();
} catch (IOException ex) {
LOG.warn("Could not release the table lock", ex);
}
}
}

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
@ -55,41 +57,60 @@ public class EnableTableHandler extends EventHandler {
private final byte [] tableName;
private final String tableNameStr;
private final AssignmentManager assignmentManager;
private final CatalogTracker ct;
private final TableLockManager tableLockManager;
private final CatalogTracker catalogTracker;
private boolean retainAssignment = false;
private TableLock tableLock;
public EnableTableHandler(Server server, byte [] tableName,
CatalogTracker catalogTracker, AssignmentManager assignmentManager,
boolean skipTableStateCheck)
throws TableNotFoundException, TableNotDisabledException, IOException {
TableLockManager tableLockManager, boolean skipTableStateCheck) {
super(server, EventType.C_M_ENABLE_TABLE);
this.tableName = tableName;
this.tableNameStr = Bytes.toString(tableName);
this.ct = catalogTracker;
this.catalogTracker = catalogTracker;
this.assignmentManager = assignmentManager;
this.tableLockManager = tableLockManager;
this.retainAssignment = skipTableStateCheck;
// Check if table exists
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
throw new TableNotFoundException(Bytes.toString(tableName));
}
}
// There could be multiple client requests trying to disable or enable
// the table at the same time. Ensure only the first request is honored
// After that, no other requests can be accepted until the table reaches
// DISABLED or ENABLED.
if (!skipTableStateCheck)
{
try {
if (!this.assignmentManager.getZKTable().checkDisabledAndSetEnablingTable
(this.tableNameStr)) {
LOG.info("Table " + tableNameStr + " isn't disabled; skipping enable");
throw new TableNotDisabledException(this.tableNameStr);
public EnableTableHandler prepare()
throws TableNotFoundException, TableNotDisabledException, IOException {
//acquire the table write lock, blocking
this.tableLock = this.tableLockManager.writeLock(tableName,
EventType.C_M_ENABLE_TABLE.toString());
this.tableLock.acquire();
boolean success = false;
try {
// Check if table exists
if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
throw new TableNotFoundException(Bytes.toString(tableName));
}
// There could be multiple client requests trying to disable or enable
// the table at the same time. Ensure only the first request is honored
// After that, no other requests can be accepted until the table reaches
// DISABLED or ENABLED.
if (!retainAssignment) {
try {
if (!this.assignmentManager.getZKTable().checkDisabledAndSetEnablingTable
(this.tableNameStr)) {
LOG.info("Table " + tableNameStr + " isn't disabled; skipping enable");
throw new TableNotDisabledException(this.tableNameStr);
}
} catch (KeeperException e) {
throw new IOException("Unable to ensure that the table will be" +
" enabling because of a ZooKeeper issue", e);
}
} catch (KeeperException e) {
throw new IOException("Unable to ensure that the table will be" +
" enabling because of a ZooKeeper issue", e);
}
success = true;
} finally {
if (!success) {
releaseTableLock();
}
}
return this;
}
@Override
@ -121,6 +142,18 @@ public class EnableTableHandler extends EventHandler {
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
} catch (InterruptedException e) {
LOG.error("Error trying to enable the table " + this.tableNameStr, e);
} finally {
releaseTableLock();
}
}
private void releaseTableLock() {
if (this.tableLock != null) {
try {
this.tableLock.release();
} catch (IOException ex) {
LOG.warn("Could not release the table lock", ex);
}
}
}
@ -134,7 +167,7 @@ public class EnableTableHandler extends EventHandler {
// Get the regions of this table. We're done when all listed
// tables are onlined.
List<Pair<HRegionInfo, ServerName>> tableRegionsAndLocations = MetaReader
.getTableRegionsAndLocations(this.ct, tableName, true);
.getTableRegionsAndLocations(this.catalogTracker, tableName, true);
int countOfRegionsInTable = tableRegionsAndLocations.size();
List<HRegionInfo> regions = regionsToAssignWithServerName(tableRegionsAndLocations);
int regionsCount = regions.size();

View File

@ -35,15 +35,19 @@ public class ModifyTableHandler extends TableEventHandler {
public ModifyTableHandler(final byte [] tableName,
final HTableDescriptor htd, final Server server,
final MasterServices masterServices)
throws IOException {
final MasterServices masterServices) {
super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices);
// Check table exists.
getTableDescriptor();
// This is the new schema we are going to write out as this modification.
this.htd = htd;
}
@Override
protected void prepareWithTableLock() throws IOException {
super.prepareWithTableLock();
// Check table exists.
getTableDescriptor();
}
@Override
protected void handleTableOperation(List<HRegionInfo> hris)
throws IOException {

View File

@ -40,14 +40,19 @@ public class TableAddFamilyHandler extends TableEventHandler {
private final HColumnDescriptor familyDesc;
public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc,
Server server, final MasterServices masterServices) throws IOException {
Server server, final MasterServices masterServices) {
super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices);
this.familyDesc = familyDesc;
}
@Override
protected void prepareWithTableLock() throws IOException {
super.prepareWithTableLock();
HTableDescriptor htd = getTableDescriptor();
if (htd.hasFamily(familyDesc.getName())) {
throw new InvalidFamilyOperationException("Family '" +
familyDesc.getNameAsString() + "' already exists so cannot be added");
}
this.familyDesc = familyDesc;
}
@Override

View File

@ -37,11 +37,17 @@ import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Private
public class TableDeleteFamilyHandler extends TableEventHandler {
private final byte [] familyName;
private byte [] familyName;
public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName,
Server server, final MasterServices masterServices) throws IOException {
super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices);
super(EventType.C_M_DELETE_FAMILY, tableName, server, masterServices);
this.familyName = familyName;
}
@Override
protected void prepareWithTableLock() throws IOException {
super.prepareWithTableLock();
HTableDescriptor htd = getTableDescriptor();
this.familyName = hasColumnFamily(htd, familyName);
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.BulkReOpen;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
@ -61,37 +62,61 @@ public abstract class TableEventHandler extends EventHandler {
protected final MasterServices masterServices;
protected final byte [] tableName;
protected final String tableNameStr;
protected TableLock tableLock;
public TableEventHandler(EventType eventType, byte [] tableName, Server server,
MasterServices masterServices)
throws IOException {
MasterServices masterServices) {
super(server, eventType);
this.masterServices = masterServices;
this.tableName = tableName;
try {
this.masterServices.checkTableModifiable(tableName);
} catch (TableNotDisabledException ex) {
if (isOnlineSchemaChangeAllowed()
&& eventType.isOnlineSchemaChangeSupported()) {
LOG.debug("Ignoring table not disabled exception " +
"for supporting online schema changes.");
} else {
throw ex;
}
}
this.tableNameStr = Bytes.toString(this.tableName);
}
private boolean isOnlineSchemaChangeAllowed() {
return this.server.getConfiguration().getBoolean(
"hbase.online.schema.update.enable", false);
}
public TableEventHandler prepare() throws IOException {
//acquire the table write lock, blocking
this.tableLock = masterServices.getTableLockManager()
.writeLock(tableName, eventType.toString());
this.tableLock.acquire();
boolean success = false;
try {
try {
this.masterServices.checkTableModifiable(tableName);
} catch (TableNotDisabledException ex) {
if (isOnlineSchemaChangeAllowed()
&& eventType.isOnlineSchemaChangeSupported()) {
LOG.debug("Ignoring table not disabled exception " +
"for supporting online schema changes.");
} else {
throw ex;
}
}
prepareWithTableLock();
success = true;
} finally {
if (!success ) {
releaseTableLock();
}
}
return this;
}
/** Called from prepare() while holding the table lock. Subclasses
* can do extra initialization, and not worry about the releasing
* the table lock. */
protected void prepareWithTableLock() throws IOException {
}
private boolean isOnlineSchemaChangeAllowed() {
return this.server.getConfiguration().getBoolean(
"hbase.online.schema.update.enable", false);
}
@Override
public void process() {
try {
LOG.info("Handling table operation " + eventType + " on table " +
Bytes.toString(tableName));
List<HRegionInfo> hris =
MetaReader.getTableRegions(this.server.getCatalogTracker(),
tableName);
@ -110,7 +135,19 @@ public abstract class TableEventHandler extends EventHandler {
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
} catch (KeeperException e) {
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
}
} finally {
releaseTableLock();
}
}
protected void releaseTableLock() {
if (this.tableLock != null) {
try {
this.tableLock.release();
} catch (IOException ex) {
LOG.warn("Could not release the table lock", ex);
}
}
}
public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException {
@ -137,7 +174,7 @@ public abstract class TableEventHandler extends EventHandler {
reRegions.add(hri);
serverToRegions.get(rsLocation).add(hri);
}
LOG.info("Reopening " + reRegions.size() + " regions on "
+ serverToRegions.size() + " region servers.");
this.masterServices.getAssignmentManager().setRegionsToReopen(reRegions);

View File

@ -25,12 +25,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Handles adding a new family to an existing table.
@ -41,11 +39,16 @@ public class TableModifyFamilyHandler extends TableEventHandler {
public TableModifyFamilyHandler(byte[] tableName,
HColumnDescriptor familyDesc, Server server,
final MasterServices masterServices) throws IOException {
final MasterServices masterServices) {
super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices);
this.familyDesc = familyDesc;
}
@Override
protected void prepareWithTableLock() throws IOException {
super.prepareWithTableLock();
HTableDescriptor htd = getTableDescriptor();
hasColumnFamily(htd, familyDesc.getName());
this.familyDesc = familyDesc;
}
@Override

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterServices;
@ -42,7 +41,6 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Preconditions;
@ -68,8 +66,7 @@ public class CloneSnapshotHandler extends CreateTableHandler implements Snapshot
final SnapshotDescription snapshot, final HTableDescriptor hTableDescriptor)
throws NotAllMetaRegionsOnlineException, TableExistsException, IOException {
super(masterServices, masterServices.getMasterFileSystem(), hTableDescriptor,
masterServices.getConfiguration(), null, masterServices.getCatalogTracker(),
masterServices.getAssignmentManager());
masterServices.getConfiguration(), null, masterServices);
// Snapshot information
this.snapshot = snapshot;

View File

@ -0,0 +1,101 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* A ZooKeeper watcher meant to detect deletions of ZNodes.
*/
@InterfaceAudience.Private
public class DeletionListener extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(DeletionListener.class);
private final String pathToWatch;
private final CountDownLatch deletedLatch;
private volatile Throwable exception;
/**
* Create a new instance of the deletion watcher.
* @param zkWatcher ZookeeperWatcher instance
* @param pathToWatch (Fully qualified) ZNode path that we are waiting to
* be deleted.
* @param deletedLatch Count down on this latch when deletion has occured.
*/
public DeletionListener(ZooKeeperWatcher zkWatcher, String pathToWatch,
CountDownLatch deletedLatch) {
super(zkWatcher);
this.pathToWatch = pathToWatch;
this.deletedLatch = deletedLatch;
exception = null;
}
/**
* Check if an exception has occurred when re-setting the watch.
* @return True if we were unable to re-set a watch on a ZNode due to
* an exception.
*/
public boolean hasException() {
return exception != null;
}
/**
* Get the last exception which has occurred when re-setting the watch.
* Use hasException() to check whether or not an exception has occurred.
* @return The last exception observed when re-setting the watch.
*/
public Throwable getException() {
return exception;
}
@Override
public void nodeDataChanged(String path) {
if (!path.equals(pathToWatch)) {
return;
}
try {
if (!(ZKUtil.setWatchIfNodeExists(watcher, pathToWatch))) {
deletedLatch.countDown();
}
} catch (KeeperException ex) {
exception = ex;
deletedLatch.countDown();
LOG.error("Error when re-setting the watch on " + pathToWatch, ex);
}
}
@Override
public void nodeDeleted(String path) {
if (!path.equals(pathToWatch)) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Processing delete on " + pathToWatch);
}
deletedLatch.countDown();
}
}

View File

@ -49,24 +49,24 @@ import org.apache.zookeeper.proto.SetDataRequest;
/**
* A zookeeper that can handle 'recoverable' errors.
* To handle recoverable errors, developers need to realize that there are two
* classes of requests: idempotent and non-idempotent requests. Read requests
* and unconditional sets and deletes are examples of idempotent requests, they
* can be reissued with the same results.
* (Although, the delete may throw a NoNodeException on reissue its effect on
* the ZooKeeper state is the same.) Non-idempotent requests need special
* handling, application and library writers need to keep in mind that they may
* need to encode information in the data or name of znodes to detect
* retries. A simple example is a create that uses a sequence flag.
* If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
* loss exception, that process will reissue another
* create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
* getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
* that x-109 was the result of the previous create, so the process actually
* owns both x-109 and x-111. An easy way around this is to use "x-process id-"
* To handle recoverable errors, developers need to realize that there are two
* classes of requests: idempotent and non-idempotent requests. Read requests
* and unconditional sets and deletes are examples of idempotent requests, they
* can be reissued with the same results.
* (Although, the delete may throw a NoNodeException on reissue its effect on
* the ZooKeeper state is the same.) Non-idempotent requests need special
* handling, application and library writers need to keep in mind that they may
* need to encode information in the data or name of znodes to detect
* retries. A simple example is a create that uses a sequence flag.
* If a process issues a create("/x-", ..., SEQUENCE) and gets a connection
* loss exception, that process will reissue another
* create("/x-", ..., SEQUENCE) and get back x-111. When the process does a
* getChildren("/"), it sees x-1,x-30,x-109,x-110,x-111, now it could be
* that x-109 was the result of the previous create, so the process actually
* owns both x-109 and x-111. An easy way around this is to use "x-process id-"
* when doing the create. If the process is using an id of 352, before reissuing
* the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
* "x-352-109", x-333-110". The process will know that the original create
* the create it will do a getChildren("/") and see "x-222-1", "x-542-30",
* "x-352-109", x-333-110". The process will know that the original create
* succeeded an the znode it created is "x-352-109".
* @see "http://wiki.apache.org/hadoop/ZooKeeper/ErrorHandling"
*/
@ -99,15 +99,25 @@ public class RecoverableZooKeeper {
private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
Watcher watcher, int maxRetries, int retryIntervalMillis)
Watcher watcher, int maxRetries, int retryIntervalMillis)
throws IOException {
this(quorumServers, sessionTimeout, watcher, maxRetries, retryIntervalMillis,
null);
}
public RecoverableZooKeeper(String quorumServers, int sessionTimeout,
Watcher watcher, int maxRetries, int retryIntervalMillis, String identifier)
throws IOException {
this.zk = new ZooKeeper(quorumServers, sessionTimeout, watcher);
this.retryCounterFactory =
new RetryCounterFactory(maxRetries, retryIntervalMillis);
// the identifier = processID@hostName
this.identifier = ManagementFactory.getRuntimeMXBean().getName();
if (identifier == null || identifier.length() == 0) {
// the identifier = processID@hostName
identifier = ManagementFactory.getRuntimeMXBean().getName();
}
LOG.info("The identifier of this process is " + identifier);
this.identifier = identifier;
this.id = Bytes.toBytes(identifier);
this.watcher = watcher;
@ -343,7 +353,7 @@ public class RecoverableZooKeeper {
/**
* setData is NOT an idempotent operation. Retry may cause BadVersion Exception
* Adding an identifier field into the data to check whether
* Adding an identifier field into the data to check whether
* badversion is caused by the result of previous correctly setData
* @return Stat instance
*/
@ -390,17 +400,17 @@ public class RecoverableZooKeeper {
/**
* <p>
* NONSEQUENTIAL create is idempotent operation.
* NONSEQUENTIAL create is idempotent operation.
* Retry before throwing exceptions.
* But this function will not throw the NodeExist exception back to the
* application.
* </p>
* <p>
* But SEQUENTIAL is NOT idempotent operation. It is necessary to add
* identifier to the path to verify, whether the previous one is successful
* But SEQUENTIAL is NOT idempotent operation. It is necessary to add
* identifier to the path to verify, whether the previous one is successful
* or not.
* </p>
*
*
* @return Path
*/
public String create(String path, byte[] data, List<ACL> acl,
@ -417,12 +427,12 @@ public class RecoverableZooKeeper {
return createSequential(path, newData, acl, createMode);
default:
throw new IllegalArgumentException("Unrecognized CreateMode: " +
throw new IllegalArgumentException("Unrecognized CreateMode: " +
createMode);
}
}
private String createNonSequential(String path, byte[] data, List<ACL> acl,
private String createNonSequential(String path, byte[] data, List<ACL> acl,
CreateMode createMode) throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
boolean isRetry = false; // False for first attempt, true for all retries.
@ -435,14 +445,14 @@ public class RecoverableZooKeeper {
if (isRetry) {
// If the connection was lost, there is still a possibility that
// we have successfully created the node at our previous attempt,
// so we read the node and compare.
// so we read the node and compare.
byte[] currentData = zk.getData(path, false, null);
if (currentData != null &&
Bytes.compareTo(currentData, data) == 0) {
Bytes.compareTo(currentData, data) == 0) {
// We successfully created a non-sequential node
return path;
}
LOG.error("Node " + path + " already exists with " +
LOG.error("Node " + path + " already exists with " +
Bytes.toStringBinary(currentData) + ", could not write " +
Bytes.toStringBinary(data));
throw e;
@ -466,8 +476,8 @@ public class RecoverableZooKeeper {
isRetry = true;
}
}
private String createSequential(String path, byte[] data,
private String createSequential(String path, byte[] data,
List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
@ -573,7 +583,7 @@ public class RecoverableZooKeeper {
}
return null;
}
public byte[] removeMetaData(byte[] data) {
if(data == null || data.length == 0) {
return data;
@ -642,7 +652,7 @@ public class RecoverableZooKeeper {
* @param prefixes the prefixes to include in the result
* @return list of every element that starts with one of the prefixes
*/
private static List<String> filterByPrefix(List<String> nodes,
private static List<String> filterByPrefix(List<String> nodes,
String... prefixes) {
List<String> lockChildren = new ArrayList<String>();
for (String child : nodes){
@ -655,4 +665,8 @@ public class RecoverableZooKeeper {
}
return lockChildren;
}
public String getIdentifier() {
return identifier;
}
}

View File

@ -82,7 +82,7 @@ public class ZKUtil {
private static final Log LOG = LogFactory.getLog(ZKUtil.class);
// TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
private static final char ZNODE_PATH_SEPARATOR = '/';
public static final char ZNODE_PATH_SEPARATOR = '/';
private static int zkDumpConnectionTimeOut;
/**
@ -106,18 +106,18 @@ public class ZKUtil {
public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
Watcher watcher)
throws IOException {
return connect(conf, ensemble, watcher, "");
return connect(conf, ensemble, watcher, null);
}
public static RecoverableZooKeeper connect(Configuration conf, String ensemble,
Watcher watcher, final String descriptor)
Watcher watcher, final String identifier)
throws IOException {
if(ensemble == null) {
throw new IOException("Unable to determine ZooKeeper ensemble");
}
int timeout = conf.getInt(HConstants.ZK_SESSION_TIMEOUT,
HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
LOG.debug(descriptor + " opening connection to ZooKeeper with ensemble (" +
LOG.debug(identifier + " opening connection to ZooKeeper with ensemble (" +
ensemble + ")");
int retry = conf.getInt("zookeeper.recovery.retry", 3);
int retryIntervalMillis =
@ -125,7 +125,7 @@ public class ZKUtil {
zkDumpConnectionTimeOut = conf.getInt("zookeeper.dump.connection.timeout",
1000);
return new RecoverableZooKeeper(ensemble, timeout, watcher,
retry, retryIntervalMillis);
retry, retryIntervalMillis, identifier);
}
/**
@ -436,6 +436,30 @@ public class ZKUtil {
}
}
/**
* Watch the specified znode, but only if exists. Useful when watching
* for deletions. Uses .getData() (and handles NoNodeException) instead
* of .exists() to accomplish this, as .getData() will only set a watch if
* the znode exists.
* @param zkw zk reference
* @param znode path of node to watch
* @return true if the watch is set, false if node does not exists
* @throws KeeperException if unexpected zookeeper exception
*/
public static boolean setWatchIfNodeExists(ZooKeeperWatcher zkw, String znode)
throws KeeperException {
try {
zkw.getRecoverableZooKeeper().getData(znode, true, null);
return true;
} catch (NoNodeException e) {
return false;
} catch (InterruptedException e) {
LOG.warn(zkw.prefix("Unable to set watcher on znode " + znode), e);
zkw.interruptedException(e);
return false;
}
}
/**
* Check if the specified node exists. Sets no watches.
*
@ -525,15 +549,13 @@ public class ZKUtil {
/**
* Lists the children of the specified znode without setting any watches.
*
* Used to list the currently online regionservers and their addresses.
*
* Sets no watches at all, this method is best effort.
*
* Returns an empty list if the node has no children. Returns null if the
* parent node itself does not exist.
*
* @param zkw zookeeper reference
* @param znode node to get children of as addresses
* @param znode node to get children
* @return list of data of children of specified znode, empty if no children,
* null if parent does not exist
* @throws KeeperException if unexpected zookeeper exception
@ -1025,6 +1047,36 @@ public class ZKUtil {
return true;
}
/**
* Creates the specified znode with the specified data but does not watch it.
*
* Returns the znode of the newly created node
*
* If there is another problem, a KeeperException will be thrown.
*
* @param zkw zk reference
* @param znode path of node
* @param data data of node
* @param createMode specifying whether the node to be created is ephemeral and/or sequential
* @return true name of the newly created znode or null
* @throws KeeperException if unexpected zookeeper exception
*/
public static String createNodeIfNotExistsNoWatch(ZooKeeperWatcher zkw, String znode,
byte[] data, CreateMode createMode) throws KeeperException {
String createdZNode = null;
try {
createdZNode = zkw.getRecoverableZooKeeper().create(znode, data,
createACL(zkw, znode), createMode);
} catch (KeeperException.NodeExistsException nee) {
return znode;
} catch (InterruptedException e) {
zkw.interruptedException(e);
return null;
}
return createdZNode;
}
/**
* Creates the specified node with the specified data and watches it.
*
@ -1346,7 +1398,7 @@ public class ZKUtil {
CreateAndFailSilent op = (CreateAndFailSilent) o;
return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
}
@Override
public int hashCode() {
int ret = 17 + getPath().hashCode() * 31;
@ -1370,7 +1422,7 @@ public class ZKUtil {
return super.equals(o);
}
@Override
public int hashCode() {
return getPath().hashCode();
@ -1400,7 +1452,7 @@ public class ZKUtil {
SetData op = (SetData) o;
return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
}
@Override
public int hashCode() {
int ret = getPath().hashCode();

View File

@ -103,6 +103,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public String splitLogZNode;
// znode containing the state of the load balancer
public String balancerZNode;
// znode containing the lock for the tables
public String tableLockZNode;
// Certain ZooKeeper nodes need to be world-readable
public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
@ -117,23 +119,23 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
/**
* Instantiate a ZooKeeper connection and watcher.
* @param descriptor Descriptive string that is added to zookeeper sessionid
* and used as identifier for this instance.
* @param identifier string that is passed to RecoverableZookeeper to be used as
* identifier for this instance. Use null for default.
* @throws IOException
* @throws ZooKeeperConnectionException
*/
public ZooKeeperWatcher(Configuration conf, String descriptor,
public ZooKeeperWatcher(Configuration conf, String identifier,
Abortable abortable) throws ZooKeeperConnectionException, IOException {
this(conf, descriptor, abortable, false);
this(conf, identifier, abortable, false);
}
/**
* Instantiate a ZooKeeper connection and watcher.
* @param descriptor Descriptive string that is added to zookeeper sessionid
* and used as identifier for this instance.
* @param identifier string that is passed to RecoverableZookeeper to be used as
* identifier for this instance. Use null for default.
* @throws IOException
* @throws ZooKeeperConnectionException
*/
public ZooKeeperWatcher(Configuration conf, String descriptor,
public ZooKeeperWatcher(Configuration conf, String identifier,
Abortable abortable, boolean canCreateBaseZNode)
throws IOException, ZooKeeperConnectionException {
this.conf = conf;
@ -147,10 +149,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
this.quorum = ZKConfig.getZKQuorumServersString(conf);
// Identifier will get the sessionid appended later below down when we
// handle the syncconnect event.
this.identifier = descriptor;
this.identifier = identifier;
this.abortable = abortable;
setNodeNames(conf);
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, descriptor);
this.recoverableZooKeeper = ZKUtil.connect(conf, quorum, this, identifier);
if (canCreateBaseZNode) {
createBaseZNodes();
}
@ -166,6 +168,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
ZKUtil.createAndFailSilent(this, tableZNode);
ZKUtil.createAndFailSilent(this, splitLogZNode);
ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
ZKUtil.createAndFailSilent(this, tableLockZNode);
} catch (KeeperException e) {
throw new ZooKeeperConnectionException(
prefix("Unexpected KeeperException creating base node"), e);
@ -215,6 +218,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
balancerZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.balancer", "balancer"));
tableLockZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.tableLock", "table-lock"));
}
/**
@ -234,6 +239,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
listeners.add(0, listener);
}
public void unregisterListener(ZooKeeperListener listener) {
listeners.remove(listener);
}
/**
* Get the connection to ZooKeeper.
* @return connection reference to zookeeper
@ -355,10 +364,10 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
if (this.abortable != null) this.abortable.abort(msg,
new KeeperException.SessionExpiredException());
break;
case ConnectedReadOnly:
break;
break;
default:
throw new IllegalStateException("Received event is not valid.");
}
@ -437,7 +446,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public void abort(String why, Throwable e) {
this.abortable.abort(why, e);
}
@Override
public boolean isAborted() {
return this.abortable.isAborted();
@ -449,4 +458,5 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public String getMasterAddressZNode() {
return this.masterAddressZNode;
}
}

View File

@ -0,0 +1,346 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper.lock;
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.InterProcessLock;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.DeletionListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.BadVersionException;
import org.apache.zookeeper.data.Stat;
import com.google.common.base.Preconditions;
/**
* ZooKeeper based HLock implementation. Based on the Shared Locks recipe.
* (see:
* <a href="http://zookeeper.apache.org/doc/trunk/recipes.html">
* ZooKeeper Recipes and Solutions
* </a>)
*/
@InterfaceAudience.Private
public abstract class ZKInterProcessLockBase implements InterProcessLock {
private static final Log LOG = LogFactory.getLog(ZKInterProcessLockBase.class);
/** ZNode prefix used by processes acquiring reader locks */
protected static final String READ_LOCK_CHILD_NODE_PREFIX = "read-";
/** ZNode prefix used by processes acquiring writer locks */
protected static final String WRITE_LOCK_CHILD_NODE_PREFIX = "write-";
protected final ZooKeeperWatcher zkWatcher;
protected final String parentLockNode;
protected final String fullyQualifiedZNode;
protected final byte[] metadata;
protected final MetadataHandler handler;
// If we acquire a lock, update this field
protected final AtomicReference<AcquiredLock> acquiredLock =
new AtomicReference<AcquiredLock>(null);
/**
* Represents information about a lock held by this thread.
*/
protected static class AcquiredLock {
private final String path;
private final int version;
/**
* Store information about a lock.
* @param path The path to a lock's ZNode
* @param version The current version of the lock's ZNode
*/
public AcquiredLock(String path, int version) {
this.path = path;
this.version = version;
}
public String getPath() {
return path;
}
public int getVersion() {
return version;
}
@Override
public String toString() {
return "AcquiredLockInfo{" +
"path='" + path + '\'' +
", version=" + version +
'}';
}
}
protected static class ZNodeComparator implements Comparator<String> {
public static final ZNodeComparator COMPARATOR = new ZNodeComparator();
private ZNodeComparator() {
}
/** Parses sequenceId from the znode name. Zookeeper documentation
* states: The sequence number is always fixed length of 10 digits, 0 padded
*/
public static int getChildSequenceId(String childZNode) {
Preconditions.checkNotNull(childZNode);
assert childZNode.length() >= 10;
String sequenceIdStr = childZNode.substring(childZNode.length() - 10);
return Integer.parseInt(sequenceIdStr);
}
@Override
public int compare(String zNode1, String zNode2) {
int seq1 = getChildSequenceId(zNode1);
int seq2 = getChildSequenceId(zNode2);
return seq1 - seq2;
}
}
/**
* Called by implementing classes.
* @param zkWatcher
* @param parentLockNode The lock ZNode path
* @param metadata
* @param handler
* @param childNode The prefix for child nodes created under the parent
*/
protected ZKInterProcessLockBase(ZooKeeperWatcher zkWatcher,
String parentLockNode, byte[] metadata, MetadataHandler handler, String childNode) {
this.zkWatcher = zkWatcher;
this.parentLockNode = parentLockNode;
this.fullyQualifiedZNode = ZKUtil.joinZNode(parentLockNode, childNode);
this.metadata = metadata;
this.handler = handler;
try {
ZKUtil.createWithParents(zkWatcher, parentLockNode);
} catch (KeeperException ex) {
LOG.warn("Failed to create znode:" + parentLockNode, ex);
}
}
/**
* {@inheritDoc}
*/
@Override
public void acquire() throws IOException, InterruptedException {
tryAcquire(-1);
}
/**
* {@inheritDoc}
*/
@Override
public boolean tryAcquire(long timeoutMs)
throws IOException, InterruptedException {
boolean hasTimeout = timeoutMs != -1;
long waitUntilMs =
hasTimeout ?EnvironmentEdgeManager.currentTimeMillis() + timeoutMs : -1;
String createdZNode = createLockZNode();
while (true) {
List<String> children;
try {
children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
} catch (KeeperException e) {
LOG.error("Unexpected ZooKeeper error when listing children", e);
throw new IOException("Unexpected ZooKeeper exception", e);
}
String pathToWatch;
if ((pathToWatch = getLockPath(createdZNode, children)) == null) {
break;
}
CountDownLatch deletedLatch = new CountDownLatch(1);
String zkPathToWatch =
ZKUtil.joinZNode(parentLockNode, pathToWatch);
DeletionListener deletionListener =
new DeletionListener(zkWatcher, zkPathToWatch, deletedLatch);
zkWatcher.registerListener(deletionListener);
try {
if (ZKUtil.setWatchIfNodeExists(zkWatcher, zkPathToWatch)) {
// Wait for the watcher to fire
if (hasTimeout) {
long remainingMs = waitUntilMs - EnvironmentEdgeManager.currentTimeMillis();
if (remainingMs < 0 ||
!deletedLatch.await(remainingMs, TimeUnit.MILLISECONDS)) {
LOG.warn("Unable to acquire the lock in " + timeoutMs +
" milliseconds.");
try {
ZKUtil.deleteNode(zkWatcher, createdZNode);
} catch (KeeperException e) {
LOG.warn("Unable to remove ZNode " + createdZNode);
}
return false;
}
} else {
deletedLatch.await();
}
if (deletionListener.hasException()) {
Throwable t = deletionListener.getException();
throw new IOException("Exception in the watcher", t);
}
}
} catch (KeeperException e) {
throw new IOException("Unexpected ZooKeeper exception", e);
} finally {
zkWatcher.unregisterListener(deletionListener);
}
}
updateAcquiredLock(createdZNode);
LOG.debug("Successfully acquired a lock for " + createdZNode);
return true;
}
private String createLockZNode() {
try {
return ZKUtil.createNodeIfNotExistsNoWatch(zkWatcher, fullyQualifiedZNode,
metadata, CreateMode.EPHEMERAL_SEQUENTIAL);
} catch (KeeperException ex) {
LOG.warn("Failed to create znode: " + fullyQualifiedZNode, ex);
return null;
}
}
/**
* Check if a child znode represents a write lock.
* @param child The child znode we want to check.
* @return whether the child znode represents a write lock
*/
protected static boolean isChildWriteLock(String child) {
int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
String suffix = child.substring(idx + 1);
return suffix.startsWith(WRITE_LOCK_CHILD_NODE_PREFIX);
}
/**
* Update state as to indicate that a lock is held
* @param createdZNode The lock znode
* @throws IOException If an unrecoverable ZooKeeper error occurs
*/
protected void updateAcquiredLock(String createdZNode) throws IOException {
Stat stat = new Stat();
byte[] data = null;
Exception ex = null;
try {
data = ZKUtil.getDataNoWatch(zkWatcher, createdZNode, stat);
} catch (KeeperException e) {
LOG.warn("Cannot getData for znode:" + createdZNode, e);
ex = e;
}
if (data == null) {
LOG.error("Can't acquire a lock on a non-existent node " + createdZNode);
throw new IllegalStateException("ZNode " + createdZNode +
"no longer exists!", ex);
}
AcquiredLock newLock = new AcquiredLock(createdZNode, stat.getVersion());
if (!acquiredLock.compareAndSet(null, newLock)) {
LOG.error("The lock " + fullyQualifiedZNode +
" has already been acquired by another process!");
throw new IllegalStateException(fullyQualifiedZNode +
" is held by another process");
}
}
/**
* {@inheritDoc}
*/
@Override
public void release() throws IOException, InterruptedException {
AcquiredLock lock = acquiredLock.get();
if (lock == null) {
LOG.error("Cannot release lock" +
", process does not have a lock for " + fullyQualifiedZNode);
throw new IllegalStateException("No lock held for " + fullyQualifiedZNode);
}
try {
if (ZKUtil.checkExists(zkWatcher, lock.getPath()) != -1) {
ZKUtil.deleteNode(zkWatcher, lock.getPath(), lock.getVersion());
if (!acquiredLock.compareAndSet(lock, null)) {
LOG.debug("Current process no longer holds " + lock + " for " +
fullyQualifiedZNode);
throw new IllegalStateException("Not holding a lock for " +
fullyQualifiedZNode +"!");
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully released " + lock.getPath());
}
} catch (BadVersionException e) {
throw new IllegalStateException(e);
} catch (KeeperException e) {
throw new IOException(e);
}
}
/**
* Process metadata stored in a ZNode using a callback object passed to
* this instance.
* <p>
* @param lockZNode The node holding the metadata
* @return True if metadata was ready and processed
* @throws IOException If an unexpected ZooKeeper error occurs
* @throws InterruptedException If interrupted when reading the metadata
*/
protected boolean handleLockMetadata(String lockZNode)
throws IOException, InterruptedException {
byte[] metadata = null;
try {
metadata = ZKUtil.getData(zkWatcher, lockZNode);
} catch (KeeperException ex) {
LOG.warn("Cannot getData for znode:" + lockZNode, ex);
}
if (metadata == null) {
return false;
}
if (handler != null) {
handler.handleMetadata(metadata);
}
return true;
}
/**
* Determine based on a list of children under a ZNode, whether or not a
* process which created a specified ZNode has obtained a lock. If a lock is
* not obtained, return the path that we should watch awaiting its deletion.
* Otherwise, return null.
* This method is abstract as the logic for determining whether or not a
* lock is obtained depends on the type of lock being implemented.
* @param myZNode The ZNode created by the process attempting to acquire
* a lock
* @param children List of all child ZNodes under the lock's parent ZNode
* @return The path to watch, or null if myZNode can represent a correctly
* acquired lock.
*/
protected abstract String getLockPath(String myZNode, List<String> children)
throws IOException, InterruptedException;
}

View File

@ -0,0 +1,83 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper.lock;
import java.io.IOException;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* ZooKeeper based read lock: does not exclude other read locks, but excludes
* and is excluded by write locks.
*/
@InterfaceAudience.Private
public class ZKInterProcessReadLock extends ZKInterProcessLockBase {
private static final Log LOG = LogFactory.getLog(ZKInterProcessReadLock.class);
public ZKInterProcessReadLock(ZooKeeperWatcher zooKeeperWatcher,
String znode, byte[] metadata, MetadataHandler handler) {
super(zooKeeperWatcher, znode, metadata, handler, READ_LOCK_CHILD_NODE_PREFIX);
}
/**
* {@inheritDoc}
*/
@Override
protected String getLockPath(String createdZNode, List<String> children)
throws IOException, InterruptedException {
TreeSet<String> writeChildren =
new TreeSet<String>(ZNodeComparator.COMPARATOR);
for (String child : children) {
if (isChildWriteLock(child)) {
writeChildren.add(child);
}
}
if (writeChildren.isEmpty()) {
return null;
}
SortedSet<String> lowerChildren = writeChildren.headSet(createdZNode);
if (lowerChildren.isEmpty()) {
return null;
}
String pathToWatch = lowerChildren.last();
String nodeHoldingLock = lowerChildren.first();
String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
try {
handleLockMetadata(znode);
} catch (IOException e) {
LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e);
}
return pathToWatch;
}
@Override
public void reapAllLocks() throws IOException {
throw new UnsupportedOperationException(
"Lock reaping is not supported for ZK based read locks");
}
}

View File

@ -0,0 +1,66 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper.lock;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
import org.apache.hadoop.hbase.InterProcessReadWriteLock;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* ZooKeeper based implementation of {@link InterProcessReadWriteLock}. This lock is fair,
* not reentrant, and not revocable.
*/
@InterfaceAudience.Private
public class ZKInterProcessReadWriteLock implements InterProcessReadWriteLock {
private final ZooKeeperWatcher zkWatcher;
private final String znode;
private final MetadataHandler handler;
/**
* Creates a DistributedReadWriteLock instance.
* @param zkWatcher
* @param znode ZNode path for the lock
* @param handler An object that will handle de-serializing and processing
* the metadata associated with reader or writer locks
* created by this object or null if none desired.
*/
public ZKInterProcessReadWriteLock(ZooKeeperWatcher zkWatcher, String znode,
MetadataHandler handler) {
this.zkWatcher = zkWatcher;
this.znode = znode;
this.handler = handler;
}
/**
* {@inheritDoc}
*/
public ZKInterProcessReadLock readLock(byte[] metadata) {
return new ZKInterProcessReadLock(zkWatcher, znode, metadata, handler);
}
/**
* {@inheritDoc}
*/
public ZKInterProcessWriteLock writeLock(byte[] metadata) {
return new ZKInterProcessWriteLock(zkWatcher, znode, metadata, handler);
}
}

View File

@ -0,0 +1,98 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper.lock;
import java.io.IOException;
import java.util.List;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
* ZooKeeper based write lock:
*/
@InterfaceAudience.Private
public class ZKInterProcessWriteLock extends ZKInterProcessLockBase {
private static final Log LOG = LogFactory.getLog(ZKInterProcessWriteLock.class);
public ZKInterProcessWriteLock(ZooKeeperWatcher zooKeeperWatcher,
String znode, byte[] metadata, MetadataHandler handler) {
super(zooKeeperWatcher, znode, metadata, handler, WRITE_LOCK_CHILD_NODE_PREFIX);
}
/**
* {@inheritDoc}
*/
@Override
protected String getLockPath(String createdZNode, List<String> children)
throws IOException, InterruptedException {
TreeSet<String> sortedChildren =
new TreeSet<String>(ZNodeComparator.COMPARATOR);
sortedChildren.addAll(children);
String pathToWatch = sortedChildren.lower(createdZNode);
if (pathToWatch != null) {
String nodeHoldingLock = sortedChildren.first();
String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
try {
handleLockMetadata(znode);
} catch (IOException e) {
LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e);
}
}
return pathToWatch;
}
/**
* Referred in zk recipe as "Revocable Shared Locks with Freaking Laser Beams"
* (http://zookeeper.apache.org/doc/trunk/recipes.html).
*/
public void reapAllLocks() throws IOException {
List<String> children;
try {
children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
} catch (KeeperException e) {
LOG.error("Unexpected ZooKeeper error when listing children", e);
throw new IOException("Unexpected ZooKeeper exception", e);
}
KeeperException deferred = null;
for (String child : children) {
if (isChildWriteLock(child)) {
String znode = ZKUtil.joinZNode(parentLockNode, child);
LOG.info("Reaping write lock for znode:" + znode);
try {
ZKUtil.deleteNodeFailSilent(zkWatcher, znode);
} catch (KeeperException ex) {
LOG.warn("Error reaping the znode for write lock :" + znode);
deferred = ex;
}
}
}
if (deferred != null) {
throw new IOException("ZK exception while reaping locks:", deferred);
}
}
}

View File

@ -836,6 +836,15 @@
be running with splits disabled.
</description>
</property>
<property>
<name>hbase.table.lock.enable</name>
<value>true</value>
<description>
Set to true to enable locking the table in zookeeper for schema change operations.
Table locking from master prevents concurrent schema modifications to corrupt table
state.
</description>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>

View File

@ -18,8 +18,11 @@
*/
package org.apache.hadoop.hbase;
import java.util.Set;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -91,7 +94,7 @@ public abstract class MultithreadedTestUtil {
stopped = s;
}
}
public void stop() throws Exception {
synchronized (this) {
stopped = true;
@ -130,7 +133,7 @@ public abstract class MultithreadedTestUtil {
this.stopped = true;
}
}
/**
* A test thread that performs a repeating operation.
*/
@ -138,13 +141,48 @@ public abstract class MultithreadedTestUtil {
public RepeatingTestThread(TestContext ctx) {
super(ctx);
}
public final void doWork() throws Exception {
while (ctx.shouldRun() && !stopped) {
doAnAction();
}
}
public abstract void doAnAction() throws Exception;
}
/**
* Verify that no assertions have failed inside a future.
* Used for unit tests that spawn threads. E.g.,
* <p>
* <code>
* List<Future<Void>> results = Lists.newArrayList();
* Future<Void> f = executor.submit(new Callable<Void> {
* public Void call() {
* assertTrue(someMethod());
* }
* });
* results.add(f);
* assertOnFutures(results);
* </code>
* @param threadResults A list of futures
* @param <T>
* @throws InterruptedException If interrupted when waiting for a result
* from one of the futures
* @throws ExecutionException If an exception other than AssertionError
* occurs inside any of the futures
*/
public static <T> void assertOnFutures(List<Future<T>> threadResults)
throws InterruptedException, ExecutionException {
for (Future<T> threadResult : threadResults) {
try {
threadResult.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof AssertionError) {
throw (AssertionError) e.getCause();
}
throw e;
}
}
}
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
import org.apache.hadoop.hbase.master.balancer.DefaultLoadBalancer;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
@ -182,7 +183,7 @@ public class TestAssignmentManager {
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
* @throws DeserializationException
* @throws DeserializationException
*/
@Test(timeout = 5000)
public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
@ -346,7 +347,7 @@ public class TestAssignmentManager {
* from one server to another mocking regionserver responding over zk.
* @throws IOException
* @throws KeeperException
* @throws DeserializationException
* @throws DeserializationException
*/
@Test
public void testBalance()
@ -361,7 +362,7 @@ public class TestAssignmentManager {
.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
this.serverManager, ct, balancer, executor, null, master.getTableLockManager());
am.failoverCleanupDone.set(true);
try {
// Make sure our new AM gets callbacks; once registered, can't unregister.
@ -442,7 +443,7 @@ public class TestAssignmentManager {
* To test closed region handler to remove rit and delete corresponding znode
* if region in pending close or closing while processing shutdown of a region
* server.(HBASE-5927).
*
*
* @throws KeeperException
* @throws IOException
* @throws ServiceException
@ -453,12 +454,12 @@ public class TestAssignmentManager {
testCaseWithPartiallyDisabledState(Table.State.DISABLING);
testCaseWithPartiallyDisabledState(Table.State.DISABLED);
}
/**
* To test if the split region is removed from RIT if the region was in SPLITTING state but the RS
* has actually completed the splitting in META but went down. See HBASE-6070 and also HBASE-5806
*
*
* @throws KeeperException
* @throws IOException
*/
@ -469,7 +470,7 @@ public class TestAssignmentManager {
// false indicate the region is not split
testCaseWithSplitRegionPartial(false);
}
private void testCaseWithSplitRegionPartial(boolean regionSplitDone) throws KeeperException,
IOException, NodeExistsException, InterruptedException, ServiceException {
// Create and startup an executor. This is used by AssignmentManager
@ -530,7 +531,7 @@ public class TestAssignmentManager {
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
this.serverManager, ct, balancer, executor, null, master.getTableLockManager());
// adding region to regions and servers maps.
am.regionOnline(REGIONINFO, SERVERNAME_A);
// adding region in pending close.
@ -651,7 +652,7 @@ public class TestAssignmentManager {
.getConfiguration());
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, null, null);
this.serverManager, ct, balancer, null, null, master.getTableLockManager());
try {
// First make sure my mock up basically works. Unassign a region.
unassign(am, SERVERNAME_A, hri);
@ -679,7 +680,7 @@ public class TestAssignmentManager {
* Tests the processDeadServersAndRegionsInTransition should not fail with NPE
* when it failed to get the children. Let's abort the system in this
* situation
* @throws ServiceException
* @throws ServiceException
*/
@Test(timeout = 5000)
public void testProcessDeadServersAndRegionsInTransitionShouldNotFailWithNPE()
@ -708,7 +709,7 @@ public class TestAssignmentManager {
}
}
/**
* TestCase verifies that the regionPlan is updated whenever a region fails to open
* TestCase verifies that the regionPlan is updated whenever a region fails to open
* and the master tries to process RS_ZK_FAILED_OPEN state.(HBASE-5546).
*/
@Test(timeout = 5000)
@ -795,7 +796,7 @@ public class TestAssignmentManager {
this.gate.set(true);
return randomServerName;
}
@Override
public Map<ServerName, List<HRegionInfo>> retainAssignment(
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) {
@ -836,7 +837,7 @@ public class TestAssignmentManager {
/**
* Test verifies whether assignment is skipped for regions of tables in DISABLING state during
* clean cluster startup. See HBASE-6281.
*
*
* @throws KeeperException
* @throws IOException
* @throws Exception
@ -882,7 +883,7 @@ public class TestAssignmentManager {
/**
* Test verifies whether all the enabling table regions assigned only once during master startup.
*
*
* @throws KeeperException
* @throws IOException
* @throws Exception
@ -902,7 +903,8 @@ public class TestAssignmentManager {
try {
// set table in enabling state.
am.getZKTable().setEnablingTable(REGIONINFO.getTableNameAsString());
new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(), am, true)
new EnableTableHandler(server, REGIONINFO.getTableName(), am.getCatalogTracker(),
am, new NullTableLockManager(), true).prepare()
.process();
assertEquals("Number of assignments should be 1.", 1, assignmentCount);
assertTrue("Table should be enabled.",
@ -1071,7 +1073,7 @@ public class TestAssignmentManager {
ExecutorService executor = startupMasterExecutor("mockedAMExecutor");
this.balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
AssignmentManagerWithExtrasForTesting am = new AssignmentManagerWithExtrasForTesting(
server, manager, ct, this.balancer, executor);
server, manager, ct, this.balancer, executor, new NullTableLockManager());
return am;
}
@ -1090,8 +1092,9 @@ public class TestAssignmentManager {
public AssignmentManagerWithExtrasForTesting(
final Server master, final ServerManager serverManager,
final CatalogTracker catalogTracker, final LoadBalancer balancer,
final ExecutorService service) throws KeeperException, IOException {
super(master, serverManager, catalogTracker, balancer, service, null);
final ExecutorService service, final TableLockManager tableLockManager)
throws KeeperException, IOException {
super(master, serverManager, catalogTracker, balancer, service, null, tableLockManager);
this.es = service;
this.ct = catalogTracker;
}

View File

@ -320,6 +320,11 @@ public class TestCatalogJanitor {
@Override
public void deleteColumn(byte[] tableName, byte[] columnName) throws IOException { }
@Override
public TableLockManager getTableLockManager() {
return null;
}
}
@Test

View File

@ -0,0 +1,294 @@
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.LockTimeoutException;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Tests the default table lock manager
*/
@Category(MediumTests.class)
public class TestTableLockManager {
private static final Log LOG =
LogFactory.getLog(TestTableLockManager.class);
private static final byte[] TABLE_NAME = Bytes.toBytes("TestTableLevelLocks");
private static final byte[] FAMILY = Bytes.toBytes("f1");
private static final byte[] NEW_FAMILY = Bytes.toBytes("f2");
private final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static final CountDownLatch deleteColumn = new CountDownLatch(1);
private static final CountDownLatch addColumn = new CountDownLatch(1);
public void prepareMiniCluster() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
TEST_UTIL.startMiniCluster(2);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
}
public void prepareMiniZkCluster() throws Exception {
TEST_UTIL.startMiniZKCluster(1);
}
@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test(timeout = 600000)
public void testLockTimeoutException() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 3000);
prepareMiniCluster();
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
master.getCoprocessorHost().load(TestLockTimeoutExceptionMasterObserver.class,
0, TEST_UTIL.getConfiguration());
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Object> shouldFinish = executor.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
admin.deleteColumn(TABLE_NAME, FAMILY);
return null;
}
});
deleteColumn.await();
try {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
fail("Was expecting TableLockTimeoutException");
} catch (LockTimeoutException ex) {
//expected
}
shouldFinish.get();
}
public static class TestLockTimeoutExceptionMasterObserver extends BaseMasterObserver {
@Override
public void preDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName, byte[] c) throws IOException {
deleteColumn.countDown();
}
@Override
public void postDeleteColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName, byte[] c) throws IOException {
Threads.sleep(10000);
}
@Override
public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName, HColumnDescriptor column) throws IOException {
fail("Add column should have timeouted out for acquiring the table lock");
}
}
@Test(timeout = 600000)
public void testAlterAndDisable() throws Exception {
prepareMiniCluster();
// Send a request to alter a table, then sleep during
// the alteration phase. In the mean time, from another
// thread, send a request to disable, and then delete a table.
HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
master.getCoprocessorHost().load(TestAlterAndDisableMasterObserver.class,
0, TEST_UTIL.getConfiguration());
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<Object> alterTableFuture = executor.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
admin.addColumn(TABLE_NAME, new HColumnDescriptor(NEW_FAMILY));
LOG.info("Added new column family");
HTableDescriptor tableDesc = admin.getTableDescriptor(TABLE_NAME);
assertTrue(tableDesc.getFamiliesKeys().contains(NEW_FAMILY));
return null;
}
});
Future<Object> disableTableFuture = executor.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
admin.disableTable(TABLE_NAME);
assertTrue(admin.isTableDisabled(TABLE_NAME));
admin.deleteTable(TABLE_NAME);
assertFalse(admin.tableExists(TABLE_NAME));
return null;
}
});
try {
disableTableFuture.get();
alterTableFuture.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof AssertionError) {
throw (AssertionError) e.getCause();
}
throw e;
}
}
public static class TestAlterAndDisableMasterObserver extends BaseMasterObserver {
@Override
public void preAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName, HColumnDescriptor column) throws IOException {
LOG.debug("addColumn called");
addColumn.countDown();
}
@Override
public void postAddColumnHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName, HColumnDescriptor column) throws IOException {
Threads.sleep(6000);
try {
ctx.getEnvironment().getMasterServices().checkTableModifiable(tableName);
} catch(TableNotDisabledException expected) {
//pass
return;
} catch(IOException ex) {
}
fail("was expecting the table to be enabled");
}
@Override
public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName) throws IOException {
try {
LOG.debug("Waiting for addColumn to be processed first");
//wait for addColumn to be processed first
addColumn.await();
LOG.debug("addColumn started, we can continue");
} catch (InterruptedException ex) {
LOG.warn("Sleep interrupted while waiting for addColumn countdown");
}
}
@Override
public void postDisableTableHandler(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName) throws IOException {
Threads.sleep(3000);
}
}
@Test(timeout = 600000)
public void testDelete() throws Exception {
prepareMiniCluster();
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
admin.disableTable(TABLE_NAME);
admin.deleteTable(TABLE_NAME);
//ensure that znode for the table node has been deleted
ZooKeeperWatcher zkWatcher = TEST_UTIL.getZooKeeperWatcher();
assertTrue(ZKUtil.checkExists(zkWatcher,
ZKUtil.joinZNode(zkWatcher.tableLockZNode, Bytes.toString(TABLE_NAME))) < 0);
}
@Test(timeout = 600000)
public void testReapAllTableLocks() throws Exception {
prepareMiniZkCluster();
ServerName serverName = new ServerName("localhost:10000", 0);
final TableLockManager lockManager = TableLockManager.createTableLockManager(
TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
String tables[] = {"table1", "table2", "table3", "table4"};
ExecutorService executor = Executors.newFixedThreadPool(6);
final CountDownLatch writeLocksObtained = new CountDownLatch(4);
final CountDownLatch writeLocksAttempted = new CountDownLatch(10);
//TODO: read lock tables
//6 threads will be stuck waiting for the table lock
for (int i = 0; i < tables.length; i++) {
final String table = tables[i];
for (int j = 0; j < i+1; j++) { //i+1 write locks attempted for table[i]
executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
writeLocksAttempted.countDown();
lockManager.writeLock(Bytes.toBytes(table), "testReapAllTableLocks").acquire();
writeLocksObtained.countDown();
return null;
}
});
}
}
writeLocksObtained.await();
writeLocksAttempted.await();
//now reap all table locks
lockManager.reapAllTableWriteLocks();
TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
TEST_UTIL.getConfiguration(), TEST_UTIL.getZooKeeperWatcher(), serverName);
//should not throw table lock timeout exception
zeroTimeoutLockManager.writeLock(Bytes.toBytes(tables[tables.length -1]), "zero timeout")
.acquire();
executor.shutdownNow();
}
}

View File

@ -0,0 +1,359 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper.lock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.InterProcessLock;
import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
@Category(MediumTests.class)
public class TestZKInterProcessReadWriteLock {
private static final Log LOG =
LogFactory.getLog(TestZKInterProcessReadWriteLock.class);
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private static final int NUM_THREADS = 10;
private static Configuration conf;
private final AtomicBoolean isLockHeld = new AtomicBoolean(false);
private final ExecutorService executor =
Executors.newFixedThreadPool(NUM_THREADS,
new DaemonThreadFactory("TestZKInterProcessReadWriteLock-"));
@BeforeClass
public static void beforeAllTests() throws Exception {
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniZKCluster();
conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 1000);
ZooKeeperWatcher zkw = getZooKeeperWatcher("setup");
ZKUtil.createWithParents(zkw, zkw.tableLockZNode);
}
@AfterClass
public static void afterAllTests() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
}
@After
public void tearDown() {
executor.shutdown();
}
private static ZooKeeperWatcher getZooKeeperWatcher(String desc)
throws IOException {
return TEST_UTIL.getZooKeeperWatcher();
}
@Test(timeout = 30000)
public void testWriteLockExcludesWriters() throws Exception {
final String testName = "testWriteLockExcludesWriters";
final ZKInterProcessReadWriteLock readWriteLock =
getReadWriteLock(testName);
List<Future<Void>> results = Lists.newArrayList();
for (int i = 0; i < NUM_THREADS; ++i) {
final String threadDesc = testName + i;
results.add(executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
ZKInterProcessWriteLock writeLock =
readWriteLock.writeLock(Bytes.toBytes(threadDesc));
try {
writeLock.acquire();
try {
// No one else should hold the lock
assertTrue(isLockHeld.compareAndSet(false, true));
Thread.sleep(1000);
// No one else should have released the lock
assertTrue(isLockHeld.compareAndSet(true, false));
} finally {
isLockHeld.set(false);
writeLock.release();
}
} catch (InterruptedException e) {
LOG.warn(threadDesc + " interrupted", e);
Thread.currentThread().interrupt();
throw new InterruptedIOException();
}
return null;
}
}));
}
MultithreadedTestUtil.assertOnFutures(results);
}
@Test(timeout = 30000)
public void testReadLockDoesNotExcludeReaders() throws Exception {
final String testName = "testReadLockDoesNotExcludeReaders";
final ZKInterProcessReadWriteLock readWriteLock =
getReadWriteLock(testName);
final CountDownLatch locksAcquiredLatch = new CountDownLatch(NUM_THREADS);
final AtomicInteger locksHeld = new AtomicInteger(0);
List<Future<Void>> results = Lists.newArrayList();
for (int i = 0; i < NUM_THREADS; ++i) {
final String threadDesc = testName + i;
results.add(executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
ZKInterProcessReadLock readLock =
readWriteLock.readLock(Bytes.toBytes(threadDesc));
readLock.acquire();
try {
locksHeld.incrementAndGet();
locksAcquiredLatch.countDown();
Thread.sleep(1000);
} finally {
readLock.release();
locksHeld.decrementAndGet();
}
return null;
}
}));
}
locksAcquiredLatch.await();
assertEquals(locksHeld.get(), NUM_THREADS);
MultithreadedTestUtil.assertOnFutures(results);
}
@Test(timeout = 3000)
public void testReadLockExcludesWriters() throws Exception {
// Submit a read lock request first
// Submit a write lock request second
final String testName = "testReadLockExcludesWriters";
List<Future<Void>> results = Lists.newArrayList();
final CountDownLatch readLockAcquiredLatch = new CountDownLatch(1);
Callable<Void> acquireReadLock = new Callable<Void>() {
@Override
public Void call() throws Exception {
final String threadDesc = testName + "-acquireReadLock";
ZKInterProcessReadLock readLock =
getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
readLock.acquire();
try {
assertTrue(isLockHeld.compareAndSet(false, true));
readLockAcquiredLatch.countDown();
Thread.sleep(1000);
} finally {
isLockHeld.set(false);
readLock.release();
}
return null;
}
};
Callable<Void> acquireWriteLock = new Callable<Void>() {
@Override
public Void call() throws Exception {
final String threadDesc = testName + "-acquireWriteLock";
ZKInterProcessWriteLock writeLock =
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
readLockAcquiredLatch.await();
assertTrue(isLockHeld.get());
writeLock.acquire();
try {
assertFalse(isLockHeld.get());
} finally {
writeLock.release();
}
return null;
}
};
results.add(executor.submit(acquireReadLock));
results.add(executor.submit(acquireWriteLock));
MultithreadedTestUtil.assertOnFutures(results);
}
private static ZKInterProcessReadWriteLock getReadWriteLock(String testName)
throws IOException {
MetadataHandler handler = new MetadataHandler() {
@Override
public void handleMetadata(byte[] ownerMetadata) {
LOG.info("Lock info: " + Bytes.toString(ownerMetadata));
}
};
ZooKeeperWatcher zkWatcher = getZooKeeperWatcher(testName);
String znode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, testName);
return new ZKInterProcessReadWriteLock(zkWatcher, znode, handler);
}
@Test(timeout = 30000)
public void testWriteLockExcludesReaders() throws Exception {
// Submit a read lock request first
// Submit a write lock request second
final String testName = "testReadLockExcludesWriters";
List<Future<Void>> results = Lists.newArrayList();
final CountDownLatch writeLockAcquiredLatch = new CountDownLatch(1);
Callable<Void> acquireWriteLock = new Callable<Void>() {
@Override
public Void call() throws Exception {
final String threadDesc = testName + "-acquireWriteLock";
ZKInterProcessWriteLock writeLock =
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
writeLock.acquire();
try {
writeLockAcquiredLatch.countDown();
assertTrue(isLockHeld.compareAndSet(false, true));
Thread.sleep(1000);
} finally {
isLockHeld.set(false);
writeLock.release();
}
return null;
}
};
Callable<Void> acquireReadLock = new Callable<Void>() {
@Override
public Void call() throws Exception {
final String threadDesc = testName + "-acquireReadLock";
ZKInterProcessReadLock readLock =
getReadWriteLock(testName).readLock(Bytes.toBytes(threadDesc));
writeLockAcquiredLatch.await();
readLock.acquire();
try {
assertFalse(isLockHeld.get());
} finally {
readLock.release();
}
return null;
}
};
results.add(executor.submit(acquireWriteLock));
results.add(executor.submit(acquireReadLock));
MultithreadedTestUtil.assertOnFutures(results);
}
@Test(timeout = 60000)
public void testTimeout() throws Exception {
final String testName = "testTimeout";
final CountDownLatch lockAcquiredLatch = new CountDownLatch(1);
Callable<Void> shouldHog = new Callable<Void>() {
@Override
public Void call() throws Exception {
final String threadDesc = testName + "-shouldHog";
ZKInterProcessWriteLock lock =
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
lock.acquire();
lockAcquiredLatch.countDown();
Thread.sleep(10000);
lock.release();
return null;
}
};
Callable<Void> shouldTimeout = new Callable<Void>() {
@Override
public Void call() throws Exception {
final String threadDesc = testName + "-shouldTimeout";
ZKInterProcessWriteLock lock =
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
lockAcquiredLatch.await();
assertFalse(lock.tryAcquire(5000));
return null;
}
};
Callable<Void> shouldAcquireLock = new Callable<Void>() {
@Override
public Void call() throws Exception {
final String threadDesc = testName + "-shouldAcquireLock";
ZKInterProcessWriteLock lock =
getReadWriteLock(testName).writeLock(Bytes.toBytes(threadDesc));
lockAcquiredLatch.await();
assertTrue(lock.tryAcquire(30000));
lock.release();
return null;
}
};
List<Future<Void>> results = Lists.newArrayList();
results.add(executor.submit(shouldHog));
results.add(executor.submit(shouldTimeout));
results.add(executor.submit(shouldAcquireLock));
MultithreadedTestUtil.assertOnFutures(results);
}
@Test(timeout = 60000)
public void testMultipleClients() throws Exception {
//tests lock usage from multiple zookeeper clients with different sessions.
//acquire one read lock, then one write lock
final String testName = "testMultipleClients";
//different zookeeper sessions with separate identifiers
ZooKeeperWatcher zkWatcher1 = new ZooKeeperWatcher(conf, "testMultipleClients-1", null);
ZooKeeperWatcher zkWatcher2 = new ZooKeeperWatcher(conf, "testMultipleClients-2", null);
String znode = ZKUtil.joinZNode(zkWatcher1.tableLockZNode, testName);
ZKInterProcessReadWriteLock clientLock1
= new ZKInterProcessReadWriteLock(zkWatcher1, znode, null);
ZKInterProcessReadWriteLock clientLock2
= new ZKInterProcessReadWriteLock(zkWatcher2, znode, null);
InterProcessLock lock1 = clientLock1.readLock(Bytes.toBytes("client1"));
lock1.acquire();
//try to acquire, but it will timeout. We are testing whether this will cause any problems
//due to the read lock being from another client
InterProcessLock lock2 = clientLock2.writeLock(Bytes.toBytes("client2"));
assertFalse(lock2.tryAcquire(1000));
lock1.release();
//this time it will acquire
assertTrue(lock2.tryAcquire(5000));
lock2.release();
zkWatcher1.close();
zkWatcher2.close();
}
}