HBASE-11108 Split ZKTable into interface and implementation (Mikhail Antononv)
This commit is contained in:
parent
33f842855a
commit
c5d5a5d1bc
|
@ -0,0 +1,46 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.HBaseException;
|
||||
|
||||
/**
|
||||
* Thrown by operations requiring coordination state access or manipulation
|
||||
* when internal error within coordination engine (or other internal implementation) occurs.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@SuppressWarnings("serial")
|
||||
public class CoordinatedStateException extends HBaseException {
|
||||
public CoordinatedStateException() {
|
||||
super();
|
||||
}
|
||||
|
||||
public CoordinatedStateException(final String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
public CoordinatedStateException(final String message, final Throwable t) {
|
||||
super(message, t);
|
||||
}
|
||||
|
||||
public CoordinatedStateException(final Throwable t) {
|
||||
super(t);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Implementations of this interface will keep and return to clients
|
||||
* implementations of classes providing API to execute
|
||||
* coordinated operations. This interface is client-side, so it does NOT
|
||||
* include methods to retrieve the particular interface implementations.
|
||||
*
|
||||
* For each coarse-grained area of operations there will be a separate
|
||||
* interface with implementation, providing API for relevant operations
|
||||
* requiring coordination.
|
||||
*
|
||||
* Property hbase.coordinated.state.manager.class in hbase-site.xml controls
|
||||
* which provider to use.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface CoordinatedStateManager {
|
||||
|
||||
/**
|
||||
* Initialize coordinated state management service.
|
||||
* @param server server instance to run within.
|
||||
*/
|
||||
void initialize(Server server);
|
||||
|
||||
/**
|
||||
* Starts service.
|
||||
*/
|
||||
void start();
|
||||
|
||||
/**
|
||||
* Stops service.
|
||||
*/
|
||||
void stop();
|
||||
|
||||
/**
|
||||
* @return instance of Server coordinated state manager runs within
|
||||
*/
|
||||
Server getServer();
|
||||
|
||||
/**
|
||||
* Returns implementation of TableStateManager.
|
||||
* @throws InterruptedException if operation is interrupted
|
||||
* @throws CoordinatedStateException if error happens in underlying coordination mechanism
|
||||
*/
|
||||
TableStateManager getTableStateManager() throws InterruptedException,
|
||||
CoordinatedStateException;
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Helper class for table state management for operations running inside
|
||||
* RegionServer or HMaster.
|
||||
* Depending on implementation, fetches information from HBase system table,
|
||||
* local data store, ZooKeeper ensemble or somewhere else.
|
||||
* Code running on client side (with no coordinated state context) shall instead use
|
||||
* {@link org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader}
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface TableStateManager {
|
||||
|
||||
/**
|
||||
* Sets the table into desired state. Fails silently if the table is already in this state.
|
||||
* @param tableName table to process
|
||||
* @param state new state of this table
|
||||
* @throws CoordinatedStateException if error happened when trying to set table state
|
||||
*/
|
||||
void setTableState(TableName tableName, ZooKeeperProtos.Table.State state)
|
||||
throws CoordinatedStateException;
|
||||
|
||||
/**
|
||||
* Sets the specified table into the newState, but only if the table is already in
|
||||
* one of the possibleCurrentStates (otherwise no operation is performed).
|
||||
* @param tableName table to process
|
||||
* @param newState new state for the table
|
||||
* @param states table should be in one of these states for the operation
|
||||
* to be performed
|
||||
* @throws CoordinatedStateException if error happened while performing operation
|
||||
* @return true if operation succeeded, false otherwise
|
||||
*/
|
||||
boolean setTableStateIfInStates(TableName tableName, ZooKeeperProtos.Table.State newState,
|
||||
ZooKeeperProtos.Table.State... states)
|
||||
throws CoordinatedStateException;
|
||||
|
||||
/**
|
||||
* Sets the specified table into the newState, but only if the table is NOT in
|
||||
* one of the possibleCurrentStates (otherwise no operation is performed).
|
||||
* @param tableName table to process
|
||||
* @param newState new state for the table
|
||||
* @param states table should NOT be in one of these states for the operation
|
||||
* to be performed
|
||||
* @throws CoordinatedStateException if error happened while performing operation
|
||||
* @return true if operation succeeded, false otherwise
|
||||
*/
|
||||
boolean setTableStateIfNotInStates(TableName tableName, ZooKeeperProtos.Table.State newState,
|
||||
ZooKeeperProtos.Table.State... states)
|
||||
throws CoordinatedStateException;
|
||||
|
||||
/**
|
||||
* @return true if the table is in any one of the listed states, false otherwise.
|
||||
*/
|
||||
boolean isTableState(TableName tableName, ZooKeeperProtos.Table.State... states);
|
||||
|
||||
/**
|
||||
* Mark table as deleted. Fails silently if the table is not currently marked as disabled.
|
||||
* @param tableName table to be deleted
|
||||
* @throws CoordinatedStateException if error happened while performing operation
|
||||
*/
|
||||
void setDeletedTable(TableName tableName) throws CoordinatedStateException;
|
||||
|
||||
/**
|
||||
* Checks if table is present.
|
||||
*
|
||||
* @param tableName table we're checking
|
||||
* @return true if the table is present, false otherwise
|
||||
*/
|
||||
boolean isTablePresent(TableName tableName);
|
||||
|
||||
/**
|
||||
* @return set of tables which are in any one of the listed states, empty Set if none
|
||||
*/
|
||||
Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... states)
|
||||
throws InterruptedIOException, CoordinatedStateException;
|
||||
|
||||
/**
|
||||
* If the table is found in the given state the in-memory state is removed. This
|
||||
* helps in cases where CreateTable is to be retried by the client in case of
|
||||
* failures. If deletePermanentState is true - the flag kept permanently is
|
||||
* also reset.
|
||||
*
|
||||
* @param tableName table we're working on
|
||||
* @param states if table isn't in any one of these states, operation aborts
|
||||
* @param deletePermanentState if true, reset the permanent flag
|
||||
* @throws CoordinatedStateException if error happened in underlying coordination engine
|
||||
*/
|
||||
void checkAndRemoveTableState(TableName tableName, ZooKeeperProtos.Table.State states,
|
||||
boolean deletePermanentState)
|
||||
throws CoordinatedStateException;
|
||||
}
|
|
@ -0,0 +1,168 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Non-instantiable class that provides helper functions to learn
|
||||
* about HBase table state for code running on client side (hence, not having
|
||||
* access to consensus context).
|
||||
*
|
||||
* Doesn't cache any table state, just goes directly to ZooKeeper.
|
||||
* TODO: decouple this class from ZooKeeper.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZKTableStateClientSideReader {
|
||||
|
||||
private ZKTableStateClientSideReader() {}
|
||||
|
||||
/**
|
||||
* Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#DISABLED}.
|
||||
* This method does not use cache.
|
||||
* This method is for clients other than AssignmentManager
|
||||
* @param zkw ZooKeeperWatcher instance to use
|
||||
* @param tableName table we're checking
|
||||
* @return True if table is enabled.
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public static boolean isDisabledTable(final ZooKeeperWatcher zkw,
|
||||
final TableName tableName)
|
||||
throws KeeperException, InterruptedException {
|
||||
ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
|
||||
return isTableState(ZooKeeperProtos.Table.State.DISABLED, state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#ENABLED}.
|
||||
* This method does not use cache.
|
||||
* This method is for clients other than AssignmentManager
|
||||
* @param zkw ZooKeeperWatcher instance to use
|
||||
* @param tableName table we're checking
|
||||
* @return True if table is enabled.
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public static boolean isEnabledTable(final ZooKeeperWatcher zkw,
|
||||
final TableName tableName)
|
||||
throws KeeperException, InterruptedException {
|
||||
return getTableState(zkw, tableName) == ZooKeeperProtos.Table.State.ENABLED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#DISABLING}
|
||||
* of {@code ZooKeeperProtos.Table.State#DISABLED}.
|
||||
* This method does not use cache.
|
||||
* This method is for clients other than AssignmentManager.
|
||||
* @param zkw ZooKeeperWatcher instance to use
|
||||
* @param tableName table we're checking
|
||||
* @return True if table is enabled.
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public static boolean isDisablingOrDisabledTable(final ZooKeeperWatcher zkw,
|
||||
final TableName tableName)
|
||||
throws KeeperException, InterruptedException {
|
||||
ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
|
||||
return isTableState(ZooKeeperProtos.Table.State.DISABLING, state) ||
|
||||
isTableState(ZooKeeperProtos.Table.State.DISABLED, state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a list of all the tables set as disabled in zookeeper.
|
||||
* @return Set of disabled tables, empty Set if none
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public static Set<TableName> getDisabledTables(ZooKeeperWatcher zkw)
|
||||
throws KeeperException, InterruptedException {
|
||||
Set<TableName> disabledTables = new HashSet<TableName>();
|
||||
List<String> children =
|
||||
ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
|
||||
for (String child: children) {
|
||||
TableName tableName =
|
||||
TableName.valueOf(child);
|
||||
ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
|
||||
if (state == ZooKeeperProtos.Table.State.DISABLED) disabledTables.add(tableName);
|
||||
}
|
||||
return disabledTables;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a list of all the tables set as disabled in zookeeper.
|
||||
* @return Set of disabled tables, empty Set if none
|
||||
* @throws KeeperException
|
||||
*/
|
||||
public static Set<TableName> getDisabledOrDisablingTables(ZooKeeperWatcher zkw)
|
||||
throws KeeperException, InterruptedException {
|
||||
Set<TableName> disabledTables = new HashSet<TableName>();
|
||||
List<String> children =
|
||||
ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
|
||||
for (String child: children) {
|
||||
TableName tableName =
|
||||
TableName.valueOf(child);
|
||||
ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
|
||||
if (state == ZooKeeperProtos.Table.State.DISABLED ||
|
||||
state == ZooKeeperProtos.Table.State.DISABLING)
|
||||
disabledTables.add(tableName);
|
||||
}
|
||||
return disabledTables;
|
||||
}
|
||||
|
||||
static boolean isTableState(final ZooKeeperProtos.Table.State expectedState,
|
||||
final ZooKeeperProtos.Table.State currentState) {
|
||||
return currentState != null && currentState.equals(expectedState);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param zkw ZooKeeperWatcher instance to use
|
||||
* @param tableName table we're checking
|
||||
* @return Null or {@link ZooKeeperProtos.Table.State} found in znode.
|
||||
* @throws KeeperException
|
||||
*/
|
||||
static ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw,
|
||||
final TableName tableName)
|
||||
throws KeeperException, InterruptedException {
|
||||
String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString());
|
||||
byte [] data = ZKUtil.getData(zkw, znode);
|
||||
if (data == null || data.length <= 0) return null;
|
||||
try {
|
||||
ProtobufUtil.expectPBMagicPrefix(data);
|
||||
ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
|
||||
int magicLen = ProtobufUtil.lengthOfPBMagic();
|
||||
ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build();
|
||||
return t.getState();
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
KeeperException ke = new KeeperException.DataInconsistencyException();
|
||||
ke.initCause(e);
|
||||
throw ke;
|
||||
} catch (DeserializationException e) {
|
||||
throw ZKUtil.convert(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,330 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import com.google.protobuf.InvalidProtocolBufferException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableStateManager;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Implementation of TableStateManager which reads, caches and sets state
|
||||
* up in ZooKeeper. If multiple read/write clients, will make for confusion.
|
||||
* Code running on client side without consensus context should use
|
||||
* {@link ZKTableStateClientSideReader} instead.
|
||||
*
|
||||
* <p>To save on trips to the zookeeper ensemble, internally we cache table
|
||||
* state.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZKTableStateManager implements TableStateManager {
|
||||
// A znode will exist under the table directory if it is in any of the
|
||||
// following states: {@link TableState#ENABLING} , {@link TableState#DISABLING},
|
||||
// or {@link TableState#DISABLED}. If {@link TableState#ENABLED}, there will
|
||||
// be no entry for a table in zk. Thats how it currently works.
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(ZKTableStateManager.class);
|
||||
private final ZooKeeperWatcher watcher;
|
||||
|
||||
/**
|
||||
* Cache of what we found in zookeeper so we don't have to go to zk ensemble
|
||||
* for every query. Synchronize access rather than use concurrent Map because
|
||||
* synchronization needs to span query of zk.
|
||||
*/
|
||||
private final Map<TableName, ZooKeeperProtos.Table.State> cache =
|
||||
new HashMap<TableName, ZooKeeperProtos.Table.State>();
|
||||
|
||||
public ZKTableStateManager(final ZooKeeperWatcher zkw) throws KeeperException,
|
||||
InterruptedException {
|
||||
super();
|
||||
this.watcher = zkw;
|
||||
populateTableStates();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a list of all the tables set as disabled in zookeeper.
|
||||
* @throws KeeperException, InterruptedException
|
||||
*/
|
||||
private void populateTableStates() throws KeeperException, InterruptedException {
|
||||
synchronized (this.cache) {
|
||||
List<String> children = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode);
|
||||
if (children == null) return;
|
||||
for (String child: children) {
|
||||
TableName tableName = TableName.valueOf(child);
|
||||
ZooKeeperProtos.Table.State state = getTableState(this.watcher, tableName);
|
||||
if (state != null) this.cache.put(tableName, state);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets table state in ZK. Sets no watches.
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void setTableState(TableName tableName, ZooKeeperProtos.Table.State state)
|
||||
throws CoordinatedStateException {
|
||||
synchronized (this.cache) {
|
||||
LOG.warn("Moving table " + tableName + " state from " + this.cache.get(tableName)
|
||||
+ " to " + state);
|
||||
try {
|
||||
setTableStateInZK(tableName, state);
|
||||
} catch (KeeperException e) {
|
||||
throw new CoordinatedStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks and sets table state in ZK. Sets no watches.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean setTableStateIfInStates(TableName tableName,
|
||||
ZooKeeperProtos.Table.State newState,
|
||||
ZooKeeperProtos.Table.State... states)
|
||||
throws CoordinatedStateException {
|
||||
synchronized (this.cache) {
|
||||
// Transition ENABLED->DISABLING has to be performed with a hack, because
|
||||
// we treat empty state as enabled in this case because 0.92- clusters.
|
||||
if (
|
||||
(newState == ZooKeeperProtos.Table.State.DISABLING) &&
|
||||
this.cache.get(tableName) != null && !isTableState(tableName, states) ||
|
||||
(newState != ZooKeeperProtos.Table.State.DISABLING &&
|
||||
!isTableState(tableName, states) )) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
setTableStateInZK(tableName, newState);
|
||||
} catch (KeeperException e) {
|
||||
throw new CoordinatedStateException(e);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks and sets table state in ZK. Sets no watches.
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean setTableStateIfNotInStates(TableName tableName,
|
||||
ZooKeeperProtos.Table.State newState,
|
||||
ZooKeeperProtos.Table.State... states)
|
||||
throws CoordinatedStateException {
|
||||
synchronized (this.cache) {
|
||||
if (isTableState(tableName, states)) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
setTableStateInZK(tableName, newState);
|
||||
} catch (KeeperException e) {
|
||||
throw new CoordinatedStateException(e);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private void setTableStateInZK(final TableName tableName,
|
||||
final ZooKeeperProtos.Table.State state)
|
||||
throws KeeperException {
|
||||
String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString());
|
||||
if (ZKUtil.checkExists(this.watcher, znode) == -1) {
|
||||
ZKUtil.createAndFailSilent(this.watcher, znode);
|
||||
}
|
||||
synchronized (this.cache) {
|
||||
ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
|
||||
builder.setState(state);
|
||||
byte [] data = ProtobufUtil.prependPBMagic(builder.build().toByteArray());
|
||||
ZKUtil.setData(this.watcher, znode, data);
|
||||
this.cache.put(tableName, state);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if table is marked in specified state in ZK.
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public boolean isTableState(final TableName tableName,
|
||||
final ZooKeeperProtos.Table.State... states) {
|
||||
synchronized (this.cache) {
|
||||
ZooKeeperProtos.Table.State currentState = this.cache.get(tableName);
|
||||
return isTableInState(Arrays.asList(states), currentState);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Deletes the table in zookeeper. Fails silently if the
|
||||
* table is not currently disabled in zookeeper. Sets no watches.
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void setDeletedTable(final TableName tableName)
|
||||
throws CoordinatedStateException {
|
||||
synchronized (this.cache) {
|
||||
if (this.cache.remove(tableName) == null) {
|
||||
LOG.warn("Moving table " + tableName + " state to deleted but was " +
|
||||
"already deleted");
|
||||
}
|
||||
try {
|
||||
ZKUtil.deleteNodeFailSilent(this.watcher,
|
||||
ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
|
||||
} catch (KeeperException e) {
|
||||
throw new CoordinatedStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* check if table is present.
|
||||
*
|
||||
* @param tableName table we're working on
|
||||
* @return true if the table is present
|
||||
*/
|
||||
@Override
|
||||
public boolean isTablePresent(final TableName tableName) {
|
||||
synchronized (this.cache) {
|
||||
ZooKeeperProtos.Table.State state = this.cache.get(tableName);
|
||||
return !(state == null);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a list of all the tables set as disabling in zookeeper.
|
||||
* @return Set of disabling tables, empty Set if none
|
||||
* @throws CoordinatedStateException if error happened in underlying coordination engine
|
||||
*/
|
||||
@Override
|
||||
public Set<TableName> getTablesInStates(ZooKeeperProtos.Table.State... states)
|
||||
throws InterruptedIOException, CoordinatedStateException {
|
||||
try {
|
||||
return getAllTables(states);
|
||||
} catch (KeeperException e) {
|
||||
throw new CoordinatedStateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void checkAndRemoveTableState(TableName tableName, ZooKeeperProtos.Table.State states,
|
||||
boolean deletePermanentState)
|
||||
throws CoordinatedStateException {
|
||||
synchronized (this.cache) {
|
||||
if (isTableState(tableName, states)) {
|
||||
this.cache.remove(tableName);
|
||||
if (deletePermanentState) {
|
||||
try {
|
||||
ZKUtil.deleteNodeFailSilent(this.watcher,
|
||||
ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
|
||||
} catch (KeeperException e) {
|
||||
throw new CoordinatedStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a list of all the tables of specified states in zookeeper.
|
||||
* @return Set of tables of specified states, empty Set if none
|
||||
* @throws KeeperException
|
||||
*/
|
||||
Set<TableName> getAllTables(final ZooKeeperProtos.Table.State... states)
|
||||
throws KeeperException, InterruptedIOException {
|
||||
|
||||
Set<TableName> allTables = new HashSet<TableName>();
|
||||
List<String> children =
|
||||
ZKUtil.listChildrenNoWatch(watcher, watcher.tableZNode);
|
||||
if(children == null) return allTables;
|
||||
for (String child: children) {
|
||||
TableName tableName = TableName.valueOf(child);
|
||||
ZooKeeperProtos.Table.State state;
|
||||
try {
|
||||
state = getTableState(watcher, tableName);
|
||||
} catch (InterruptedException e) {
|
||||
throw new InterruptedIOException();
|
||||
}
|
||||
for (ZooKeeperProtos.Table.State expectedState: states) {
|
||||
if (state == expectedState) {
|
||||
allTables.add(tableName);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return allTables;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets table state from ZK.
|
||||
* @param zkw ZooKeeperWatcher instance to use
|
||||
* @param tableName table we're checking
|
||||
* @return Null or {@link ZooKeeperProtos.Table.State} found in znode.
|
||||
* @throws KeeperException
|
||||
*/
|
||||
private ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw,
|
||||
final TableName tableName)
|
||||
throws KeeperException, InterruptedException {
|
||||
String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString());
|
||||
byte [] data = ZKUtil.getData(zkw, znode);
|
||||
if (data == null || data.length <= 0) return null;
|
||||
try {
|
||||
ProtobufUtil.expectPBMagicPrefix(data);
|
||||
ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
|
||||
int magicLen = ProtobufUtil.lengthOfPBMagic();
|
||||
ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build();
|
||||
return t.getState();
|
||||
} catch (InvalidProtocolBufferException e) {
|
||||
KeeperException ke = new KeeperException.DataInconsistencyException();
|
||||
ke.initCause(e);
|
||||
throw ke;
|
||||
} catch (DeserializationException e) {
|
||||
throw ZKUtil.convert(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if current state isn't null and is contained
|
||||
* in the list of expected states.
|
||||
*/
|
||||
private boolean isTableInState(final List<ZooKeeperProtos.Table.State> expectedStates,
|
||||
final ZooKeeperProtos.Table.State currentState) {
|
||||
return currentState != null && expectedStates.contains(currentState);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
/**
|
||||
* Creates instance of {@link CoordinatedStateManager}
|
||||
* based on configuration.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class CoordinatedStateManagerFactory {
|
||||
|
||||
/**
|
||||
* Creates consensus provider from the given configuration.
|
||||
* @param conf Configuration
|
||||
* @return Implementation of {@link CoordinatedStateManager}
|
||||
*/
|
||||
public static CoordinatedStateManager getCoordinatedStateManager(Configuration conf) {
|
||||
Class<? extends CoordinatedStateManager> coordinatedStateMgrKlass =
|
||||
conf.getClass(HConstants.HBASE_COORDINATED_STATE_MANAGER_CLASS,
|
||||
ZkCoordinatedStateManager.class, CoordinatedStateManager.class);
|
||||
return ReflectionUtils.newInstance(coordinatedStateMgrKlass, conf);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.consensus;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateException;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableStateManager;
|
||||
|
||||
/**
|
||||
* Base class for {@link org.apache.hadoop.hbase.CoordinatedStateManager} implementations.
|
||||
* Defines methods to retrieve consensus objects for relevant areas. CoordinatedStateManager
|
||||
* reference returned from Server interface has to be casted to this type to
|
||||
* access those methods.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class BaseCoordinatedStateManager implements CoordinatedStateManager {
|
||||
|
||||
@Override
|
||||
public void initialize(Server server) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Server getServer() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract TableStateManager getTableStateManager() throws InterruptedException,
|
||||
CoordinatedStateException;
|
||||
}
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.consensus;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateException;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.TableStateManager;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ZkCoordinatedStateManager extends BaseCoordinatedStateManager {
|
||||
private static final Log LOG = LogFactory.getLog(ZkCoordinatedStateManager.class);
|
||||
private Server server;
|
||||
private ZooKeeperWatcher watcher;
|
||||
|
||||
@Override
|
||||
public void initialize(Server server) {
|
||||
this.server = server;
|
||||
this.watcher = server.getZooKeeper();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Server getServer() {
|
||||
return server;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableStateManager getTableStateManager() throws InterruptedException,
|
||||
CoordinatedStateException {
|
||||
try {
|
||||
return new ZKTableStateManager(server.getZooKeeper());
|
||||
} catch (KeeperException e) {
|
||||
throw new CoordinatedStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.zookeeper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.CoordinatedStateException;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.MediumTests;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableStateManager;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestZKTableStateManager {
|
||||
private static final Log LOG = LogFactory.getLog(TestZKTableStateManager.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableStates()
|
||||
throws CoordinatedStateException, IOException, KeeperException, InterruptedException {
|
||||
final TableName name =
|
||||
TableName.valueOf("testDisabled");
|
||||
Abortable abortable = new Abortable() {
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
LOG.info(why, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
|
||||
};
|
||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
|
||||
name.getNameAsString(), abortable, true);
|
||||
TableStateManager zkt = new ZKTableStateManager(zkw);
|
||||
assertFalse(zkt.isTableState(name, Table.State.ENABLED));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLING));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLED));
|
||||
assertFalse(zkt.isTableState(name, Table.State.ENABLING));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING));
|
||||
assertFalse(zkt.isTablePresent(name));
|
||||
zkt.setTableState(name, Table.State.DISABLING);
|
||||
assertTrue(zkt.isTableState(name, Table.State.DISABLING));
|
||||
assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING));
|
||||
assertFalse(zkt.getTablesInStates(Table.State.DISABLED).contains(name));
|
||||
assertTrue(zkt.isTablePresent(name));
|
||||
zkt.setTableState(name, Table.State.DISABLED);
|
||||
assertTrue(zkt.isTableState(name, Table.State.DISABLED));
|
||||
assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLING));
|
||||
assertTrue(zkt.getTablesInStates(Table.State.DISABLED).contains(name));
|
||||
assertTrue(zkt.isTablePresent(name));
|
||||
zkt.setTableState(name, Table.State.ENABLING);
|
||||
assertTrue(zkt.isTableState(name, Table.State.ENABLING));
|
||||
assertTrue(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLED));
|
||||
assertFalse(zkt.getTablesInStates(Table.State.DISABLED).contains(name));
|
||||
assertTrue(zkt.isTablePresent(name));
|
||||
zkt.setTableState(name, Table.State.ENABLED);
|
||||
assertTrue(zkt.isTableState(name, Table.State.ENABLED));
|
||||
assertFalse(zkt.isTableState(name, Table.State.ENABLING));
|
||||
assertTrue(zkt.isTablePresent(name));
|
||||
zkt.setDeletedTable(name);
|
||||
assertFalse(zkt.isTableState(name, Table.State.ENABLED));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLING));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLED));
|
||||
assertFalse(zkt.isTableState(name, Table.State.ENABLING));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.DISABLING));
|
||||
assertFalse(zkt.isTableState(name, Table.State.DISABLED, Table.State.ENABLING));
|
||||
assertFalse(zkt.isTablePresent(name));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue