HBASE-8205. HBCK support for table locks
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1467497 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1c81734163
commit
af834c404f
|
@ -4925,6 +4925,10 @@ public final class ZooKeeperProtos {
|
|||
// optional string purpose = 5;
|
||||
boolean hasPurpose();
|
||||
String getPurpose();
|
||||
|
||||
// optional int64 createTime = 6;
|
||||
boolean hasCreateTime();
|
||||
long getCreateTime();
|
||||
}
|
||||
public static final class TableLock extends
|
||||
com.google.protobuf.GeneratedMessage
|
||||
|
@ -5030,12 +5034,23 @@ public final class ZooKeeperProtos {
|
|||
}
|
||||
}
|
||||
|
||||
// optional int64 createTime = 6;
|
||||
public static final int CREATETIME_FIELD_NUMBER = 6;
|
||||
private long createTime_;
|
||||
public boolean hasCreateTime() {
|
||||
return ((bitField0_ & 0x00000020) == 0x00000020);
|
||||
}
|
||||
public long getCreateTime() {
|
||||
return createTime_;
|
||||
}
|
||||
|
||||
private void initFields() {
|
||||
tableName_ = com.google.protobuf.ByteString.EMPTY;
|
||||
lockOwner_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ServerName.getDefaultInstance();
|
||||
threadId_ = 0L;
|
||||
isShared_ = false;
|
||||
purpose_ = "";
|
||||
createTime_ = 0L;
|
||||
}
|
||||
private byte memoizedIsInitialized = -1;
|
||||
public final boolean isInitialized() {
|
||||
|
@ -5070,6 +5085,9 @@ public final class ZooKeeperProtos {
|
|||
if (((bitField0_ & 0x00000010) == 0x00000010)) {
|
||||
output.writeBytes(5, getPurposeBytes());
|
||||
}
|
||||
if (((bitField0_ & 0x00000020) == 0x00000020)) {
|
||||
output.writeInt64(6, createTime_);
|
||||
}
|
||||
getUnknownFields().writeTo(output);
|
||||
}
|
||||
|
||||
|
@ -5099,6 +5117,10 @@ public final class ZooKeeperProtos {
|
|||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeBytesSize(5, getPurposeBytes());
|
||||
}
|
||||
if (((bitField0_ & 0x00000020) == 0x00000020)) {
|
||||
size += com.google.protobuf.CodedOutputStream
|
||||
.computeInt64Size(6, createTime_);
|
||||
}
|
||||
size += getUnknownFields().getSerializedSize();
|
||||
memoizedSerializedSize = size;
|
||||
return size;
|
||||
|
@ -5147,6 +5169,11 @@ public final class ZooKeeperProtos {
|
|||
result = result && getPurpose()
|
||||
.equals(other.getPurpose());
|
||||
}
|
||||
result = result && (hasCreateTime() == other.hasCreateTime());
|
||||
if (hasCreateTime()) {
|
||||
result = result && (getCreateTime()
|
||||
== other.getCreateTime());
|
||||
}
|
||||
result = result &&
|
||||
getUnknownFields().equals(other.getUnknownFields());
|
||||
return result;
|
||||
|
@ -5176,6 +5203,10 @@ public final class ZooKeeperProtos {
|
|||
hash = (37 * hash) + PURPOSE_FIELD_NUMBER;
|
||||
hash = (53 * hash) + getPurpose().hashCode();
|
||||
}
|
||||
if (hasCreateTime()) {
|
||||
hash = (37 * hash) + CREATETIME_FIELD_NUMBER;
|
||||
hash = (53 * hash) + hashLong(getCreateTime());
|
||||
}
|
||||
hash = (29 * hash) + getUnknownFields().hashCode();
|
||||
return hash;
|
||||
}
|
||||
|
@ -5307,6 +5338,8 @@ public final class ZooKeeperProtos {
|
|||
bitField0_ = (bitField0_ & ~0x00000008);
|
||||
purpose_ = "";
|
||||
bitField0_ = (bitField0_ & ~0x00000010);
|
||||
createTime_ = 0L;
|
||||
bitField0_ = (bitField0_ & ~0x00000020);
|
||||
return this;
|
||||
}
|
||||
|
||||
|
@ -5369,6 +5402,10 @@ public final class ZooKeeperProtos {
|
|||
to_bitField0_ |= 0x00000010;
|
||||
}
|
||||
result.purpose_ = purpose_;
|
||||
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
|
||||
to_bitField0_ |= 0x00000020;
|
||||
}
|
||||
result.createTime_ = createTime_;
|
||||
result.bitField0_ = to_bitField0_;
|
||||
onBuilt();
|
||||
return result;
|
||||
|
@ -5400,6 +5437,9 @@ public final class ZooKeeperProtos {
|
|||
if (other.hasPurpose()) {
|
||||
setPurpose(other.getPurpose());
|
||||
}
|
||||
if (other.hasCreateTime()) {
|
||||
setCreateTime(other.getCreateTime());
|
||||
}
|
||||
this.mergeUnknownFields(other.getUnknownFields());
|
||||
return this;
|
||||
}
|
||||
|
@ -5466,6 +5506,11 @@ public final class ZooKeeperProtos {
|
|||
purpose_ = input.readBytes();
|
||||
break;
|
||||
}
|
||||
case 48: {
|
||||
bitField0_ |= 0x00000020;
|
||||
createTime_ = input.readInt64();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -5664,6 +5709,27 @@ public final class ZooKeeperProtos {
|
|||
onChanged();
|
||||
}
|
||||
|
||||
// optional int64 createTime = 6;
|
||||
private long createTime_ ;
|
||||
public boolean hasCreateTime() {
|
||||
return ((bitField0_ & 0x00000020) == 0x00000020);
|
||||
}
|
||||
public long getCreateTime() {
|
||||
return createTime_;
|
||||
}
|
||||
public Builder setCreateTime(long value) {
|
||||
bitField0_ |= 0x00000020;
|
||||
createTime_ = value;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
public Builder clearCreateTime() {
|
||||
bitField0_ = (bitField0_ & ~0x00000020);
|
||||
createTime_ = 0L;
|
||||
onChanged();
|
||||
return this;
|
||||
}
|
||||
|
||||
// @@protoc_insertion_point(builder_scope:TableLock)
|
||||
}
|
||||
|
||||
|
@ -5758,11 +5824,12 @@ public final class ZooKeeperProtos {
|
|||
"tate.State\"\"\n\005State\022\013\n\007ENABLED\020\000\022\014\n\010DISA" +
|
||||
"BLED\020\001\"+\n\027ReplicationHLogPosition\022\020\n\010pos" +
|
||||
"ition\030\001 \002(\003\"$\n\017ReplicationLock\022\021\n\tlockOw" +
|
||||
"ner\030\001 \002(\t\"s\n\tTableLock\022\021\n\ttableName\030\001 \001(",
|
||||
"\014\022\036\n\tlockOwner\030\002 \001(\0132\013.ServerName\022\020\n\010thr" +
|
||||
"eadId\030\003 \001(\003\022\020\n\010isShared\030\004 \001(\010\022\017\n\007purpose" +
|
||||
"\030\005 \001(\tBE\n*org.apache.hadoop.hbase.protob" +
|
||||
"uf.generatedB\017ZooKeeperProtosH\001\210\001\001\240\001\001"
|
||||
"ner\030\001 \002(\t\"\207\001\n\tTableLock\022\021\n\ttableName\030\001 \001",
|
||||
"(\014\022\036\n\tlockOwner\030\002 \001(\0132\013.ServerName\022\020\n\010th" +
|
||||
"readId\030\003 \001(\003\022\020\n\010isShared\030\004 \001(\010\022\017\n\007purpos" +
|
||||
"e\030\005 \001(\t\022\022\n\ncreateTime\030\006 \001(\003BE\n*org.apach" +
|
||||
"e.hadoop.hbase.protobuf.generatedB\017ZooKe" +
|
||||
"eperProtosH\001\210\001\001\240\001\001"
|
||||
};
|
||||
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
|
||||
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
|
||||
|
@ -5854,7 +5921,7 @@ public final class ZooKeeperProtos {
|
|||
internal_static_TableLock_fieldAccessorTable = new
|
||||
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
|
||||
internal_static_TableLock_descriptor,
|
||||
new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", },
|
||||
new java.lang.String[] { "TableName", "LockOwner", "ThreadId", "IsShared", "Purpose", "CreateTime", },
|
||||
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.class,
|
||||
org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.TableLock.Builder.class);
|
||||
return null;
|
||||
|
|
|
@ -143,4 +143,5 @@ message TableLock {
|
|||
optional int64 threadId = 3;
|
||||
optional bool isShared = 4;
|
||||
optional string purpose = 5;
|
||||
optional int64 createTime = 6;
|
||||
}
|
||||
|
|
|
@ -63,11 +63,23 @@ public interface InterProcessLock {
|
|||
|
||||
/**
|
||||
* If supported, attempts to reap all the locks of this type by forcefully
|
||||
* deleting the locks. Lock reaping is different than coordinated lock revocation
|
||||
* deleting the locks (both held and attempted) that have expired according
|
||||
* to the given timeout. Lock reaping is different than coordinated lock revocation
|
||||
* in that, there is no coordination, and the behavior is undefined if the
|
||||
* lock holder is still alive.
|
||||
* @throws IOException If there is an unrecoverable error reaping the locks
|
||||
*/
|
||||
public void reapExpiredLocks(long expireTimeoutMs) throws IOException;
|
||||
|
||||
/**
|
||||
* If supported, attempts to reap all the locks of this type by forcefully
|
||||
* deleting the locks (both held and attempted). Lock reaping is different
|
||||
* than coordinated lock revocation in that, there is no coordination, and
|
||||
* the behavior is undefined if the lock holder is still alive.
|
||||
* Calling this should have the same affect as calling {@link #reapExpiredLocks(long)}
|
||||
* with timeout=0.
|
||||
* @throws IOException If there is an unrecoverable error reaping the locks
|
||||
*/
|
||||
public void reapAllLocks() throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -83,4 +95,11 @@ public interface InterProcessLock {
|
|||
*/
|
||||
public void handleMetadata(byte[] metadata);
|
||||
}
|
||||
|
||||
/**
|
||||
* Visits the locks (both held and attempted) of this type with the given
|
||||
* {@link MetadataHandler}.
|
||||
* @throws InterruptedException If there is an unrecoverable error
|
||||
*/
|
||||
public void visitLocks(MetadataHandler handler) throws IOException;
|
||||
}
|
||||
|
|
|
@ -28,18 +28,20 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
public interface InterProcessReadWriteLock {
|
||||
|
||||
/**
|
||||
* Obtain a reader lock containing given metadata.
|
||||
* Obtain a read lock containing given metadata.
|
||||
* @param metadata Serialized lock metadata (this may contain information
|
||||
* such as the process owning the lock or the purpose for
|
||||
* which the lock was acquired). Must not be null.
|
||||
* @return An instantiated InterProcessReadWriteLock instance
|
||||
* which the lock was acquired).
|
||||
* @return An instantiated InterProcessLock instance
|
||||
*/
|
||||
public InterProcessLock readLock(byte[] metadata);
|
||||
|
||||
/**
|
||||
* Obtain a writer lock containing given metadata.
|
||||
* @param metadata See documentation of metadata parameter in readLock()
|
||||
* @return An instantiated InterProcessReadWriteLock instance
|
||||
* Obtain a write lock containing given metadata.
|
||||
* @param metadata Serialized lock metadata (this may contain information
|
||||
* such as the process owning the lock or the purpose for
|
||||
* which the lock was acquired).
|
||||
* @return An instantiated InterProcessLock instance
|
||||
*/
|
||||
public InterProcessLock writeLock(byte[] metadata);
|
||||
}
|
||||
|
|
|
@ -721,7 +721,7 @@ Server {
|
|||
//are invalidated
|
||||
this.tableLockManager = TableLockManager.createTableLockManager(conf, zooKeeper, serverName);
|
||||
if (!masterRecovery) {
|
||||
this.tableLockManager.reapAllTableWriteLocks();
|
||||
this.tableLockManager.reapWriteLocks();
|
||||
}
|
||||
|
||||
status.setStatus("Initializing ZK system trackers");
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
|
|||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
|
||||
|
@ -66,12 +67,17 @@ public abstract class TableLockManager {
|
|||
protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
|
||||
"hbase.table.read.lock.timeout.ms";
|
||||
|
||||
protected static final int DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
|
||||
protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
|
||||
600 * 1000; //10 min default
|
||||
|
||||
protected static final int DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
|
||||
protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
|
||||
600 * 1000; //10 min default
|
||||
|
||||
public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
|
||||
|
||||
public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
|
||||
600 * 1000; //10 min default
|
||||
|
||||
/**
|
||||
* A distributed lock for a table.
|
||||
*/
|
||||
|
@ -109,14 +115,32 @@ public abstract class TableLockManager {
|
|||
public abstract TableLock readLock(byte[] tableName, String purpose);
|
||||
|
||||
/**
|
||||
* Force releases all table write locks and lock attempts even if this thread does
|
||||
* Visits all table locks(read and write), and lock attempts with the given callback
|
||||
* MetadataHandler.
|
||||
* @param handler the metadata handler to call
|
||||
* @throws IOException If there is an unrecoverable error
|
||||
*/
|
||||
public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
|
||||
|
||||
/**
|
||||
* Force releases all table locks(read and write) that have been held longer than
|
||||
* "hbase.table.lock.expire.ms". Assumption is that the clock skew between zookeeper
|
||||
* and this servers is negligible.
|
||||
* The behavior of the lock holders still thinking that they have the lock is undefined.
|
||||
* @throws IOException If there is an unrecoverable error
|
||||
*/
|
||||
public abstract void reapAllExpiredLocks() throws IOException;
|
||||
|
||||
/**
|
||||
* Force releases table write locks and lock attempts even if this thread does
|
||||
* not own the lock. The behavior of the lock holders still thinking that they
|
||||
* have the lock is undefined. This should be used carefully and only when
|
||||
* we can ensure that all write-lock holders have died. For example if only
|
||||
* the master can hold write locks, then we can reap it's locks when the backup
|
||||
* master starts.
|
||||
* @throws IOException If there is an unrecoverable error
|
||||
*/
|
||||
public abstract void reapAllTableWriteLocks() throws IOException;
|
||||
public abstract void reapWriteLocks() throws IOException;
|
||||
|
||||
/**
|
||||
* Called after a table has been deleted, and after the table lock is released.
|
||||
|
@ -135,11 +159,14 @@ public abstract class TableLockManager {
|
|||
// Initialize table level lock manager for schema changes, if enabled.
|
||||
if (conf.getBoolean(TABLE_LOCK_ENABLE,
|
||||
DEFAULT_TABLE_LOCK_ENABLE)) {
|
||||
int writeLockTimeoutMs = conf.getInt(TABLE_WRITE_LOCK_TIMEOUT_MS,
|
||||
long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
|
||||
DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
|
||||
int readLockTimeoutMs = conf.getInt(TABLE_READ_LOCK_TIMEOUT_MS,
|
||||
long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
|
||||
DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
|
||||
return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs);
|
||||
long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
|
||||
DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
|
||||
|
||||
return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
|
||||
}
|
||||
|
||||
return new NullTableLockManager();
|
||||
|
@ -167,11 +194,33 @@ public abstract class TableLockManager {
|
|||
return new NullTableLock();
|
||||
}
|
||||
@Override
|
||||
public void reapAllTableWriteLocks() throws IOException {
|
||||
public void reapAllExpiredLocks() throws IOException {
|
||||
}
|
||||
@Override
|
||||
public void reapWriteLocks() throws IOException {
|
||||
}
|
||||
@Override
|
||||
public void tableDeleted(byte[] tableName) throws IOException {
|
||||
}
|
||||
@Override
|
||||
public void visitAllLocks(MetadataHandler handler) throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
/** Public for hbck */
|
||||
public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
|
||||
int pblen = ProtobufUtil.lengthOfPBMagic();
|
||||
if (bytes == null || bytes.length < pblen) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom(
|
||||
bytes, pblen, bytes.length - pblen).build();
|
||||
return data;
|
||||
} catch (InvalidProtocolBufferException ex) {
|
||||
LOG.warn("Exception in deserialization", ex);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -192,9 +241,9 @@ public abstract class TableLockManager {
|
|||
}
|
||||
LOG.debug("Table is locked by: " +
|
||||
String.format("[tableName=%s, lockOwner=%s, threadId=%s, " +
|
||||
"purpose=%s, isShared=%s]", Bytes.toString(data.getTableName().toByteArray()),
|
||||
"purpose=%s, isShared=%s, createTime=%s]", Bytes.toString(data.getTableName().toByteArray()),
|
||||
ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
|
||||
data.getPurpose(), data.getIsShared()));
|
||||
data.getPurpose(), data.getIsShared(), data.getCreateTime()));
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -278,7 +327,8 @@ public abstract class TableLockManager {
|
|||
.setLockOwner(ProtobufUtil.toServerName(serverName))
|
||||
.setThreadId(Thread.currentThread().getId())
|
||||
.setPurpose(purpose)
|
||||
.setIsShared(isShared).build();
|
||||
.setIsShared(isShared)
|
||||
.setCreateTime(EnvironmentEdgeManager.currentTimeMillis()).build();
|
||||
byte[] lockMetadata = toBytes(data);
|
||||
|
||||
InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
|
||||
|
@ -291,25 +341,11 @@ public abstract class TableLockManager {
|
|||
return ProtobufUtil.prependPBMagic(data.toByteArray());
|
||||
}
|
||||
|
||||
private static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
|
||||
int pblen = ProtobufUtil.lengthOfPBMagic();
|
||||
if (bytes == null || bytes.length < pblen) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder().mergeFrom(
|
||||
bytes, pblen, bytes.length - pblen).build();
|
||||
return data;
|
||||
} catch (InvalidProtocolBufferException ex) {
|
||||
LOG.warn("Exception in deserialization", ex);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private final ServerName serverName;
|
||||
private final ZooKeeperWatcher zkWatcher;
|
||||
private final long writeLockTimeoutMs;
|
||||
private final long readLockTimeoutMs;
|
||||
private final long lockExpireTimeoutMs;
|
||||
|
||||
/**
|
||||
* Initialize a new manager for table-level locks.
|
||||
|
@ -322,11 +358,12 @@ public abstract class TableLockManager {
|
|||
* given table, or -1 for no timeout
|
||||
*/
|
||||
public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
|
||||
ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs) {
|
||||
ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
|
||||
this.zkWatcher = zkWatcher;
|
||||
this.serverName = serverName;
|
||||
this.writeLockTimeoutMs = writeLockTimeoutMs;
|
||||
this.readLockTimeoutMs = readLockTimeoutMs;
|
||||
this.lockExpireTimeoutMs = lockExpireTimeoutMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -340,19 +377,33 @@ public abstract class TableLockManager {
|
|||
serverName, readLockTimeoutMs, true, purpose);
|
||||
}
|
||||
|
||||
public void visitAllLocks(MetadataHandler handler) throws IOException {
|
||||
for (String tableName : getTableNames()) {
|
||||
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
|
||||
ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
|
||||
zkWatcher, tableLockZNode, null);
|
||||
lock.readLock(null).visitLocks(handler);
|
||||
lock.writeLock(null).visitLocks(handler);
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getTableNames() throws IOException {
|
||||
|
||||
List<String> tableNames;
|
||||
try {
|
||||
tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Unexpected ZooKeeper error when listing children", e);
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
}
|
||||
return tableNames;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reapAllTableWriteLocks() throws IOException {
|
||||
public void reapWriteLocks() throws IOException {
|
||||
//get the table names
|
||||
try {
|
||||
List<String> tableNames;
|
||||
try {
|
||||
tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Unexpected ZooKeeper error when listing children", e);
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
}
|
||||
|
||||
for (String tableName : tableNames) {
|
||||
for (String tableName : getTableNames()) {
|
||||
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
|
||||
ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
|
||||
zkWatcher, tableLockZNode, null);
|
||||
|
@ -365,6 +416,24 @@ public abstract class TableLockManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reapAllExpiredLocks() throws IOException {
|
||||
//get the table names
|
||||
try {
|
||||
for (String tableName : getTableNames()) {
|
||||
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
|
||||
ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
|
||||
zkWatcher, tableLockZNode, null);
|
||||
lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
|
||||
lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
throw ex;
|
||||
} catch (Exception ex) {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tableDeleted(byte[] tableName) throws IOException {
|
||||
//table write lock from DeleteHandler is already released, just delete the parent znode
|
||||
|
|
|
@ -64,9 +64,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||
import org.apache.hadoop.hbase.client.AdminProtocol;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
|
@ -82,6 +80,8 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.exceptions.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||
|
@ -95,6 +95,7 @@ import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
|
|||
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
|
||||
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
|
||||
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
|
||||
import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
|
||||
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -197,6 +198,7 @@ public class HBaseFsck extends Configured implements Tool {
|
|||
private boolean fixSplitParents = false; // fix lingering split parents
|
||||
private boolean fixReferenceFiles = false; // fix lingering reference store file
|
||||
private boolean fixEmptyMetaCells = false; // fix (remove) empty REGIONINFO_QUALIFIER rows
|
||||
private boolean fixTableLocks = false; // fix table locks which are expired
|
||||
|
||||
// limit checking/fixes to listed tables, if empty attempt to check/fix all
|
||||
// .META. are always checked
|
||||
|
@ -455,6 +457,8 @@ public class HBaseFsck extends Configured implements Tool {
|
|||
|
||||
offlineReferenceFileRepair();
|
||||
|
||||
checkAndFixTableLocks();
|
||||
|
||||
// Print table summary
|
||||
printTableSummary(tablesInfo);
|
||||
return errors.summarize();
|
||||
|
@ -2471,6 +2475,15 @@ public class HBaseFsck extends Configured implements Tool {
|
|||
return hbi;
|
||||
}
|
||||
|
||||
private void checkAndFixTableLocks() throws IOException {
|
||||
TableLockChecker checker = new TableLockChecker(createZooKeeperWatcher(), errors);
|
||||
checker.checkTableLocks();
|
||||
|
||||
if (this.fixTableLocks) {
|
||||
checker.fixExpiredTableLocks();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check values in regionInfo for .META.
|
||||
* Check if zero or more than one regions with META are found.
|
||||
|
@ -2897,7 +2910,7 @@ public class HBaseFsck extends Configured implements Tool {
|
|||
FIRST_REGION_STARTKEY_NOT_EMPTY, LAST_REGION_ENDKEY_NOT_EMPTY, DUPE_STARTKEYS,
|
||||
HOLE_IN_REGION_CHAIN, OVERLAP_IN_REGION_CHAIN, REGION_CYCLE, DEGENERATE_REGION,
|
||||
ORPHAN_HDFS_REGION, LINGERING_SPLIT_PARENT, NO_TABLEINFO_FILE, LINGERING_REFERENCE_HFILE,
|
||||
WRONG_USAGE, EMPTY_META_CELL
|
||||
WRONG_USAGE, EMPTY_META_CELL, EXPIRED_TABLE_LOCK
|
||||
}
|
||||
public void clear();
|
||||
public void report(String message);
|
||||
|
@ -3237,6 +3250,14 @@ public class HBaseFsck extends Configured implements Tool {
|
|||
checkMetaOnly = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set table locks fix mode.
|
||||
* Delete table locks held for a long time
|
||||
*/
|
||||
public void setFixTableLocks(boolean shouldFix) {
|
||||
fixTableLocks = shouldFix;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if we should rerun fsck again. This checks if we've tried to
|
||||
* fix something and we should rerun fsck tool again.
|
||||
|
@ -3476,9 +3497,13 @@ public class HBaseFsck extends Configured implements Tool {
|
|||
out.println("");
|
||||
out.println(" Metadata Repair shortcuts");
|
||||
out.println(" -repair Shortcut for -fixAssignments -fixMeta -fixHdfsHoles " +
|
||||
"-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles");
|
||||
"-fixHdfsOrphans -fixHdfsOverlaps -fixVersionFile -sidelineBigOverlaps -fixReferenceFiles -fixTableLocks");
|
||||
out.println(" -repairHoles Shortcut for -fixAssignments -fixMeta -fixHdfsHoles");
|
||||
|
||||
out.println("");
|
||||
out.println(" Table lock options");
|
||||
out.println(" -fixTableLocks Deletes table locks held for a long time (hbase.table.lock.expire.ms, 10min by default)");
|
||||
|
||||
out.flush();
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE, sw.toString());
|
||||
|
||||
|
@ -3603,6 +3628,7 @@ public class HBaseFsck extends Configured implements Tool {
|
|||
setFixSplitParents(false);
|
||||
setCheckHdfs(true);
|
||||
setFixReferenceFiles(true);
|
||||
setFixTableLocks(true);
|
||||
} else if (cmd.equals("-repairHoles")) {
|
||||
// this will make all missing hdfs regions available but may lose data
|
||||
setFixHdfsHoles(true);
|
||||
|
@ -3647,6 +3673,8 @@ public class HBaseFsck extends Configured implements Tool {
|
|||
setSummary();
|
||||
} else if (cmd.equals("-metaonly")) {
|
||||
setCheckMetaOnly();
|
||||
} else if (cmd.equals("-fixTableLocks")) {
|
||||
setFixTableLocks(true);
|
||||
} else if (cmd.startsWith("-")) {
|
||||
errors.reportError(ERROR_CODE.WRONG_USAGE, "Unrecognized option:" + cmd);
|
||||
return printUsageAndExit();
|
||||
|
|
|
@ -62,6 +62,7 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {
|
|||
protected final ZooKeeperWatcher zkWatcher;
|
||||
protected final String parentLockNode;
|
||||
protected final String fullyQualifiedZNode;
|
||||
protected final String childZNode;
|
||||
protected final byte[] metadata;
|
||||
protected final MetadataHandler handler;
|
||||
|
||||
|
@ -113,18 +114,22 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {
|
|||
/** Parses sequenceId from the znode name. Zookeeper documentation
|
||||
* states: The sequence number is always fixed length of 10 digits, 0 padded
|
||||
*/
|
||||
public static int getChildSequenceId(String childZNode) {
|
||||
public static long getChildSequenceId(String childZNode) {
|
||||
Preconditions.checkNotNull(childZNode);
|
||||
assert childZNode.length() >= 10;
|
||||
String sequenceIdStr = childZNode.substring(childZNode.length() - 10);
|
||||
return Integer.parseInt(sequenceIdStr);
|
||||
return Long.parseLong(sequenceIdStr);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compare(String zNode1, String zNode2) {
|
||||
int seq1 = getChildSequenceId(zNode1);
|
||||
int seq2 = getChildSequenceId(zNode2);
|
||||
return seq1 - seq2;
|
||||
long seq1 = getChildSequenceId(zNode1);
|
||||
long seq2 = getChildSequenceId(zNode2);
|
||||
if (seq1 == seq2) {
|
||||
return 0;
|
||||
} else {
|
||||
return seq1 < seq2 ? -1 : 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -143,6 +148,7 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {
|
|||
this.fullyQualifiedZNode = ZKUtil.joinZNode(parentLockNode, childNode);
|
||||
this.metadata = metadata;
|
||||
this.handler = handler;
|
||||
this.childZNode = childNode;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -232,6 +238,17 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a child znode represents a read lock.
|
||||
* @param child The child znode we want to check.
|
||||
* @return whether the child znode represents a read lock
|
||||
*/
|
||||
protected static boolean isChildReadLock(String child) {
|
||||
int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
|
||||
String suffix = child.substring(idx + 1);
|
||||
return suffix.startsWith(WRITE_LOCK_CHILD_NODE_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a child znode represents a write lock.
|
||||
* @param child The child znode we want to check.
|
||||
|
@ -243,6 +260,17 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {
|
|||
return suffix.startsWith(WRITE_LOCK_CHILD_NODE_PREFIX);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a child znode represents the same type(read or write) of lock
|
||||
* @param child The child znode we want to check.
|
||||
* @return whether the child znode represents the same type(read or write) of lock
|
||||
*/
|
||||
protected boolean isChildOfSameType(String child) {
|
||||
int idx = child.lastIndexOf(ZKUtil.ZNODE_PATH_SEPARATOR);
|
||||
String suffix = child.substring(idx + 1);
|
||||
return suffix.startsWith(this.childZNode);
|
||||
}
|
||||
|
||||
/**
|
||||
* Update state as to indicate that a lock is held
|
||||
* @param createdZNode The lock znode
|
||||
|
@ -303,32 +331,108 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process metadata stored in a ZNode using a callback
|
||||
* <p>
|
||||
* @param lockZNode The node holding the metadata
|
||||
* @return True if metadata was ready and processed, false otherwise.
|
||||
*/
|
||||
protected boolean handleLockMetadata(String lockZNode) {
|
||||
return handleLockMetadata(lockZNode, handler);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process metadata stored in a ZNode using a callback object passed to
|
||||
* this instance.
|
||||
* <p>
|
||||
* @param lockZNode The node holding the metadata
|
||||
* @return True if metadata was ready and processed
|
||||
* @throws IOException If an unexpected ZooKeeper error occurs
|
||||
* @throws InterruptedException If interrupted when reading the metadata
|
||||
* @param handler the metadata handler
|
||||
* @return True if metadata was ready and processed, false on exception.
|
||||
*/
|
||||
protected boolean handleLockMetadata(String lockZNode)
|
||||
throws IOException, InterruptedException {
|
||||
byte[] metadata = null;
|
||||
try {
|
||||
metadata = ZKUtil.getData(zkWatcher, lockZNode);
|
||||
} catch (KeeperException ex) {
|
||||
LOG.warn("Cannot getData for znode:" + lockZNode, ex);
|
||||
}
|
||||
if (metadata == null) {
|
||||
protected boolean handleLockMetadata(String lockZNode, MetadataHandler handler) {
|
||||
if (handler == null) {
|
||||
return false;
|
||||
}
|
||||
if (handler != null) {
|
||||
try {
|
||||
byte[] metadata = ZKUtil.getData(zkWatcher, lockZNode);
|
||||
handler.handleMetadata(metadata);
|
||||
} catch (KeeperException ex) {
|
||||
LOG.warn("Error processing lock metadata in " + lockZNode);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reapAllLocks() throws IOException {
|
||||
reapExpiredLocks(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Will delete all lock znodes of this type (either read or write) which are "expired"
|
||||
* according to timeout. Assumption is that the clock skew between zookeeper and this servers
|
||||
* is negligible.
|
||||
* Referred in zk recipe as "Revocable Shared Locks with Freaking Laser Beams".
|
||||
* (http://zookeeper.apache.org/doc/trunk/recipes.html).
|
||||
*/
|
||||
public void reapExpiredLocks(long timeout) throws IOException {
|
||||
List<String> children;
|
||||
try {
|
||||
children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Unexpected ZooKeeper error when listing children", e);
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
}
|
||||
|
||||
KeeperException deferred = null;
|
||||
Stat stat = new Stat();
|
||||
long expireDate = System.currentTimeMillis() - timeout; //we are using cTime in zookeeper
|
||||
for (String child : children) {
|
||||
if (isChildOfSameType(child)) {
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, child);
|
||||
try {
|
||||
ZKUtil.getDataNoWatch(zkWatcher, znode, stat);
|
||||
if (stat.getCtime() < expireDate) {
|
||||
LOG.info("Reaping lock for znode:" + znode);
|
||||
ZKUtil.deleteNodeFailSilent(zkWatcher, znode);
|
||||
}
|
||||
} catch (KeeperException ex) {
|
||||
LOG.warn("Error reaping the znode for write lock :" + znode);
|
||||
deferred = ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (deferred != null) {
|
||||
throw new IOException("ZK exception while reaping locks:", deferred);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Visits the locks (both held and attempted) with the given MetadataHandler.
|
||||
* @throws InterruptedException If there is an unrecoverable error
|
||||
*/
|
||||
public void visitLocks(MetadataHandler handler) throws IOException {
|
||||
List<String> children;
|
||||
try {
|
||||
children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Unexpected ZooKeeper error when listing children", e);
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
}
|
||||
if (children.size() > 0) {
|
||||
for (String child : children) {
|
||||
if (isChildOfSameType(child)) {
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, child);
|
||||
String childWatchesZNode = getLockPath(child, children);
|
||||
if (childWatchesZNode == null) {
|
||||
LOG.info("Lock is held by: " + child);
|
||||
}
|
||||
handleLockMetadata(znode, handler);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine based on a list of children under a ZNode, whether or not a
|
||||
* process which created a specified ZNode has obtained a lock. If a lock is
|
||||
|
@ -343,5 +447,5 @@ public abstract class ZKInterProcessLockBase implements InterProcessLock {
|
|||
* acquired lock.
|
||||
*/
|
||||
protected abstract String getLockPath(String myZNode, List<String> children)
|
||||
throws IOException, InterruptedException;
|
||||
throws IOException;
|
||||
}
|
||||
|
|
|
@ -48,8 +48,7 @@ public class ZKInterProcessReadLock extends ZKInterProcessLockBase {
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected String getLockPath(String createdZNode, List<String> children)
|
||||
throws IOException, InterruptedException {
|
||||
protected String getLockPath(String createdZNode, List<String> children) throws IOException {
|
||||
TreeSet<String> writeChildren =
|
||||
new TreeSet<String>(ZNodeComparator.COMPARATOR);
|
||||
for (String child : children) {
|
||||
|
@ -67,17 +66,8 @@ public class ZKInterProcessReadLock extends ZKInterProcessLockBase {
|
|||
String pathToWatch = lowerChildren.last();
|
||||
String nodeHoldingLock = lowerChildren.first();
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
|
||||
try {
|
||||
handleLockMetadata(znode);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e);
|
||||
}
|
||||
handleLockMetadata(znode);
|
||||
|
||||
return pathToWatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reapAllLocks() throws IOException {
|
||||
throw new UnsupportedOperationException(
|
||||
"Lock reaping is not supported for ZK based read locks");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* ZooKeeper based write lock:
|
||||
|
@ -47,8 +46,7 @@ public class ZKInterProcessWriteLock extends ZKInterProcessLockBase {
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
protected String getLockPath(String createdZNode, List<String> children)
|
||||
throws IOException, InterruptedException {
|
||||
protected String getLockPath(String createdZNode, List<String> children) throws IOException {
|
||||
TreeSet<String> sortedChildren =
|
||||
new TreeSet<String>(ZNodeComparator.COMPARATOR);
|
||||
sortedChildren.addAll(children);
|
||||
|
@ -56,43 +54,8 @@ public class ZKInterProcessWriteLock extends ZKInterProcessLockBase {
|
|||
if (pathToWatch != null) {
|
||||
String nodeHoldingLock = sortedChildren.first();
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, nodeHoldingLock);
|
||||
try {
|
||||
handleLockMetadata(znode);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Error processing lock metadata in " + nodeHoldingLock, e);
|
||||
}
|
||||
handleLockMetadata(znode);
|
||||
}
|
||||
return pathToWatch;
|
||||
}
|
||||
|
||||
/**
|
||||
* Referred in zk recipe as "Revocable Shared Locks with Freaking Laser Beams"
|
||||
* (http://zookeeper.apache.org/doc/trunk/recipes.html).
|
||||
*/
|
||||
public void reapAllLocks() throws IOException {
|
||||
List<String> children;
|
||||
try {
|
||||
children = ZKUtil.listChildrenNoWatch(zkWatcher, parentLockNode);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Unexpected ZooKeeper error when listing children", e);
|
||||
throw new IOException("Unexpected ZooKeeper exception", e);
|
||||
}
|
||||
|
||||
KeeperException deferred = null;
|
||||
for (String child : children) {
|
||||
if (isChildWriteLock(child)) {
|
||||
String znode = ZKUtil.joinZNode(parentLockNode, child);
|
||||
LOG.info("Reaping write lock for znode:" + znode);
|
||||
try {
|
||||
ZKUtil.deleteNodeFailSilent(zkWatcher, znode);
|
||||
} catch (KeeperException ex) {
|
||||
LOG.warn("Error reaping the znode for write lock :" + znode);
|
||||
deferred = ex;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (deferred != null) {
|
||||
throw new IOException("ZK exception while reaping locks:", deferred);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -287,7 +287,7 @@ public class TestTableLockManager {
|
|||
writeLocksAttempted.await();
|
||||
|
||||
//now reap all table locks
|
||||
lockManager.reapAllTableWriteLocks();
|
||||
lockManager.reapWriteLocks();
|
||||
|
||||
TEST_UTIL.getConfiguration().setInt(TableLockManager.TABLE_WRITE_LOCK_TIMEOUT_MS, 0);
|
||||
TableLockManager zeroTimeoutLockManager = TableLockManager.createTableLockManager(
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
|
@ -73,6 +74,8 @@ import org.apache.hadoop.hbase.io.hfile.TestHFile;
|
|||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.RegionStates;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
|
||||
|
@ -1299,7 +1302,7 @@ public class TestHBaseFsck {
|
|||
// TODO: fixHdfsHoles does not work against splits, since the parent dir lingers on
|
||||
// for some time until children references are deleted. HBCK erroneously sees this as
|
||||
// overlapping regions
|
||||
HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false, null);
|
||||
HBaseFsck hbck = doFsck(conf, true, true, false, false, false, true, true, true, false, false, null);
|
||||
assertErrors(hbck, new ERROR_CODE[] {}); //no LINGERING_SPLIT_PARENT reported
|
||||
|
||||
// assert that the split META entry is still there.
|
||||
|
@ -1361,7 +1364,7 @@ public class TestHBaseFsck {
|
|||
ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.HOLE_IN_REGION_CHAIN}); //no LINGERING_SPLIT_PARENT
|
||||
|
||||
// now fix it. The fix should not revert the region split, but add daughters to META
|
||||
hbck = doFsck(conf, true, true, false, false, false, false, false, false, false, null);
|
||||
hbck = doFsck(conf, true, true, false, false, false, false, false, false, false, false, null);
|
||||
assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.NOT_IN_META_OR_DEPLOYED,
|
||||
ERROR_CODE.NOT_IN_META_OR_DEPLOYED, ERROR_CODE.HOLE_IN_REGION_CHAIN});
|
||||
|
||||
|
@ -1936,6 +1939,71 @@ public class TestHBaseFsck {
|
|||
}
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
public void testCheckTableLocks() throws Exception {
|
||||
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(0);
|
||||
EnvironmentEdgeManager.injectEdge(edge);
|
||||
// check no errors
|
||||
HBaseFsck hbck = doFsck(conf, false);
|
||||
assertNoErrors(hbck);
|
||||
|
||||
ServerName mockName = new ServerName("localhost", 60000, 1);
|
||||
|
||||
// obtain one lock
|
||||
final TableLockManager tableLockManager = TableLockManager.createTableLockManager(conf, TEST_UTIL.getZooKeeperWatcher(), mockName);
|
||||
TableLock writeLock = tableLockManager.writeLock(Bytes.toBytes("foo"), "testCheckTableLocks");
|
||||
writeLock.acquire();
|
||||
hbck = doFsck(conf, false);
|
||||
assertNoErrors(hbck); // should not have expired, no problems
|
||||
|
||||
edge.incrementTime(conf.getLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT,
|
||||
TableLockManager.DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS)); // let table lock expire
|
||||
|
||||
hbck = doFsck(conf, false);
|
||||
assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK});
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
new Thread() {
|
||||
public void run() {
|
||||
TableLock readLock = tableLockManager.writeLock(Bytes.toBytes("foo"), "testCheckTableLocks");
|
||||
try {
|
||||
latch.countDown();
|
||||
readLock.acquire();
|
||||
} catch (IOException ex) {
|
||||
fail();
|
||||
} catch (IllegalStateException ex) {
|
||||
return; // expected, since this will be reaped under us.
|
||||
}
|
||||
fail("should not have come here");
|
||||
};
|
||||
}.start();
|
||||
|
||||
latch.await(); // wait until thread starts
|
||||
Threads.sleep(300); // wait some more to ensure writeLock.acquire() is called
|
||||
|
||||
hbck = doFsck(conf, false);
|
||||
assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK}); // still one expired, one not-expired
|
||||
|
||||
edge.incrementTime(conf.getLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT,
|
||||
TableLockManager.DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS)); // let table lock expire
|
||||
|
||||
hbck = doFsck(conf, false);
|
||||
assertErrors(hbck, new ERROR_CODE[] {ERROR_CODE.EXPIRED_TABLE_LOCK, ERROR_CODE.EXPIRED_TABLE_LOCK}); // both are expired
|
||||
|
||||
conf.setLong(TableLockManager.TABLE_LOCK_EXPIRE_TIMEOUT, 1); // reaping from ZKInterProcessWriteLock uses znode cTime,
|
||||
// which is not injectable through EnvironmentEdge
|
||||
Threads.sleep(10);
|
||||
hbck = doFsck(conf, true); // now fix both cases
|
||||
|
||||
hbck = doFsck(conf, false);
|
||||
assertNoErrors(hbck);
|
||||
|
||||
// ensure that locks are deleted
|
||||
writeLock = tableLockManager.writeLock(Bytes.toBytes("foo"), "should acquire without blocking");
|
||||
writeLock.acquire(); // this should not block.
|
||||
writeLock.release(); // release for clean state
|
||||
}
|
||||
|
||||
@org.junit.Rule
|
||||
public TestName name = new TestName();
|
||||
}
|
||||
|
|
|
@ -38,13 +38,13 @@ public class HbckTestingUtil {
|
|||
|
||||
public static HBaseFsck doFsck(
|
||||
Configuration conf, boolean fix, String table) throws Exception {
|
||||
return doFsck(conf, fix, fix, fix, fix,fix, fix, fix, fix, fix, table);
|
||||
return doFsck(conf, fix, fix, fix, fix,fix, fix, fix, fix, fix, fix, table);
|
||||
}
|
||||
|
||||
public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments,
|
||||
boolean fixMeta, boolean fixHdfsHoles, boolean fixHdfsOverlaps,
|
||||
boolean fixHdfsOrphans, boolean fixTableOrphans, boolean fixVersionFile,
|
||||
boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, String table) throws Exception {
|
||||
boolean fixReferenceFiles, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks, String table) throws Exception {
|
||||
HBaseFsck fsck = new HBaseFsck(conf, exec);
|
||||
fsck.connect();
|
||||
fsck.setDisplayFullReport(); // i.e. -details
|
||||
|
@ -58,6 +58,7 @@ public class HbckTestingUtil {
|
|||
fsck.setFixVersionFile(fixVersionFile);
|
||||
fsck.setFixReferenceFiles(fixReferenceFiles);
|
||||
fsck.setFixEmptyMetaCells(fixEmptyMetaRegionInfo);
|
||||
fsck.setFixTableLocks(fixTableLocks);
|
||||
if (table != null) {
|
||||
fsck.includeTable(table);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue