HBASE-11108 Split ZKTable into interface and implementation (Mikhail Antononv)

This commit is contained in:
Michael Stack 2014-05-22 16:15:51 -07:00
parent c5d5a5d1bc
commit ea0731d60f
69 changed files with 431 additions and 1183 deletions

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Implementations of this interface will keep and return to clients
* implementations of classes providing API to execute
* coordinated operations. This interface is client-sise, so it does NOT
* include method to retrieve the particular consensus providers.
*
* For each coarse-grained area of operations there will be a separate
* interface with implementation, providing API for relevant operations
* requiring coordination.
*
* Property hbase.consensus.provider.class in hbase-site.xml controls
* which provider to use.
*/
@InterfaceAudience.Private
public interface ConsensusProvider {
/**
* Initialize consensus service.
* @param server server instance to run within.
*/
void initialize(Server server);
/**
* Starts consensus service.
*/
void start();
/**
* Stop consensus provider.
*/
void stop();
/**
* @return instance of Server consensus runs within
*/
Server getServer();
}

View File

@ -50,7 +50,7 @@ public interface Server extends Abortable, Stoppable {
ServerName getServerName();
/**
* Get ConsensusProvider instance for this server.
* Get CoordinatedStateManager instance for this server.
*/
ConsensusProvider getConsensusProvider();
CoordinatedStateManager getCoordinatedStateManager();
}

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
@ -101,9 +101,9 @@ class ZooKeeperRegistry implements Registry {
ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();
try {
if (enabled) {
return ZKTableReadOnly.isEnabledTable(zkw, tableName);
return ZKTableStateClientSideReader.isEnabledTable(zkw, tableName);
}
return ZKTableReadOnly.isDisabledTable(zkw, tableName);
return ZKTableStateClientSideReader.isDisabledTable(zkw, tableName);
} catch (KeeperException e) {
throw new IOException("Enable/Disable failed", e);
} catch (InterruptedException e) {

View File

@ -1,406 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.zookeeper.KeeperException;
import java.io.InterruptedIOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Helper class for table state tracking for use by AssignmentManager.
* Reads, caches and sets state up in zookeeper. If multiple read/write
* clients, will make for confusion. Read-only clients other than
* AssignmentManager interested in learning table state can use the
* read-only utility methods in {@link ZKTableReadOnly}.
*
* <p>To save on trips to the zookeeper ensemble, internally we cache table
* state.
*/
@InterfaceAudience.Private
public class ZKTable {
// A znode will exist under the table directory if it is in any of the
// following states: {@link TableState#ENABLING} , {@link TableState#DISABLING},
// or {@link TableState#DISABLED}. If {@link TableState#ENABLED}, there will
// be no entry for a table in zk. Thats how it currently works.
private static final Log LOG = LogFactory.getLog(ZKTable.class);
private final ZooKeeperWatcher watcher;
/**
* Cache of what we found in zookeeper so we don't have to go to zk ensemble
* for every query. Synchronize access rather than use concurrent Map because
* synchronization needs to span query of zk.
*/
private final Map<TableName, ZooKeeperProtos.Table.State> cache =
new HashMap<TableName, ZooKeeperProtos.Table.State>();
// TODO: Make it so always a table znode. Put table schema here as well as table state.
// Have watcher on table znode so all are notified of state or schema change.
public ZKTable(final ZooKeeperWatcher zkw) throws KeeperException, InterruptedException {
super();
this.watcher = zkw;
populateTableStates();
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @throws KeeperException
*/
private void populateTableStates()
throws KeeperException, InterruptedException {
synchronized (this.cache) {
List<String> children = ZKUtil.listChildrenNoWatch(this.watcher, this.watcher.tableZNode);
if (children == null) return;
for (String child: children) {
TableName tableName = TableName.valueOf(child);
ZooKeeperProtos.Table.State state = ZKTableReadOnly.getTableState(this.watcher, tableName);
if (state != null) this.cache.put(tableName, state);
}
}
}
/**
* Sets the specified table as DISABLED in zookeeper. Fails silently if the
* table is already disabled in zookeeper. Sets no watches.
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public void setDisabledTable(TableName tableName)
throws KeeperException {
synchronized (this.cache) {
if (!isDisablingOrDisabledTable(tableName)) {
LOG.warn("Moving table " + tableName + " state to disabled but was " +
"not first in disabling state: " + this.cache.get(tableName));
}
setTableState(tableName, ZooKeeperProtos.Table.State.DISABLED);
}
}
/**
* Sets the specified table as DISABLING in zookeeper. Fails silently if the
* table is already disabled in zookeeper. Sets no watches.
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public void setDisablingTable(final TableName tableName)
throws KeeperException {
synchronized (this.cache) {
if (!isEnabledOrDisablingTable(tableName)) {
LOG.warn("Moving table " + tableName + " state to disabling but was " +
"not first in enabled state: " + this.cache.get(tableName));
}
setTableState(tableName, ZooKeeperProtos.Table.State.DISABLING);
}
}
/**
* Sets the specified table as ENABLING in zookeeper. Fails silently if the
* table is already disabled in zookeeper. Sets no watches.
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public void setEnablingTable(final TableName tableName)
throws KeeperException {
synchronized (this.cache) {
if (!isDisabledOrEnablingTable(tableName)) {
LOG.warn("Moving table " + tableName + " state to enabling but was " +
"not first in disabled state: " + this.cache.get(tableName));
}
setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING);
}
}
/**
* Sets the specified table as ENABLING in zookeeper atomically
* If the table is already in ENABLING state, no operation is performed
* @param tableName
* @return if the operation succeeds or not
* @throws KeeperException unexpected zookeeper exception
*/
public boolean checkAndSetEnablingTable(final TableName tableName)
throws KeeperException {
synchronized (this.cache) {
if (isEnablingOrEnabledTable(tableName)) {
return false;
}
setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING);
return true;
}
}
/**
* Sets the specified table as ENABLING in zookeeper atomically
* If the table isn't in DISABLED state, no operation is performed
* @param tableName
* @return if the operation succeeds or not
* @throws KeeperException unexpected zookeeper exception
*/
public boolean checkDisabledAndSetEnablingTable(final TableName tableName)
throws KeeperException {
synchronized (this.cache) {
if (!isDisabledTable(tableName)) {
return false;
}
setTableState(tableName, ZooKeeperProtos.Table.State.ENABLING);
return true;
}
}
/**
* Sets the specified table as DISABLING in zookeeper atomically
* If the table isn't in ENABLED state, no operation is performed
* @param tableName
* @return if the operation succeeds or not
* @throws KeeperException unexpected zookeeper exception
*/
public boolean checkEnabledAndSetDisablingTable(final TableName tableName)
throws KeeperException {
synchronized (this.cache) {
if (this.cache.get(tableName) != null && !isEnabledTable(tableName)) {
return false;
}
setTableState(tableName, ZooKeeperProtos.Table.State.DISABLING);
return true;
}
}
private void setTableState(final TableName tableName, final ZooKeeperProtos.Table.State state)
throws KeeperException {
String znode = ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString());
if (ZKUtil.checkExists(this.watcher, znode) == -1) {
ZKUtil.createAndFailSilent(this.watcher, znode);
}
synchronized (this.cache) {
ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
builder.setState(state);
byte [] data = ProtobufUtil.prependPBMagic(builder.build().toByteArray());
ZKUtil.setData(this.watcher, znode, data);
this.cache.put(tableName, state);
}
}
public boolean isDisabledTable(final TableName tableName) {
return isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED);
}
public boolean isDisablingTable(final TableName tableName) {
return isTableState(tableName, ZooKeeperProtos.Table.State.DISABLING);
}
public boolean isEnablingTable(final TableName tableName) {
return isTableState(tableName, ZooKeeperProtos.Table.State.ENABLING);
}
public boolean isEnabledTable(TableName tableName) {
return isTableState(tableName, ZooKeeperProtos.Table.State.ENABLED);
}
public boolean isDisablingOrDisabledTable(final TableName tableName) {
synchronized (this.cache) {
return isDisablingTable(tableName) || isDisabledTable(tableName);
}
}
public boolean isEnablingOrEnabledTable(final TableName tableName) {
synchronized (this.cache) {
return isEnablingTable(tableName) || isEnabledTable(tableName);
}
}
public boolean isEnabledOrDisablingTable(final TableName tableName) {
synchronized (this.cache) {
return isEnabledTable(tableName) || isDisablingTable(tableName);
}
}
public boolean isDisabledOrEnablingTable(final TableName tableName) {
synchronized (this.cache) {
return isDisabledTable(tableName) || isEnablingTable(tableName);
}
}
private boolean isTableState(final TableName tableName, final ZooKeeperProtos.Table.State state) {
synchronized (this.cache) {
ZooKeeperProtos.Table.State currentState = this.cache.get(tableName);
return ZKTableReadOnly.isTableState(currentState, state);
}
}
/**
* Deletes the table in zookeeper. Fails silently if the
* table is not currently disabled in zookeeper. Sets no watches.
* @param tableName
* @throws KeeperException unexpected zookeeper exception
*/
public void setDeletedTable(final TableName tableName)
throws KeeperException {
synchronized (this.cache) {
if (this.cache.remove(tableName) == null) {
LOG.warn("Moving table " + tableName + " state to deleted but was " +
"already deleted");
}
ZKUtil.deleteNodeFailSilent(this.watcher,
ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
}
}
/**
* Sets the ENABLED state in the cache and creates or force updates a node to
* ENABLED state for the specified table
*
* @param tableName
* @throws KeeperException
*/
public void setEnabledTable(final TableName tableName) throws KeeperException {
setTableState(tableName, ZooKeeperProtos.Table.State.ENABLED);
}
/**
* check if table is present .
*
* @param tableName
* @return true if the table is present
*/
public boolean isTablePresent(final TableName tableName) {
synchronized (this.cache) {
ZooKeeperProtos.Table.State state = this.cache.get(tableName);
return !(state == null);
}
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @return Set of disabled tables, empty Set if none
*/
public Set<TableName> getDisabledTables() {
Set<TableName> disabledTables = new HashSet<TableName>();
synchronized (this.cache) {
Set<TableName> tables = this.cache.keySet();
for (TableName table: tables) {
if (isDisabledTable(table)) disabledTables.add(table);
}
}
return disabledTables;
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @return Set of disabled tables, empty Set if none
* @throws KeeperException
*/
public static Set<TableName> getDisabledTables(ZooKeeperWatcher zkw)
throws KeeperException, InterruptedIOException {
return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED);
}
/**
* Gets a list of all the tables set as disabling in zookeeper.
* @return Set of disabling tables, empty Set if none
* @throws KeeperException
*/
public static Set<TableName> getDisablingTables(ZooKeeperWatcher zkw)
throws KeeperException, InterruptedIOException {
return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLING);
}
/**
* Gets a list of all the tables set as enabling in zookeeper.
* @return Set of enabling tables, empty Set if none
* @throws KeeperException
*/
public static Set<TableName> getEnablingTables(ZooKeeperWatcher zkw)
throws KeeperException, InterruptedIOException {
return getAllTables(zkw, ZooKeeperProtos.Table.State.ENABLING);
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @return Set of disabled tables, empty Set if none
* @throws KeeperException
*/
public static Set<TableName> getDisabledOrDisablingTables(ZooKeeperWatcher zkw)
throws KeeperException, InterruptedIOException {
return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED,
ZooKeeperProtos.Table.State.DISABLING);
}
/**
* If the table is found in ENABLING state the inmemory state is removed. This
* helps in cases where CreateTable is to be retried by the client incase of
* failures. If deleteZNode is true - the znode is also deleted
*
* @param tableName
* @param deleteZNode
* @throws KeeperException
*/
public void removeEnablingTable(final TableName tableName, boolean deleteZNode)
throws KeeperException {
synchronized (this.cache) {
if (isEnablingTable(tableName)) {
this.cache.remove(tableName);
if (deleteZNode) {
ZKUtil.deleteNodeFailSilent(this.watcher,
ZKUtil.joinZNode(this.watcher.tableZNode, tableName.getNameAsString()));
}
}
}
}
/**
* Gets a list of all the tables of specified states in zookeeper.
* @return Set of tables of specified states, empty Set if none
* @throws KeeperException
*/
static Set<TableName> getAllTables(final ZooKeeperWatcher zkw,
final ZooKeeperProtos.Table.State... states) throws KeeperException, InterruptedIOException {
Set<TableName> allTables = new HashSet<TableName>();
List<String> children =
ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
if(children == null) return allTables;
for (String child: children) {
TableName tableName = TableName.valueOf(child);
ZooKeeperProtos.Table.State state = null;
try {
state = ZKTableReadOnly.getTableState(zkw, tableName);
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
for (ZooKeeperProtos.Table.State expectedState: states) {
if (state == expectedState) {
allTables.add(tableName);
break;
}
}
}
return allTables;
}
}

View File

@ -1,167 +0,0 @@
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.zookeeper.KeeperException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Non-instantiable class that provides helper functions for
* clients other than AssignmentManager for reading the
* state of a table in ZK.
*
* <p>Does not cache state like {@link ZKTable}, actually reads from ZK each call.
*/
@InterfaceAudience.Private
public class ZKTableReadOnly {
private ZKTableReadOnly() {}
/**
* Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#DISABLED}.
* This method does not use cache.
* This method is for clients other than AssignmentManager
* @param zkw
* @param tableName
* @return True if table is enabled.
* @throws KeeperException
*/
public static boolean isDisabledTable(final ZooKeeperWatcher zkw,
final TableName tableName)
throws KeeperException, InterruptedException {
ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
return isTableState(ZooKeeperProtos.Table.State.DISABLED, state);
}
/**
* Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#ENABLED}.
* This method does not use cache.
* This method is for clients other than AssignmentManager
* @param zkw
* @param tableName
* @return True if table is enabled.
* @throws KeeperException
*/
public static boolean isEnabledTable(final ZooKeeperWatcher zkw,
final TableName tableName)
throws KeeperException, InterruptedException {
return getTableState(zkw, tableName) == ZooKeeperProtos.Table.State.ENABLED;
}
/**
* Go to zookeeper and see if state of table is {@code ZooKeeperProtos.Table.State#DISABLING}
* of {@code ZooKeeperProtos.Table.State#DISABLED}.
* This method does not use cache.
* This method is for clients other than AssignmentManager.
* @param zkw
* @param tableName
* @return True if table is enabled.
* @throws KeeperException
*/
public static boolean isDisablingOrDisabledTable(final ZooKeeperWatcher zkw,
final TableName tableName)
throws KeeperException, InterruptedException {
ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
return isTableState(ZooKeeperProtos.Table.State.DISABLING, state) ||
isTableState(ZooKeeperProtos.Table.State.DISABLED, state);
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @return Set of disabled tables, empty Set if none
* @throws KeeperException
*/
public static Set<TableName> getDisabledTables(ZooKeeperWatcher zkw)
throws KeeperException, InterruptedException {
Set<TableName> disabledTables = new HashSet<TableName>();
List<String> children =
ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
for (String child: children) {
TableName tableName =
TableName.valueOf(child);
ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
if (state == ZooKeeperProtos.Table.State.DISABLED) disabledTables.add(tableName);
}
return disabledTables;
}
/**
* Gets a list of all the tables set as disabled in zookeeper.
* @return Set of disabled tables, empty Set if none
* @throws KeeperException
*/
public static Set<TableName> getDisabledOrDisablingTables(ZooKeeperWatcher zkw)
throws KeeperException, InterruptedException {
Set<TableName> disabledTables = new HashSet<TableName>();
List<String> children =
ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
for (String child: children) {
TableName tableName =
TableName.valueOf(child);
ZooKeeperProtos.Table.State state = getTableState(zkw, tableName);
if (state == ZooKeeperProtos.Table.State.DISABLED ||
state == ZooKeeperProtos.Table.State.DISABLING)
disabledTables.add(tableName);
}
return disabledTables;
}
static boolean isTableState(final ZooKeeperProtos.Table.State expectedState,
final ZooKeeperProtos.Table.State currentState) {
return currentState != null && currentState.equals(expectedState);
}
/**
* @param zkw
* @param tableName
* @return Null or {@link ZooKeeperProtos.Table.State} found in znode.
* @throws KeeperException
*/
static ZooKeeperProtos.Table.State getTableState(final ZooKeeperWatcher zkw,
final TableName tableName)
throws KeeperException, InterruptedException {
String znode = ZKUtil.joinZNode(zkw.tableZNode, tableName.getNameAsString());
byte [] data = ZKUtil.getData(zkw, znode);
if (data == null || data.length <= 0) return null;
try {
ProtobufUtil.expectPBMagicPrefix(data);
ZooKeeperProtos.Table.Builder builder = ZooKeeperProtos.Table.newBuilder();
int magicLen = ProtobufUtil.lengthOfPBMagic();
ZooKeeperProtos.Table t = builder.mergeFrom(data, magicLen, data.length - magicLen).build();
return t.getState();
} catch (InvalidProtocolBufferException e) {
KeeperException ke = new KeeperException.DataInconsistencyException();
ke.initCause(e);
throw ke;
} catch (DeserializationException e) {
throw ZKUtil.convert(e);
}
}
}

View File

@ -989,7 +989,8 @@ public final class HConstants {
public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
/** Config for pluggable consensus provider */
public static final String HBASE_CONSENSUS_PROVIDER_CLASS = "hbase.consensus.provider.class";
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";
private HConstants() {
// Can't be instantiated with this ctor.

View File

@ -1180,8 +1180,8 @@ possible configurations would overwhelm and obscure the important.
and add the fully qualified class name here.</description>
</property>
<property>
<name>hbase.consensus.provider.class</name>
<value>org.apache.hadoop.hbase.consensus.ZkConsensusProvider</value>
<description>Fully qualified name of class implementing consensus.</description>
<name>hbase.coordinated.state.manager.class</name>
<value>org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager</value>
<description>Fully qualified name of class implementing coordinated state manager.</description>
</property>
</configuration>

View File

@ -1,43 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.consensus.ZkConsensusProvider;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Creates instance of {@link ConsensusProvider}
* based on configuration.
*/
@InterfaceAudience.Private
public class ConsensusProviderFactory {
/**
* Creates consensus provider from the given configuration.
* @param conf Configuration
* @return A {@link ConsensusProvider}
*/
public static ConsensusProvider getConsensusProvider(Configuration conf) {
Class<? extends ConsensusProvider> consensusKlass =
conf.getClass(HConstants.HBASE_CONSENSUS_PROVIDER_CLASS, ZkConsensusProvider.class,
ConsensusProvider.class);
return ReflectionUtils.newInstance(consensusKlass, conf);
}
}

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
@ -174,10 +173,10 @@ public class LocalHBaseCluster {
// its HConnection instance rather than share (see HBASE_INSTANCES down in
// the guts of HConnectionManager.
// Also, create separate ConsensusProvider instance per Server.
// This is special case when we have to have more than 1 ConsensusProvider
// Also, create separate CoordinatedStateManager instance per Server.
// This is special case when we have to have more than 1 CoordinatedStateManager
// within 1 process.
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf);
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
JVMClusterUtil.RegionServerThread rst =
JVMClusterUtil.createRegionServerThread(config, cp,
@ -207,10 +206,10 @@ public class LocalHBaseCluster {
// its HConnection instance rather than share (see HBASE_INSTANCES down in
// the guts of HConnectionManager.
// Also, create separate ConsensusProvider instance per Server.
// This is special case when we have to have more than 1 ConsensusProvider
// Also, create separate CoordinatedStateManager instance per Server.
// This is special case when we have to have more than 1 CoordinatedStateManager
// within 1 process.
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf);
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
JVMClusterUtil.MasterThread mt = JVMClusterUtil.createMasterThread(c, cp,
(Class<? extends HMaster>) conf.getClass(HConstants.MASTER_IMPL, this.masterClass), index);

View File

@ -1,49 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.consensus;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.Server;
/**
* Base class for {@link org.apache.hadoop.hbase.ConsensusProvider} implementations.
* Defines methods to retrieve consensus objects for relevant areas. ConsensusProvider
* reference returned from Server interface has to be casted to this type to
* access those methods.
*/
@InterfaceAudience.Private
public abstract class BaseConsensusProvider implements ConsensusProvider {
@Override
public void initialize(Server server) {
}
@Override
public void start() {
}
@Override
public void stop() {
}
@Override
public Server getServer() {
return null;
}
}

View File

@ -1,42 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.consensus;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.ConsensusProvider}.
*/
@InterfaceAudience.Private
public class ZkConsensusProvider extends BaseConsensusProvider {
private Server server;
private ZooKeeperWatcher watcher;
@Override
public void initialize(Server server) {
this.server = server;
this.watcher = server.getZooKeeper();
}
@Override
public Server getServer() {
return server;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -52,6 +53,7 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
@ -69,6 +71,7 @@ import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
@ -82,7 +85,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.ipc.RemoteException;
@ -161,7 +163,7 @@ public class AssignmentManager extends ZooKeeperListener {
final NavigableMap<String, RegionPlan> regionPlans =
new TreeMap<String, RegionPlan>();
private final ZKTable zkTable;
private final TableStateManager tableStateManager;
private final ExecutorService executorService;
@ -233,7 +235,8 @@ public class AssignmentManager extends ZooKeeperListener {
public AssignmentManager(Server server, ServerManager serverManager,
CatalogTracker catalogTracker, final LoadBalancer balancer,
final ExecutorService service, MetricsMaster metricsMaster,
final TableLockManager tableLockManager) throws KeeperException, IOException {
final TableLockManager tableLockManager) throws KeeperException,
IOException, CoordinatedStateException {
super(server.getZooKeeper());
this.server = server;
this.serverManager = serverManager;
@ -247,7 +250,11 @@ public class AssignmentManager extends ZooKeeperListener {
HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
FavoredNodeLoadBalancer.class);
try {
this.zkTable = new ZKTable(this.watcher);
if (server.getCoordinatedStateManager() != null) {
this.tableStateManager = server.getCoordinatedStateManager().getTableStateManager();
} else {
this.tableStateManager = null;
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
@ -277,12 +284,12 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
* @return Instance of ZKTable.
* @return Instance of ZKTableStateManager.
*/
public ZKTable getZKTable() {
public TableStateManager getTableStateManager() {
// These are 'expensive' to make involving trip to zk ensemble so allow
// sharing.
return this.zkTable;
return this.tableStateManager;
}
/**
@ -387,9 +394,10 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
* @throws CoordinatedStateException
*/
void joinCluster() throws IOException,
KeeperException, InterruptedException {
KeeperException, InterruptedException, CoordinatedStateException {
// Concurrency note: In the below the accesses on regionsInTransition are
// outside of a synchronization block where usually all accesses to RIT are
// synchronized. The presumption is that in this case it is safe since this
@ -400,8 +408,9 @@ public class AssignmentManager extends ZooKeeperListener {
// Scan hbase:meta to build list of existing regions, servers, and assignment
// Returns servers who have not checked in (assumed dead) and their regions
Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
Map<ServerName, List<HRegionInfo>> deadServers;
deadServers = rebuildUserRegions();
// This method will assign all user regions if a clean server startup or
// it will reconstruct master state and cleanup any leftovers from
// previous master process.
@ -424,7 +433,7 @@ public class AssignmentManager extends ZooKeeperListener {
*/
void processDeadServersAndRegionsInTransition(
final Map<ServerName, List<HRegionInfo>> deadServers)
throws KeeperException, IOException, InterruptedException {
throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
watcher.assignmentZNode);
@ -1134,7 +1143,10 @@ public class AssignmentManager extends ZooKeeperListener {
HRegionInfo regionInfo = rs.getRegion();
String regionNameStr = regionInfo.getRegionNameAsString();
LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
boolean disabled = getZKTable().isDisablingOrDisabledTable(regionInfo.getTable());
boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
ServerName serverName = rs.getServerName();
if (serverManager.isServerOnline(serverName)) {
if (rs.isOnServer(serverName)
@ -1819,7 +1831,8 @@ public class AssignmentManager extends ZooKeeperListener {
// assignRegion then we need to make the table ENABLED. Hence in such case the table
// will not be in ENABLING or ENABLED state.
TableName tableName = region.getTable();
if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
if (!tableStateManager.isTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED, ZooKeeperProtos.Table.State.ENABLING)) {
LOG.debug("Setting table " + tableName + " to ENABLED state.");
setEnabledTable(tableName);
}
@ -2001,11 +2014,11 @@ public class AssignmentManager extends ZooKeeperListener {
}
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
TableName tableName = region.getTable();
boolean disabled = this.zkTable.isDisabledTable(tableName);
if (disabled || this.zkTable.isDisablingTable(tableName)) {
LOG.info("Table " + tableName + (disabled ? " disabled;" : " disabling;") +
" skipping assign of " + region.getRegionNameAsString());
if (this.tableStateManager.isTableState(region.getTable(),
ZooKeeperProtos.Table.State.DISABLED,
ZooKeeperProtos.Table.State.DISABLING)) {
LOG.info("Table " + region.getTable() + " is disabled or disabling;"
+ " skipping assign of " + region.getRegionNameAsString());
offlineDisabledRegion(region);
return true;
}
@ -2455,7 +2468,7 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws KeeperException
*/
private void assignAllUserRegions()
throws IOException, InterruptedException, KeeperException {
throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
// Cleanup any existing ZK nodes and start watching
ZKAssign.deleteAllNodes(watcher);
ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
@ -2465,8 +2478,11 @@ public class AssignmentManager extends ZooKeeperListener {
// Skip assignment for regions of tables in DISABLING state because during clean cluster startup
// no RS is alive and regions map also doesn't have any information about the regions.
// See HBASE-6281.
Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
ZooKeeperProtos.Table.State.DISABLED,
ZooKeeperProtos.Table.State.DISABLING,
ZooKeeperProtos.Table.State.ENABLING);
// Scan hbase:meta for all user regions, skipping any disabled tables
Map<HRegionInfo, ServerName> allRegions;
SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
@ -2488,7 +2504,8 @@ public class AssignmentManager extends ZooKeeperListener {
for (HRegionInfo hri : allRegions.keySet()) {
TableName tableName = hri.getTable();
if (!zkTable.isEnabledTable(tableName)) {
if (!tableStateManager.isTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED)) {
setEnabledTable(tableName);
}
}
@ -2527,12 +2544,17 @@ public class AssignmentManager extends ZooKeeperListener {
* in META
* @throws IOException
*/
Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
Set<TableName> disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);
disabledOrEnablingTables.addAll(enablingTables);
Set<TableName> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);
Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws
IOException, KeeperException, CoordinatedStateException {
Set<TableName> enablingTables = tableStateManager.getTablesInStates(
ZooKeeperProtos.Table.State.ENABLING);
Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
ZooKeeperProtos.Table.State.DISABLED,
ZooKeeperProtos.Table.State.DISABLING,
ZooKeeperProtos.Table.State.ENABLING);
// Region assignment from META
List<Result> results = MetaReader.fullScan(this.catalogTracker);
@ -2584,7 +2606,8 @@ public class AssignmentManager extends ZooKeeperListener {
// need to enable the table if not disabled or disabling or enabling
// this will be used in rolling restarts
if (!disabledOrDisablingOrEnabling.contains(tableName)
&& !getZKTable().isEnabledTable(tableName)) {
&& !getTableStateManager().isTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED)) {
setEnabledTable(tableName);
}
} else {
@ -2598,7 +2621,8 @@ public class AssignmentManager extends ZooKeeperListener {
// need to enable the table if not disabled or disabling or enabling
// this will be used in rolling restarts
if (!disabledOrDisablingOrEnabling.contains(tableName)
&& !getZKTable().isEnabledTable(tableName)) {
&& !getTableStateManager().isTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED)) {
setEnabledTable(tableName);
}
}
@ -2615,8 +2639,9 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws IOException
*/
private void recoverTableInDisablingState()
throws KeeperException, TableNotFoundException, IOException {
Set<TableName> disablingTables = ZKTable.getDisablingTables(watcher);
throws KeeperException, IOException, CoordinatedStateException {
Set<TableName> disablingTables =
tableStateManager.getTablesInStates(ZooKeeperProtos.Table.State.DISABLING);
if (disablingTables.size() != 0) {
for (TableName tableName : disablingTables) {
// Recover by calling DisableTableHandler
@ -2638,8 +2663,9 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws IOException
*/
private void recoverTableInEnablingState()
throws KeeperException, TableNotFoundException, IOException {
Set<TableName> enablingTables = ZKTable.getEnablingTables(watcher);
throws KeeperException, IOException, CoordinatedStateException {
Set<TableName> enablingTables = tableStateManager.
getTablesInStates(ZooKeeperProtos.Table.State.ENABLING);
if (enablingTables.size() != 0) {
for (TableName tableName : enablingTables) {
// Recover by calling EnableTableHandler
@ -2876,7 +2902,8 @@ public class AssignmentManager extends ZooKeeperListener {
} catch (KeeperException ke) {
server.abort("Unexpected ZK exception deleting node " + hri, ke);
}
if (zkTable.isDisablingOrDisabledTable(hri.getTable())) {
if (tableStateManager.isTableState(hri.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
regionStates.regionOffline(hri);
it.remove();
continue;
@ -2897,7 +2924,8 @@ public class AssignmentManager extends ZooKeeperListener {
public void balance(final RegionPlan plan) {
HRegionInfo hri = plan.getRegionInfo();
TableName tableName = hri.getTable();
if (zkTable.isDisablingOrDisabledTable(tableName)) {
if (tableStateManager.isTableState(tableName,
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
LOG.info("Ignored moving region of disabling/disabled table "
+ tableName);
return;
@ -2940,8 +2968,9 @@ public class AssignmentManager extends ZooKeeperListener {
protected void setEnabledTable(TableName tableName) {
try {
this.zkTable.setEnabledTable(tableName);
} catch (KeeperException e) {
this.tableStateManager.setTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED);
} catch (CoordinatedStateException e) {
// here we can abort as it is the start up flow
String errorMsg = "Unable to ensure that the table " + tableName
+ " will be" + " enabled because of a ZooKeeper issue";
@ -3113,7 +3142,8 @@ public class AssignmentManager extends ZooKeeperListener {
+ hri_b.getRegionNameAsString() + ", on " + sn);
// User could disable the table before master knows the new region.
if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
if (tableStateManager.isTableState(p.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
unassign(p);
}
}
@ -3239,7 +3269,8 @@ public class AssignmentManager extends ZooKeeperListener {
+ hri_b.getRegionNameAsString() + ", on " + sn);
// User could disable the table before master knows the new region.
if (zkTable.isDisablingOrDisabledTable(p.getTable())) {
if (tableStateManager.isTableState(p.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
unassign(hri_a);
unassign(hri_b);
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -68,7 +69,7 @@ import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.ExecutorType;
@ -99,6 +100,7 @@ import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
@ -257,9 +259,9 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
* @throws KeeperException
* @throws IOException
*/
public HMaster(final Configuration conf, ConsensusProvider consensusProvider)
public HMaster(final Configuration conf, CoordinatedStateManager csm)
throws IOException, KeeperException, InterruptedException {
super(conf, consensusProvider);
super(conf, csm);
this.rsFatals = new MemoryBoundedLogMessageBuffer(
conf.getLong("hbase.master.buffer.for.rs.fatals", 1*1024*1024));
@ -397,9 +399,11 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
* Initialize all ZK based system trackers.
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
* @throws CoordinatedStateException
*/
void initializeZKBasedSystemTrackers() throws IOException,
InterruptedException, KeeperException {
InterruptedException, KeeperException, CoordinatedStateException {
this.balancer = LoadBalancerFactory.getLoadBalancer(conf);
this.loadBalancerTracker = new LoadBalancerTracker(zooKeeper, this);
this.loadBalancerTracker.start();
@ -453,9 +457,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
* @throws CoordinatedStateException
*/
private void finishActiveMasterInitialization(MonitoredTask status)
throws IOException, InterruptedException, KeeperException {
throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
isActiveMaster = true;
@ -765,7 +770,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
}
private void enableMeta(TableName metaTableName) {
if (!this.assignmentManager.getZKTable().isEnabledTable(metaTableName)) {
if (!this.assignmentManager.getTableStateManager().isTableState(metaTableName,
ZooKeeperProtos.Table.State.ENABLED)) {
this.assignmentManager.setEnabledTable(metaTableName);
}
}
@ -1477,8 +1483,8 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
if (!MetaReader.tableExists(getCatalogTracker(), tableName)) {
throw new TableNotFoundException(tableName);
}
if (!getAssignmentManager().getZKTable().
isDisabledTable(tableName)) {
if (!getAssignmentManager().getTableStateManager().
isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED)) {
throw new TableNotDisabledException(tableName);
}
}
@ -1770,10 +1776,10 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
* @return HMaster instance.
*/
public static HMaster constructMaster(Class<? extends HMaster> masterClass,
final Configuration conf, final ConsensusProvider cp) {
final Configuration conf, final CoordinatedStateManager cp) {
try {
Constructor<? extends HMaster> c =
masterClass.getConstructor(Configuration.class, ConsensusProvider.class);
masterClass.getConstructor(Configuration.class, CoordinatedStateManager.class);
return c.newInstance(conf, cp);
} catch (InvocationTargetException ite) {
Throwable target = ite.getTargetException() != null?

View File

@ -30,14 +30,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.ServerCommandLine;
@ -187,8 +187,9 @@ public class HMasterCommandLine extends ServerCommandLine {
waitOnMasterThreads(cluster);
} else {
logProcessInfo(getConf());
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf);
HMaster master = HMaster.constructMaster(masterClass, conf, cp);
CoordinatedStateManager csm =
CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
HMaster master = HMaster.constructMaster(masterClass, conf, csm);
if (master.isStopped()) {
LOG.info("Won't bring the Master up as a shutdown is requested");
return 1;
@ -258,9 +259,9 @@ public class HMasterCommandLine extends ServerCommandLine {
public static class LocalHMaster extends HMaster {
private MiniZooKeeperCluster zkcluster = null;
public LocalHMaster(Configuration conf, ConsensusProvider consensusProvider)
public LocalHMaster(Configuration conf, CoordinatedStateManager csm)
throws IOException, KeeperException, InterruptedException {
super(conf, consensusProvider);
super(conf, csm);
}
@Override

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
/**
* Handles CLOSED region event on Master.
@ -91,8 +92,8 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
public void process() {
LOG.debug("Handling CLOSED event for " + regionInfo.getEncodedName());
// Check if this table is being disabled or not
if (this.assignmentManager.getZKTable().
isDisablingOrDisabledTable(this.regionInfo.getTable())) {
if (this.assignmentManager.getTableStateManager().isTableState(this.regionInfo.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
assignmentManager.offlineDisabledRegion(regionInfo);
return;
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -46,10 +47,10 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.zookeeper.KeeperException;
/**
* Handler to create a table.
@ -130,10 +131,13 @@ public class CreateTableHandler extends EventHandler {
// We could have cleared the hbase.rootdir and not zk. How can we detect this case?
// Having to clean zk AND hdfs is awkward.
try {
if (!assignmentManager.getZKTable().checkAndSetEnablingTable(tableName)) {
if (!assignmentManager.getTableStateManager().setTableStateIfNotInStates(tableName,
ZooKeeperProtos.Table.State.ENABLING,
ZooKeeperProtos.Table.State.ENABLING,
ZooKeeperProtos.Table.State.ENABLED)) {
throw new TableExistsException(tableName);
}
} catch (KeeperException e) {
} catch (CoordinatedStateException e) {
throw new IOException("Unable to ensure that the table will be" +
" enabling because of a ZooKeeper issue", e);
}
@ -146,8 +150,9 @@ public class CreateTableHandler extends EventHandler {
// again with the same Active master
// It will block the creation saying TableAlreadyExists.
try {
assignmentManager.getZKTable().removeEnablingTable(tableName, false);
} catch (KeeperException e) {
assignmentManager.getTableStateManager().checkAndRemoveTableState(tableName,
ZooKeeperProtos.Table.State.ENABLING, false);
} catch (CoordinatedStateException e) {
// Keeper exception should not happen here
LOG.error("Got a keeper exception while removing the ENABLING table znode "
+ tableName, e);
@ -211,7 +216,7 @@ public class CreateTableHandler extends EventHandler {
* - Update ZooKeeper with the enabled state
*/
private void handleCreateTable(TableName tableName)
throws IOException, KeeperException {
throws IOException, CoordinatedStateException {
Path tempdir = fileSystemManager.getTempDir();
FileSystem fs = fileSystemManager.getFileSystem();
@ -239,8 +244,9 @@ public class CreateTableHandler extends EventHandler {
// 6. Set table enabled flag up in zk.
try {
assignmentManager.getZKTable().setEnabledTable(tableName);
} catch (KeeperException e) {
assignmentManager.getTableStateManager().setTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED);
} catch (CoordinatedStateException e) {
throw new IOException("Unable to ensure that " + tableName + " will be" +
" enabled because of a ZooKeeper issue", e);
}

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class DeleteTableHandler extends TableEventHandler {
@ -62,7 +62,7 @@ public class DeleteTableHandler extends TableEventHandler {
}
protected void waitRegionInTransition(final List<HRegionInfo> regions)
throws IOException, KeeperException {
throws IOException, CoordinatedStateException {
AssignmentManager am = this.masterServices.getAssignmentManager();
RegionStates states = am.getRegionStates();
long waitTime = server.getConfiguration().
@ -93,7 +93,7 @@ public class DeleteTableHandler extends TableEventHandler {
@Override
protected void handleTableOperation(List<HRegionInfo> regions)
throws IOException, KeeperException {
throws IOException, CoordinatedStateException {
MasterCoprocessorHost cpHost = ((HMaster) this.server).getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preDeleteTableHandler(this.tableName);
@ -118,7 +118,7 @@ public class DeleteTableHandler extends TableEventHandler {
// 5. If entry for this table in zk, and up in AssignmentManager, remove it.
LOG.debug("Marking '" + tableName + "' as deleted.");
am.getZKTable().setDeletedTable(tableName);
am.getTableStateManager().setDeletedTable(tableName);
}
if (cpHost != null) {
@ -130,7 +130,7 @@ public class DeleteTableHandler extends TableEventHandler {
* Removes the table from .META. and archives the HDFS files.
*/
protected void removeTableData(final List<HRegionInfo> regions)
throws IOException, KeeperException {
throws IOException, CoordinatedStateException {
// 1. Remove regions from META
LOG.debug("Deleting regions from META");
MetaEditor.deleteRegions(this.server.getCatalogTracker(), regions);

View File

@ -25,6 +25,7 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
@ -43,7 +44,7 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.htrace.Trace;
/**
@ -94,14 +95,15 @@ public class DisableTableHandler extends EventHandler {
//TODO: reevaluate this since we have table locks now
if (!skipTableStateCheck) {
try {
if (!this.assignmentManager.getZKTable().checkEnabledAndSetDisablingTable
(this.tableName)) {
if (!this.assignmentManager.getTableStateManager().setTableStateIfInStates(
this.tableName, ZooKeeperProtos.Table.State.DISABLING,
ZooKeeperProtos.Table.State.ENABLED)) {
LOG.info("Table " + tableName + " isn't enabled; skipping disable");
throw new TableNotEnabledException(this.tableName);
}
} catch (KeeperException e) {
} catch (CoordinatedStateException e) {
throw new IOException("Unable to ensure that the table will be" +
" disabling because of a ZooKeeper issue", e);
" disabling because of a coordination engine issue", e);
}
}
success = true;
@ -139,7 +141,7 @@ public class DisableTableHandler extends EventHandler {
}
} catch (IOException e) {
LOG.error("Error trying to disable table " + this.tableName, e);
} catch (KeeperException e) {
} catch (CoordinatedStateException e) {
LOG.error("Error trying to disable table " + this.tableName, e);
} finally {
releaseTableLock();
@ -156,9 +158,10 @@ public class DisableTableHandler extends EventHandler {
}
}
private void handleDisableTable() throws IOException, KeeperException {
private void handleDisableTable() throws IOException, CoordinatedStateException {
// Set table disabling flag up in zk.
this.assignmentManager.getZKTable().setDisablingTable(this.tableName);
this.assignmentManager.getTableStateManager().setTableState(this.tableName,
ZooKeeperProtos.Table.State.DISABLING);
boolean done = false;
while (true) {
// Get list of online regions that are of this table. Regions that are
@ -186,7 +189,8 @@ public class DisableTableHandler extends EventHandler {
}
}
// Flip the table to disabled if success.
if (done) this.assignmentManager.getZKTable().setDisabledTable(this.tableName);
if (done) this.assignmentManager.getTableStateManager().setTableState(this.tableName,
ZooKeeperProtos.Table.State.DISABLED);
LOG.info("Disabled table is done=" + done);
}

View File

@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
@ -45,8 +46,8 @@ import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
/**
* Handler to run enable of a table.
@ -88,9 +89,10 @@ public class EnableTableHandler extends EventHandler {
throw new TableNotFoundException(tableName);
}
try {
this.assignmentManager.getZKTable().removeEnablingTable(tableName, true);
this.assignmentManager.getTableStateManager().checkAndRemoveTableState(tableName,
ZooKeeperProtos.Table.State.ENABLING, true);
throw new TableNotFoundException(tableName);
} catch (KeeperException e) {
} catch (CoordinatedStateException e) {
// TODO : Use HBCK to clear such nodes
LOG.warn("Failed to delete the ENABLING node for the table " + tableName
+ ". The table will remain unusable. Run HBCK to manually fix the problem.");
@ -103,14 +105,15 @@ public class EnableTableHandler extends EventHandler {
// DISABLED or ENABLED.
if (!skipTableStateCheck) {
try {
if (!this.assignmentManager.getZKTable().checkDisabledAndSetEnablingTable
(this.tableName)) {
if (!this.assignmentManager.getTableStateManager().setTableStateIfInStates(
this.tableName, ZooKeeperProtos.Table.State.ENABLING,
ZooKeeperProtos.Table.State.DISABLED)) {
LOG.info("Table " + tableName + " isn't disabled; skipping enable");
throw new TableNotDisabledException(this.tableName);
}
} catch (KeeperException e) {
} catch (CoordinatedStateException e) {
throw new IOException("Unable to ensure that the table will be" +
" enabling because of a ZooKeeper issue", e);
" enabling because of a coordination engine issue", e);
}
}
success = true;
@ -147,7 +150,7 @@ public class EnableTableHandler extends EventHandler {
}
} catch (IOException e) {
LOG.error("Error trying to enable the table " + this.tableName, e);
} catch (KeeperException e) {
} catch (CoordinatedStateException e) {
LOG.error("Error trying to enable the table " + this.tableName, e);
} catch (InterruptedException e) {
LOG.error("Error trying to enable the table " + this.tableName, e);
@ -166,12 +169,14 @@ public class EnableTableHandler extends EventHandler {
}
}
private void handleEnableTable() throws IOException, KeeperException, InterruptedException {
private void handleEnableTable() throws IOException, CoordinatedStateException,
InterruptedException {
// I could check table is disabling and if so, not enable but require
// that user first finish disabling but that might be obnoxious.
// Set table enabling flag up in zk.
this.assignmentManager.getZKTable().setEnablingTable(this.tableName);
this.assignmentManager.getTableStateManager().setTableState(this.tableName,
ZooKeeperProtos.Table.State.ENABLING);
boolean done = false;
ServerManager serverManager = ((HMaster)this.server).getServerManager();
// Get the regions of this table. We're done when all listed
@ -206,8 +211,8 @@ public class EnableTableHandler extends EventHandler {
}
if (done) {
// Flip the table to enabled.
this.assignmentManager.getZKTable().setEnabledTable(
this.tableName);
this.assignmentManager.getTableStateManager().setTableState(
this.tableName, ZooKeeperProtos.Table.State.ENABLED);
LOG.info("Table '" + this.tableName
+ "' was successfully enabled. Status: done=" + done);
} else {

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@ -113,7 +114,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
" because regions is NOT in RIT -- presuming this is because it SPLIT");
}
if (!openedNodeDeleted) {
if (this.assignmentManager.getZKTable().isDisablingOrDisabledTable(regionInfo.getTable())) {
if (this.assignmentManager.getTableStateManager().isTableState(regionInfo.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
debugLog(regionInfo, "Opened region "
+ regionInfo.getShortNameToLog() + " but "
+ "this table is disabled, triggering close of region");

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@ -257,7 +258,8 @@ public class ServerShutdownHandler extends EventHandler {
toAssignRegions.add(hri);
} else if (rit != null) {
if (rit.isPendingCloseOrClosing()
&& am.getZKTable().isDisablingOrDisabledTable(hri.getTable())) {
&& am.getTableStateManager().isTableState(hri.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
// If the table was partially disabled and the RS went down, we should clear the RIT
// and remove the node for the region.
// The rit that we use may be stale in case the table was in DISABLING state
@ -334,14 +336,15 @@ public class ServerShutdownHandler extends EventHandler {
public static boolean processDeadRegion(HRegionInfo hri, Result result,
AssignmentManager assignmentManager, CatalogTracker catalogTracker)
throws IOException {
boolean tablePresent = assignmentManager.getZKTable().isTablePresent(hri.getTable());
boolean tablePresent = assignmentManager.getTableStateManager().isTablePresent(hri.getTable());
if (!tablePresent) {
LOG.info("The table " + hri.getTable()
+ " was deleted. Hence not proceeding.");
return false;
}
// If table is not disabled but the region is offlined,
boolean disabled = assignmentManager.getZKTable().isDisabledTable(hri.getTable());
boolean disabled = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
ZooKeeperProtos.Table.State.DISABLED);
if (disabled){
LOG.info("The table " + hri.getTable()
+ " was disabled. Hence not proceeding.");
@ -353,7 +356,8 @@ public class ServerShutdownHandler extends EventHandler {
//to the dead server. We don't have to do anything.
return false;
}
boolean disabling = assignmentManager.getZKTable().isDisablingTable(hri.getTable());
boolean disabling = assignmentManager.getTableStateManager().isTableState(hri.getTable(),
ZooKeeperProtos.Table.State.DISABLING);
if (disabling) {
LOG.info("The table " + hri.getTable()
+ " is disabled. Hence not assigning region" + hri.getEncodedName());

View File

@ -29,6 +29,7 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
@ -44,8 +45,8 @@ import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.BulkReOpen;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -128,8 +129,8 @@ public abstract class TableEventHandler extends EventHandler {
tableName);
handleTableOperation(hris);
if (eventType.isOnlineSchemaChangeSupported() && this.masterServices.
getAssignmentManager().getZKTable().
isEnabledTable(tableName)) {
getAssignmentManager().getTableStateManager().isTableState(
tableName, ZooKeeperProtos.Table.State.ENABLED)) {
if (reOpenAllRegions(hris)) {
LOG.info("Completed table operation " + eventType + " on table " +
tableName);
@ -141,7 +142,7 @@ public abstract class TableEventHandler extends EventHandler {
} catch (IOException e) {
LOG.error("Error manipulating table " + tableName, e);
completed(e);
} catch (KeeperException e) {
} catch (CoordinatedStateException e) {
LOG.error("Error manipulating table " + tableName, e);
completed(e);
} finally {
@ -249,5 +250,5 @@ public abstract class TableEventHandler extends EventHandler {
}
protected abstract void handleTableOperation(List<HRegionInfo> regions)
throws IOException, KeeperException;
throws IOException, CoordinatedStateException;
}

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
@ -27,25 +26,20 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionStates;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.zookeeper.KeeperException;
/**
* Truncate the table by removing META and the HDFS files and recreating it.
@ -67,7 +61,7 @@ public class TruncateTableHandler extends DeleteTableHandler {
@Override
protected void handleTableOperation(List<HRegionInfo> regions)
throws IOException, KeeperException {
throws IOException, CoordinatedStateException {
MasterCoprocessorHost cpHost = ((HMaster) this.server).getMasterCoprocessorHost();
if (cpHost != null) {
cpHost.preTruncateTableHandler(this.tableName);
@ -137,8 +131,9 @@ public class TruncateTableHandler extends DeleteTableHandler {
// 6. Set table enabled flag up in zk.
try {
assignmentManager.getZKTable().setEnabledTable(tableName);
} catch (KeeperException e) {
assignmentManager.getTableStateManager().setTableState(tableName,
ZooKeeperProtos.Table.State.ENABLED);
} catch (CoordinatedStateException e) {
throw new IOException("Unable to ensure that " + tableName + " will be" +
" enabled because of a ZooKeeper issue", e);
}

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription.Type;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
@ -556,13 +557,15 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
// if the table is enabled, then have the RS run actually the snapshot work
TableName snapshotTable = TableName.valueOf(snapshot.getTable());
AssignmentManager assignmentMgr = master.getAssignmentManager();
if (assignmentMgr.getZKTable().isEnabledTable(snapshotTable)) {
if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
ZooKeeperProtos.Table.State.ENABLED)) {
LOG.debug("Table enabled, starting distributed snapshot.");
snapshotEnabledTable(snapshot);
LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
}
// For disabled table, snapshot is created by the master
else if (assignmentMgr.getZKTable().isDisabledTable(snapshotTable)) {
else if (assignmentMgr.getTableStateManager().isTableState(snapshotTable,
ZooKeeperProtos.Table.State.DISABLED)) {
LOG.debug("Table is disabled, running snapshot entirely on master.");
snapshotDisabledTable(snapshot);
LOG.debug("Started snapshot: " + ClientSnapshotDescriptionUtils.toString(snapshot));
@ -692,8 +695,8 @@ public class SnapshotManager extends MasterProcedureManager implements Stoppable
// Execute the restore/clone operation
if (MetaReader.tableExists(master.getCatalogTracker(), tableName)) {
if (master.getAssignmentManager().getZKTable().isEnabledTable(
TableName.valueOf(fsSnapshot.getTable()))) {
if (master.getAssignmentManager().getTableStateManager().isTableState(
TableName.valueOf(fsSnapshot.getTable()), ZooKeeperProtos.Table.State.ENABLED)) {
throw new UnsupportedOperationException("Table '" +
TableName.valueOf(fsSnapshot.getTable()) + "' must be disabled in order to " +
"perform a restore operation" +

View File

@ -58,6 +58,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -75,8 +77,6 @@ import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
@ -392,7 +392,7 @@ public class HRegionServer extends HasThread implements
protected final RSRpcServices rpcServices;
protected ConsensusProvider consensusProvider;
protected CoordinatedStateManager csm;
/**
* Starts a HRegionServer at the default location.
@ -401,17 +401,17 @@ public class HRegionServer extends HasThread implements
* @throws InterruptedException
*/
public HRegionServer(Configuration conf) throws IOException, InterruptedException {
this(conf, ConsensusProviderFactory.getConsensusProvider(conf));
this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}
/**
* Starts a HRegionServer at the default location
* @param conf
* @param consensusProvider implementation of ConsensusProvider to be used
* @param csm implementation of CoordinatedStateManager to be used
* @throws IOException
* @throws InterruptedException
*/
public HRegionServer(Configuration conf, ConsensusProvider consensusProvider)
public HRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException, InterruptedException {
this.fsOk = true;
this.conf = conf;
@ -483,9 +483,9 @@ public class HRegionServer extends HasThread implements
zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
rpcServices.isa.getPort(), this, canCreateBaseZNode());
this.consensusProvider = consensusProvider;
this.consensusProvider.initialize(this);
this.consensusProvider.start();
this.csm = csm;
this.csm.initialize(this);
this.csm.start();
tableLockManager = TableLockManager.createTableLockManager(
conf, zooKeeper, serverName);
@ -2155,8 +2155,8 @@ public class HRegionServer extends HasThread implements
}
@Override
public ConsensusProvider getConsensusProvider() {
return consensusProvider;
public CoordinatedStateManager getCoordinatedStateManager() {
return csm;
}
@Override
@ -2253,10 +2253,10 @@ public class HRegionServer extends HasThread implements
*/
public static HRegionServer constructRegionServer(
Class<? extends HRegionServer> regionServerClass,
final Configuration conf2, ConsensusProvider cp) {
final Configuration conf2, CoordinatedStateManager cp) {
try {
Constructor<? extends HRegionServer> c = regionServerClass
.getConstructor(Configuration.class, ConsensusProvider.class);
.getConstructor(Configuration.class, CoordinatedStateManager.class);
return c.newInstance(conf2, cp);
} catch (Exception e) {
throw new RuntimeException("Failed construction of " + "Regionserver: "

View File

@ -23,10 +23,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.util.ServerCommandLine;
/**
@ -52,7 +52,7 @@ public class HRegionServerCommandLine extends ServerCommandLine {
private int start() throws Exception {
Configuration conf = getConf();
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf);
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
try {
// If 'local', don't start a region server here. Defer to
// LocalHBaseCluster. It manages 'local' clusters.

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SplitLogCounters;
@ -123,7 +122,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
}
public SplitLogWorker(final ZooKeeperWatcher watcher, final Configuration conf,
RegionServerServices server, final LastSequenceId sequenceIdChecker) {
final RegionServerServices server, final LastSequenceId sequenceIdChecker) {
this(watcher, conf, server, new TaskExecutor() {
@Override
public Status exec(String filename, CancelableProgressable p) {
@ -141,7 +140,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
// encountered a bad non-retry-able persistent error.
try {
if (!HLogSplitter.splitLogFile(rootdir, fs.getFileStatus(new Path(rootdir, filename)),
fs, conf, p, sequenceIdChecker, watcher)) {
fs, conf, p, sequenceIdChecker, watcher, server.getCoordinatedStateManager())) {
return Status.PREEMPTED;
}
} catch (InterruptedIOException iioe) {

View File

@ -56,6 +56,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -66,6 +68,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.ConnectionUtils;
@ -89,6 +92,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.StoreSequenceId;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -104,10 +108,8 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.zookeeper.KeeperException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@ -135,6 +137,7 @@ public class HLogSplitter {
private Set<TableName> disablingOrDisabledTables =
new HashSet<TableName>();
private ZooKeeperWatcher watcher;
private CoordinatedStateManager csm;
// If an exception is thrown by one of the other threads, it will be
// stored here.
@ -168,7 +171,8 @@ public class HLogSplitter {
private final int minBatchSize;
HLogSplitter(Configuration conf, Path rootDir,
FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw) {
FileSystem fs, LastSequenceId idChecker, ZooKeeperWatcher zkw,
CoordinatedStateManager csm) {
this.conf = HBaseConfiguration.create(conf);
String codecClassName = conf
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
@ -177,6 +181,7 @@ public class HLogSplitter {
this.fs = fs;
this.sequenceIdChecker = idChecker;
this.watcher = zkw;
this.csm = csm;
entryBuffers = new EntryBuffers(
this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
@ -188,7 +193,7 @@ public class HLogSplitter {
this.distributedLogReplay = HLogSplitter.isDistributedLogReplay(this.conf);
this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
if (zkw != null && this.distributedLogReplay) {
if (zkw != null && csm != null && this.distributedLogReplay) {
outputSink = new LogReplayOutputSink(numWriterThreads);
} else {
if (this.distributedLogReplay) {
@ -219,8 +224,9 @@ public class HLogSplitter {
*/
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
ZooKeeperWatcher zkw) throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw);
ZooKeeperWatcher zkw, CoordinatedStateManager cp) throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, idChecker, zkw,
cp);
return s.splitLogFile(logfile, reporter);
}
@ -234,7 +240,7 @@ public class HLogSplitter {
List<Path> splits = new ArrayList<Path>();
if (logfiles != null && logfiles.length > 0) {
for (FileStatus logfile: logfiles) {
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null);
HLogSplitter s = new HLogSplitter(conf, rootDir, fs, null, null, null);
if (s.splitLogFile(logfile, null)) {
finishSplitLogFile(rootDir, oldLogDir, logfile.getPath(), conf);
if (s.outputSink.splits != null) {
@ -288,10 +294,12 @@ public class HLogSplitter {
LOG.warn("Nothing to split in log file " + logPath);
return true;
}
if(watcher != null) {
if(watcher != null && csm != null) {
try {
disablingOrDisabledTables = ZKTable.getDisabledOrDisablingTables(watcher);
} catch (KeeperException e) {
TableStateManager tsm = csm.getTableStateManager();
disablingOrDisabledTables = tsm.getTablesInStates(
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
} catch (CoordinatedStateException e) {
throw new IOException("Can't get disabling/disabled tables", e);
}
}

View File

@ -29,9 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.util.Tool;
@ -146,7 +144,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -103,7 +103,7 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler;
import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl;
import org.apache.hadoop.hbase.util.hbck.TableLockChecker;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException;
@ -1385,7 +1385,7 @@ public class HBaseFsck extends Configured {
ZooKeeperWatcher zkw = createZooKeeperWatcher();
try {
for (TableName tableName :
ZKTableReadOnly.getDisabledOrDisablingTables(zkw)) {
ZKTableStateClientSideReader.getDisabledOrDisablingTables(zkw)) {
disabledTables.add(tableName);
}
} catch (KeeperException ke) {

View File

@ -29,7 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.util.ReflectionUtils;
@ -81,14 +81,14 @@ public class JVMClusterUtil {
* @return Region server added.
*/
public static JVMClusterUtil.RegionServerThread createRegionServerThread(
final Configuration c, ConsensusProvider cp, final Class<? extends HRegionServer> hrsc,
final Configuration c, CoordinatedStateManager cp, final Class<? extends HRegionServer> hrsc,
final int index)
throws IOException {
HRegionServer server;
try {
Constructor<? extends HRegionServer> ctor = hrsc.getConstructor(Configuration.class,
ConsensusProvider.class);
CoordinatedStateManager.class);
ctor.setAccessible(true);
server = ctor.newInstance(c, cp);
} catch (InvocationTargetException ite) {
@ -133,12 +133,12 @@ public class JVMClusterUtil {
* @return Master added.
*/
public static JVMClusterUtil.MasterThread createMasterThread(
final Configuration c, ConsensusProvider cp, final Class<? extends HMaster> hmc,
final Configuration c, CoordinatedStateManager cp, final Class<? extends HMaster> hmc,
final int index)
throws IOException {
HMaster server;
try {
server = hmc.getConstructor(Configuration.class, ConsensusProvider.class).
server = hmc.getConstructor(Configuration.class, CoordinatedStateManager.class).
newInstance(c, cp);
} catch (InvocationTargetException ite) {
Throwable target = ite.getTargetException();

View File

@ -110,7 +110,7 @@ public class MiniHBaseCluster extends HBaseCluster {
private User user = null;
public static boolean TEST_SKIP_CLOSE = false;
public MiniHBaseClusterRegionServer(Configuration conf, ConsensusProvider cp)
public MiniHBaseClusterRegionServer(Configuration conf, CoordinatedStateManager cp)
throws IOException, InterruptedException {
super(conf, cp);
this.user = User.getCurrent();

View File

@ -136,7 +136,7 @@ class MockRegionServerServices implements RegionServerServices {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
@ -108,6 +109,12 @@ public class TestDrainingServer {
Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("masterMock,1,1"));
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
CoordinatedStateManager cp = new ZkCoordinatedStateManager();
cp.initialize(server);
cp.start();
Mockito.when(server.getCoordinatedStateManager()).thenReturn(cp);
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
Mockito.when(serverManager.getOnlineServersList())
.thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
@ -204,6 +211,12 @@ public class TestDrainingServer {
Mockito.when(server.getServerName()).thenReturn(ServerName.valueOf("masterMock,1,1"));
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
CoordinatedStateManager cp = new ZkCoordinatedStateManager();
cp.initialize(server);
cp.start();
Mockito.when(server.getCoordinatedStateManager()).thenReturn(cp);
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
Mockito.when(serverManager.getOnlineServersList()).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
@ -291,4 +304,4 @@ public class TestDrainingServer {
executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3);
return executor;
}
}
}

View File

@ -64,7 +64,7 @@ public class TestLocalHBaseCluster {
* running in local mode.
*/
public static class MyHMaster extends HMaster {
public MyHMaster(Configuration conf, ConsensusProvider cp)
public MyHMaster(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);
@ -80,7 +80,7 @@ public class TestLocalHBaseCluster {
*/
public static class MyHRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public MyHRegionServer(Configuration conf, ConsensusProvider cp) throws IOException,
public MyHRegionServer(Configuration conf, CoordinatedStateManager cp) throws IOException,
InterruptedException {
super(conf, cp);
}

View File

@ -60,12 +60,13 @@ import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtilsForTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKTableReadOnly;
import org.apache.hadoop.hbase.zookeeper.ZKTableStateClientSideReader;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.AfterClass;
@ -255,8 +256,8 @@ public class TestAdmin {
this.admin.disableTable(ht.getName());
assertTrue("Table must be disabled.", TEST_UTIL.getHBaseCluster()
.getMaster().getAssignmentManager().getZKTable().isDisabledTable(
ht.getName()));
.getMaster().getAssignmentManager().getTableStateManager().isTableState(
ht.getName(), ZooKeeperProtos.Table.State.DISABLED));
// Test that table is disabled
get = new Get(row);
@ -270,8 +271,8 @@ public class TestAdmin {
assertTrue(ok);
this.admin.enableTable(table);
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster()
.getMaster().getAssignmentManager().getZKTable().isEnabledTable(
ht.getName()));
.getMaster().getAssignmentManager().getTableStateManager().isTableState(
ht.getName(), ZooKeeperProtos.Table.State.ENABLED));
// Test that table is enabled
try {
@ -343,8 +344,8 @@ public class TestAdmin {
tables = this.admin.listTables();
assertEquals(numTables + 1, tables.length);
assertTrue("Table must be enabled.", TEST_UTIL.getHBaseCluster()
.getMaster().getAssignmentManager().getZKTable().isEnabledTable(
TableName.valueOf("testCreateTable")));
.getMaster().getAssignmentManager().getTableStateManager().isTableState(
TableName.valueOf("testCreateTable"), ZooKeeperProtos.Table.State.ENABLED));
}
@Test (timeout=300000)
@ -1125,8 +1126,8 @@ public class TestAdmin {
ZooKeeperWatcher zkw = HBaseTestingUtility.getZooKeeperWatcher(TEST_UTIL);
TableName tableName = TableName.valueOf("testMasterAdmin");
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY).close();
while (!ZKTableReadOnly.isEnabledTable(zkw,
TableName.valueOf("testMasterAdmin"))) {
while (!ZKTableStateClientSideReader.isEnabledTable(zkw,
TableName.valueOf("testMasterAdmin"))) {
Thread.sleep(10);
}
this.admin.disableTable(tableName);

View File

@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
@ -130,7 +130,7 @@ public class TestClientScannerRPCTimeout {
}
private static class RegionServerWithScanTimeout extends MiniHBaseClusterRegionServer {
public RegionServerWithScanTimeout(Configuration conf, ConsensusProvider cp)
public RegionServerWithScanTimeout(Configuration conf, CoordinatedStateManager cp)
throws IOException, InterruptedException {
super(conf, cp);
}

View File

@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@ -269,7 +269,7 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -28,12 +28,12 @@ import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
@ -282,7 +282,7 @@ public class TestActiveMasterManager {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -32,6 +32,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@ -50,8 +52,8 @@ import org.apache.hadoop.hbase.catalog.MetaMockingUtil;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.consensus.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
@ -116,6 +118,7 @@ public class TestAssignmentManager {
private Server server;
private ServerManager serverManager;
private ZooKeeperWatcher watcher;
private CoordinatedStateManager cp;
private LoadBalancer balancer;
private HMaster master;
@ -146,6 +149,12 @@ public class TestAssignmentManager {
Mockito.doThrow(new RuntimeException("Aborted")).
when(server).abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
cp = new ZkCoordinatedStateManager();
cp.initialize(this.server);
cp.start();
Mockito.when(server.getCoordinatedStateManager()).thenReturn(cp);
// Mock a ServerManager. Say server SERVERNAME_{A,B} are online. Also
// make it so if close or open, we return 'success'.
this.serverManager = Mockito.mock(ServerManager.class);
@ -184,6 +193,7 @@ public class TestAssignmentManager {
// Clean up all znodes
ZKAssign.deleteAllNodes(this.watcher);
this.watcher.close();
this.cp.stop();
}
}
@ -197,7 +207,8 @@ public class TestAssignmentManager {
*/
@Test(timeout = 60000)
public void testBalanceOnMasterFailoverScenarioWithOpenedNode()
throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
throws IOException, KeeperException, InterruptedException, ServiceException,
DeserializationException, CoordinatedStateException {
AssignmentManagerWithExtrasForTesting am =
setUpMockedAssignmentManager(this.server, this.serverManager);
try {
@ -244,8 +255,9 @@ public class TestAssignmentManager {
}
@Test(timeout = 60000)
public void testBalanceOnMasterFailoverScenarioWithClosedNode()
throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
public void testBalanceOnMasterFailoverScenarioWithClosedNode()
throws IOException, KeeperException, InterruptedException, ServiceException,
DeserializationException, CoordinatedStateException {
AssignmentManagerWithExtrasForTesting am =
setUpMockedAssignmentManager(this.server, this.serverManager);
try {
@ -294,7 +306,8 @@ public class TestAssignmentManager {
@Test(timeout = 60000)
public void testBalanceOnMasterFailoverScenarioWithOfflineNode()
throws IOException, KeeperException, InterruptedException, ServiceException, DeserializationException {
throws IOException, KeeperException, InterruptedException, ServiceException,
DeserializationException, CoordinatedStateException {
AssignmentManagerWithExtrasForTesting am =
setUpMockedAssignmentManager(this.server, this.serverManager);
try {
@ -359,8 +372,8 @@ public class TestAssignmentManager {
* @throws DeserializationException
*/
@Test
public void testBalance()
throws IOException, KeeperException, DeserializationException, InterruptedException {
public void testBalance() throws IOException, KeeperException, DeserializationException,
InterruptedException, CoordinatedStateException {
// Create and startup an executor. This is used by AssignmentManager
// handling zk callbacks.
ExecutorService executor = startupMasterExecutor("testBalanceExecutor");
@ -435,7 +448,7 @@ public class TestAssignmentManager {
*/
@Test
public void testShutdownHandler()
throws KeeperException, IOException, ServiceException {
throws KeeperException, IOException, CoordinatedStateException, ServiceException {
// Create and startup an executor. This is used by AssignmentManager
// handling zk callbacks.
ExecutorService executor = startupMasterExecutor("testShutdownHandler");
@ -466,7 +479,7 @@ public class TestAssignmentManager {
*/
@Test
public void testSSHWhenDisableTableInProgress() throws KeeperException, IOException,
ServiceException {
CoordinatedStateException, ServiceException {
testCaseWithPartiallyDisabledState(Table.State.DISABLING);
testCaseWithPartiallyDisabledState(Table.State.DISABLED);
}
@ -488,7 +501,8 @@ public class TestAssignmentManager {
}
private void testCaseWithSplitRegionPartial(boolean regionSplitDone) throws KeeperException,
IOException, NodeExistsException, InterruptedException, ServiceException {
IOException, InterruptedException,
CoordinatedStateException, ServiceException {
// Create and startup an executor. This is used by AssignmentManager
// handling zk callbacks.
ExecutorService executor = startupMasterExecutor("testSSHWhenSplitRegionInProgress");
@ -504,7 +518,8 @@ public class TestAssignmentManager {
// adding region in pending close.
am.getRegionStates().updateRegionState(
REGIONINFO, State.SPLITTING, SERVERNAME_A);
am.getZKTable().setEnabledTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.ENABLED);
RegionTransition data = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_SPLITTING,
REGIONINFO.getRegionName(), SERVERNAME_A);
String node = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName());
@ -536,7 +551,7 @@ public class TestAssignmentManager {
}
private void testCaseWithPartiallyDisabledState(Table.State state) throws KeeperException,
IOException, NodeExistsException, ServiceException {
IOException, CoordinatedStateException, ServiceException {
// Create and startup an executor. This is used by AssignmentManager
// handling zk callbacks.
ExecutorService executor = startupMasterExecutor("testSSHWhenDisableTableInProgress");
@ -553,9 +568,11 @@ public class TestAssignmentManager {
// adding region in pending close.
am.getRegionStates().updateRegionState(REGIONINFO, State.PENDING_CLOSE);
if (state == Table.State.DISABLING) {
am.getZKTable().setDisablingTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.DISABLING);
} else {
am.getZKTable().setDisabledTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.DISABLED);
}
RegionTransition data = RegionTransition.createRegionTransition(EventType.M_ZK_REGION_CLOSING,
REGIONINFO.getRegionName(), SERVERNAME_A);
@ -668,7 +685,8 @@ public class TestAssignmentManager {
}
@Test
public void testUnassignWithSplitAtSameTime() throws KeeperException, IOException {
public void testUnassignWithSplitAtSameTime() throws KeeperException,
IOException, CoordinatedStateException {
// Region to use in test.
final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
// First amend the servermanager mock so that when we do send close of the
@ -713,7 +731,8 @@ public class TestAssignmentManager {
*/
@Test(timeout = 60000)
public void testProcessDeadServersAndRegionsInTransitionShouldNotFailWithNPE()
throws IOException, KeeperException, InterruptedException, ServiceException {
throws IOException, KeeperException, CoordinatedStateException,
InterruptedException, ServiceException {
final RecoverableZooKeeper recoverableZk = Mockito
.mock(RecoverableZooKeeper.class);
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
@ -744,7 +763,7 @@ public class TestAssignmentManager {
*/
@Test(timeout = 60000)
public void testRegionPlanIsUpdatedWhenRegionFailsToOpen() throws IOException, KeeperException,
ServiceException, InterruptedException {
ServiceException, InterruptedException, CoordinatedStateException {
this.server.getConfiguration().setClass(
HConstants.HBASE_MASTER_LOADBALANCER_CLASS, MockedLoadBalancer.class,
LoadBalancer.class);
@ -842,7 +861,7 @@ public class TestAssignmentManager {
*/
@Test(timeout = 60000)
public void testRegionInOpeningStateOnDeadRSWhileMasterFailover() throws IOException,
KeeperException, ServiceException, InterruptedException {
KeeperException, ServiceException, CoordinatedStateException, InterruptedException {
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
this.server, this.serverManager);
ZKAssign.createNodeOffline(this.watcher, REGIONINFO, SERVERNAME_A);
@ -858,7 +877,8 @@ public class TestAssignmentManager {
am.gate.set(false);
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
assertFalse(am.processRegionsInTransition(rt, REGIONINFO, version));
am.getZKTable().setEnabledTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.ENABLED);
processServerShutdownHandler(ct, am, false);
// Waiting for the assignment to get completed.
while (!am.gate.get()) {
@ -889,8 +909,9 @@ public class TestAssignmentManager {
// To avoid cast exception in DisableTableHandler process.
HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(HTU.getConfiguration());
Server server = new HMaster(HTU.getConfiguration(), cp);
CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
HTU.getConfiguration());
Server server = new HMaster(HTU.getConfiguration(), csm);
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
this.serverManager);
AtomicBoolean gate = new AtomicBoolean(false);
@ -899,7 +920,8 @@ public class TestAssignmentManager {
}
try{
// set table in disabling state.
am.getZKTable().setDisablingTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.DISABLING);
am.joinCluster();
// should not call retainAssignment if we get empty regions in assignAllUserRegions.
assertFalse(
@ -907,12 +929,14 @@ public class TestAssignmentManager {
gate.get());
// need to change table state from disabling to disabled.
assertTrue("Table should be disabled.",
am.getZKTable().isDisabledTable(REGIONINFO.getTable()));
am.getTableStateManager().isTableState(REGIONINFO.getTable(),
Table.State.DISABLED));
} finally {
this.server.getConfiguration().setClass(
HConstants.HBASE_MASTER_LOADBALANCER_CLASS, SimpleLoadBalancer.class,
LoadBalancer.class);
am.getZKTable().setEnabledTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.ENABLED);
am.shutdown();
}
}
@ -932,24 +956,28 @@ public class TestAssignmentManager {
Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers);
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(HTU.getConfiguration());
Server server = new HMaster(HTU.getConfiguration(), cp);
CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
HTU.getConfiguration());
Server server = new HMaster(HTU.getConfiguration(), csm);
Whitebox.setInternalState(server, "serverManager", this.serverManager);
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
this.serverManager);
try {
// set table in enabling state.
am.getZKTable().setEnablingTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.ENABLING);
new EnableTableHandler(server, REGIONINFO.getTable(),
am.getCatalogTracker(), am, new NullTableLockManager(), true).prepare()
.process();
assertEquals("Number of assignments should be 1.", 1, assignmentCount);
assertTrue("Table should be enabled.",
am.getZKTable().isEnabledTable(REGIONINFO.getTable()));
am.getTableStateManager().isTableState(REGIONINFO.getTable(),
Table.State.ENABLED));
} finally {
enabling = false;
assignmentCount = 0;
am.getZKTable().setEnabledTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.ENABLED);
am.shutdown();
ZKAssign.deleteAllNodes(this.watcher);
}
@ -964,24 +992,26 @@ public class TestAssignmentManager {
*/
@Test
public void testMasterRestartShouldRemoveStaleZnodesOfUnknownTableAsForMeta()
throws KeeperException, IOException, Exception {
throws Exception {
List<ServerName> destServers = new ArrayList<ServerName>(1);
destServers.add(SERVERNAME_A);
Mockito.when(this.serverManager.createDestinationServersList()).thenReturn(destServers);
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
HTU.getConfiguration().setInt(HConstants.MASTER_PORT, 0);
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(HTU.getConfiguration());
Server server = new HMaster(HTU.getConfiguration(), cp);
CoordinatedStateManager csm = CoordinatedStateManagerFactory.getCoordinatedStateManager(
HTU.getConfiguration());
Server server = new HMaster(HTU.getConfiguration(), csm);
Whitebox.setInternalState(server, "serverManager", this.serverManager);
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(server,
this.serverManager);
try {
TableName tableName = TableName.valueOf("dummyTable");
// set table in enabling state.
am.getZKTable().setEnablingTable(tableName);
am.getTableStateManager().setTableState(tableName,
Table.State.ENABLING);
am.joinCluster();
assertFalse("Table should not be present in zookeeper.",
am.getZKTable().isTablePresent(tableName));
am.getTableStateManager().isTablePresent(tableName));
} finally {
}
}
@ -992,7 +1022,7 @@ public class TestAssignmentManager {
*/
@Test
public void testSSHTimesOutOpeningRegionTransition()
throws KeeperException, IOException, ServiceException {
throws KeeperException, IOException, CoordinatedStateException, ServiceException {
// We need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
// Create an AM.
@ -1006,7 +1036,8 @@ public class TestAssignmentManager {
// adding region plan
am.regionPlans.put(REGIONINFO.getEncodedName(),
new RegionPlan(REGIONINFO, SERVERNAME_B, SERVERNAME_A));
am.getZKTable().setEnabledTable(REGIONINFO.getTable());
am.getTableStateManager().setTableState(REGIONINFO.getTable(),
Table.State.ENABLED);
try {
am.assignInvoked = false;
@ -1102,7 +1133,8 @@ public class TestAssignmentManager {
* @throws KeeperException
*/
private AssignmentManagerWithExtrasForTesting setUpMockedAssignmentManager(final Server server,
final ServerManager manager) throws IOException, KeeperException, ServiceException {
final ServerManager manager) throws IOException, KeeperException,
ServiceException, CoordinatedStateException {
// We need a mocked catalog tracker. Its used by our AM instance.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
// Make an RS Interface implementation. Make it so a scanner can go against
@ -1173,7 +1205,7 @@ public class TestAssignmentManager {
final Server master, final ServerManager serverManager,
final CatalogTracker catalogTracker, final LoadBalancer balancer,
final ExecutorService service, final TableLockManager tableLockManager)
throws KeeperException, IOException {
throws KeeperException, IOException, CoordinatedStateException {
super(master, serverManager, catalogTracker, balancer, service, null, tableLockManager);
this.es = service;
this.ct = catalogTracker;
@ -1267,6 +1299,8 @@ public class TestAssignmentManager {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (CoordinatedStateException e) {
throw new RuntimeException(e);
}
}
};
@ -1309,7 +1343,8 @@ public class TestAssignmentManager {
* assignment). So during master failover, we can ignored such events too.
*/
@Test
public void testAssignmentEventIgnoredIfNotExpected() throws KeeperException, IOException {
public void testAssignmentEventIgnoredIfNotExpected() throws KeeperException, IOException,
CoordinatedStateException {
// Region to use in test.
final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
// Need a mocked catalog tracker.

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -567,7 +568,7 @@ public class TestAssignmentManagerOnCluster {
Thread.sleep(100);
}
am.getZKTable().setDisablingTable(table);
am.getTableStateManager().setTableState(table, ZooKeeperProtos.Table.State.DISABLING);
List<HRegionInfo> toAssignRegions = am.processServerShutdown(destServerName);
assertTrue("Regions to be assigned should be empty.", toAssignRegions.isEmpty());
assertTrue("Regions to be assigned should be empty.", am.getRegionStates()
@ -576,7 +577,7 @@ public class TestAssignmentManagerOnCluster {
if (hri != null && serverName != null) {
am.regionOnline(hri, serverName);
}
am.getZKTable().setDisabledTable(table);
am.getTableStateManager().setTableState(table, ZooKeeperProtos.Table.State.DISABLED);
TEST_UTIL.deleteTable(table);
}
}
@ -838,7 +839,7 @@ public class TestAssignmentManagerOnCluster {
public static class MyMaster extends HMaster {
AtomicBoolean enabled = new AtomicBoolean(true);
public MyMaster(Configuration conf, ConsensusProvider cp)
public MyMaster(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);

View File

@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.master.CatalogJanitor.SplitParentFirstComparator;
@ -165,7 +165,7 @@ public class TestCatalogJanitor {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@ -249,7 +249,7 @@ public class TestCatalogJanitor {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -27,7 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -62,7 +62,7 @@ public class TestClockSkewDetection {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -26,8 +26,8 @@ import java.net.SocketTimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
@ -48,7 +48,7 @@ public class TestHMasterRPCException {
TEST_UTIL.startMiniZKCluster();
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(HConstants.MASTER_PORT, "0");
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf);
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
HMaster hm = new HMaster(conf, cp);
ServerName sm = hm.getServerName();
RpcClient rpcClient = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -67,7 +68,8 @@ public class TestMaster {
HMaster m = cluster.getMaster();
HTable ht = TEST_UTIL.createTable(TABLENAME, FAMILYNAME);
assertTrue(m.assignmentManager.getZKTable().isEnabledTable(TABLENAME));
assertTrue(m.assignmentManager.getTableStateManager().isTableState(TABLENAME,
ZooKeeperProtos.Table.State.ENABLED));
TEST_UTIL.loadTable(ht, FAMILYNAME, false);
ht.close();

View File

@ -47,9 +47,11 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
@ -63,7 +65,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MetaRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
@ -305,8 +307,8 @@ public class TestMasterFailover {
log("Beginning to mock scenarios");
// Disable the disabledTable in ZK
ZKTable zktable = new ZKTable(zkw);
zktable.setDisabledTable(disabledTable);
TableStateManager zktable = new ZKTableStateManager(zkw);
zktable.setTableState(disabledTable, ZooKeeperProtos.Table.State.DISABLED);
/*
* ZK = OFFLINE
@ -620,7 +622,8 @@ public class TestMasterFailover {
log("Assignment completed");
assertTrue(" Table must be enabled.", master.getAssignmentManager()
.getZKTable().isEnabledTable(TableName.valueOf("enabledTable")));
.getTableStateManager().isTableState(TableName.valueOf("enabledTable"),
ZooKeeperProtos.Table.State.ENABLED));
// we also need regions assigned out on the dead server
List<HRegionInfo> enabledAndOnDeadRegions = new ArrayList<HRegionInfo>();
enabledAndOnDeadRegions.addAll(enabledRegions.subList(0, 6));
@ -694,11 +697,12 @@ public class TestMasterFailover {
log("Beginning to mock scenarios");
// Disable the disabledTable in ZK
ZKTable zktable = new ZKTable(zkw);
zktable.setDisabledTable(disabledTable);
TableStateManager zktable = new ZKTableStateManager(zkw);
zktable.setTableState(disabledTable, ZooKeeperProtos.Table.State.DISABLED);
assertTrue(" The enabled table should be identified on master fail over.",
zktable.isEnabledTable(TableName.valueOf("enabledTable")));
zktable.isTableState(TableName.valueOf("enabledTable"),
ZooKeeperProtos.Table.State.ENABLED));
/*
* ZK = CLOSING

View File

@ -23,11 +23,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
@ -50,7 +50,7 @@ public class TestMasterMetrics {
private static HBaseTestingUtility TEST_UTIL;
public static class MyMaster extends HMaster {
public MyMaster(Configuration conf, ConsensusProvider cp) throws IOException,
public MyMaster(Configuration conf, CoordinatedStateManager cp) throws IOException,
KeeperException, InterruptedException {
super(conf, cp);
}

View File

@ -32,6 +32,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -46,8 +49,6 @@ import org.apache.hadoop.hbase.catalog.MetaMockingUtil;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
@ -125,7 +126,7 @@ public class TestMasterNoCluster {
@Test (timeout=30000)
public void testStopDuringStart()
throws IOException, KeeperException, InterruptedException {
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TESTUTIL.getConfiguration());
HMaster master = new HMaster(TESTUTIL.getConfiguration(), cp);
master.start();
@ -177,7 +178,7 @@ public class TestMasterNoCluster {
// and get notification on transitions. We need to fake out any rpcs the
// master does opening/closing regions. Also need to fake out the address
// of the 'remote' mocked up regionservers.
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TESTUTIL.getConfiguration());
HMaster master = new HMaster(conf, cp) {
InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
@ -260,7 +261,7 @@ public class TestMasterNoCluster {
final ServerName deadServer = ServerName.valueOf("test.sample", 1, 100);
final MockRegionServer rs0 = new MockRegionServer(conf, newServer);
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TESTUTIL.getConfiguration());
HMaster master = new HMaster(conf, cp) {
@Override
@ -269,7 +270,7 @@ public class TestMasterNoCluster {
@Override
void initializeZKBasedSystemTrackers() throws IOException,
InterruptedException, KeeperException {
InterruptedException, KeeperException, CoordinatedStateException {
super.initializeZKBasedSystemTrackers();
// Record a newer server in server manager at first
serverManager.recordNewServerWithLock(newServer, ServerLoad.EMPTY_SERVERLOAD);

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@ -96,8 +97,9 @@ public class TestMasterRestartAfterDisablingTable {
cluster.waitForActiveAndReadyMaster();
assertTrue("The table should not be in enabled state", cluster.getMaster()
.getAssignmentManager().getZKTable().isDisablingOrDisabledTable(
TableName.valueOf("tableRestart")));
.getAssignmentManager().getTableStateManager().isTableState(
TableName.valueOf("tableRestart"), ZooKeeperProtos.Table.State.DISABLED,
ZooKeeperProtos.Table.State.DISABLING));
log("Enabling table\n");
// Need a new Admin, the previous one is on the old master
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
@ -111,8 +113,8 @@ public class TestMasterRestartAfterDisablingTable {
+ " switch except for the catalog and namespace tables.",
6, regions.size());
assertTrue("The table should be in enabled state", cluster.getMaster()
.getAssignmentManager().getZKTable()
.isEnabledTable(TableName.valueOf("tableRestart")));
.getAssignmentManager().getTableStateManager()
.isTableState(TableName.valueOf("tableRestart"), ZooKeeperProtos.Table.State.ENABLED));
ht.close();
TEST_UTIL.shutdownMiniCluster();
}

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MockServer;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKTableStateManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -130,7 +130,7 @@ public class TestOpenedRegionHandler {
// create a node with OPENED state
zkw = HBaseTestingUtility.createAndForceNodeToOpenedState(TEST_UTIL,
region, server.getServerName());
when(am.getZKTable()).thenReturn(new ZKTable(zkw));
when(am.getTableStateManager()).thenReturn(new ZKTableStateManager(zkw));
Stat stat = new Stat();
String nodeName = ZKAssign.getNodeName(zkw, region.getRegionInfo()
.getEncodedName());

View File

@ -29,13 +29,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -210,7 +210,7 @@ public class TestHFileCleaner {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@ -146,7 +146,7 @@ public class TestHFileLinkCleaner {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
@ -152,7 +152,7 @@ public class TestLogsCleaner {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -25,7 +25,7 @@ import java.util.List;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
@ -42,7 +42,7 @@ import com.google.protobuf.ServiceException;
public class OOMERegionServer extends HRegionServer {
private List<Put> retainer = new ArrayList<Put>();
public OOMERegionServer(HBaseConfiguration conf, ConsensusProvider cp)
public OOMERegionServer(HBaseConfiguration conf, CoordinatedStateManager cp)
throws IOException, InterruptedException {
super(conf, cp);
}

View File

@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@ -72,7 +72,7 @@ public class TestClusterId {
TEST_UTIL.startMiniDFSCluster(1);
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf);
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
//start region server, needs to be separate
//so we get an unset clusterId
rst = JVMClusterUtil.createRegionServerThread(conf,cp,

View File

@ -28,12 +28,12 @@ import java.lang.management.ManagementFactory;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
@ -420,7 +420,7 @@ public class TestHeapMemoryManager {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -25,12 +25,12 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
@ -58,7 +58,7 @@ public class TestPriorityRpc {
public void setup() {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.testing.nocluster", true); // No need to do ZK
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(conf);
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf, cp);
priority = regionServer.rpcServices.getPriority();
}

View File

@ -25,6 +25,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -32,7 +33,6 @@ import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
@ -104,7 +104,7 @@ public class TestRSKilledWhenInitializing {
public static class MockedRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
public MockedRegionServer(Configuration conf, ConsensusProvider cp)
public MockedRegionServer(Configuration conf, CoordinatedStateManager cp)
throws IOException, InterruptedException {
super(conf, cp);
}

View File

@ -31,6 +31,8 @@ import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -42,8 +44,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.ConsensusProviderFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;
@ -220,7 +220,7 @@ public class TestRegionMergeTransaction {
// Run the execute. Look at what it returns.
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0);
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TEST_UTIL.getConfiguration());
Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration(), cp);
HRegion mergedRegion = mt.execute(mockServer, null);
@ -269,7 +269,7 @@ public class TestRegionMergeTransaction {
// Run the execute. Look at what it returns.
boolean expectedException = false;
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0);
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TEST_UTIL.getConfiguration());
Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration(), cp);
try {
@ -330,7 +330,7 @@ public class TestRegionMergeTransaction {
// Run the execute. Look at what it returns.
boolean expectedException = false;
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0);
ConsensusProvider cp = ConsensusProviderFactory.getConsensusProvider(
CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(
TEST_UTIL.getConfiguration());
Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration(), cp);
try {

View File

@ -65,7 +65,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@ -1298,7 +1298,7 @@ public class TestSplitTransactionOnCluster {
public static class MockMasterWithoutCatalogJanitor extends HMaster {
public MockMasterWithoutCatalogJanitor(Configuration conf, ConsensusProvider cp)
public MockMasterWithoutCatalogJanitor(Configuration conf, CoordinatedStateManager cp)
throws IOException, KeeperException,
InterruptedException {
super(conf, cp);

View File

@ -112,7 +112,7 @@ public class TestHLogMethods {
public void testEntrySink() throws Exception {
Configuration conf = new Configuration();
HLogSplitter splitter = new HLogSplitter(
conf, mock(Path.class), mock(FileSystem.class), null, null);
conf, mock(Path.class), mock(FileSystem.class), null, null, null);
EntryBuffers sink = splitter.new EntryBuffers(1*1024*1024);
for (int i = 0; i < 1000; i++) {

View File

@ -805,7 +805,7 @@ public class TestHLogSplit {
logfiles != null && logfiles.length > 0);
// Set up a splitter that will throw an IOE on the output side
HLogSplitter logSplitter = new HLogSplitter(
conf, HBASEDIR, fs, null, null) {
conf, HBASEDIR, fs, null, null, null) {
protected HLog.Writer createWriter(FileSystem fs,
Path logfile, Configuration conf) throws IOException {
HLog.Writer mockWriter = Mockito.mock(HLog.Writer.class);
@ -938,7 +938,7 @@ public class TestHLogSplit {
try {
conf.setInt("hbase.splitlog.report.period", 1000);
boolean ret = HLogSplitter.splitLogFile(
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null);
HBASEDIR, logfile, spiedFs, conf, localReporter, null, null, null);
assertFalse("Log splitting should failed", ret);
assertTrue(count.get() > 0);
} catch (IOException e) {
@ -997,7 +997,7 @@ public class TestHLogSplit {
// Create a splitter that reads and writes the data without touching disk
HLogSplitter logSplitter = new HLogSplitter(
localConf, HBASEDIR, fs, null, null) {
localConf, HBASEDIR, fs, null, null, null) {
/* Produce a mock writer that doesn't write anywhere */
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
@ -1282,7 +1282,7 @@ public class TestHLogSplit {
logfiles != null && logfiles.length > 0);
HLogSplitter logSplitter = new HLogSplitter(
conf, HBASEDIR, fs, null, null) {
conf, HBASEDIR, fs, null, null, null) {
protected HLog.Writer createWriter(FileSystem fs, Path logfile, Configuration conf)
throws IOException {
HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, logfile, conf);

View File

@ -873,7 +873,7 @@ public class TestWALReplay {
wal.close();
FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0],
this.fs, this.conf, null, null, null);
this.fs, this.conf, null, null, null, null);
FileStatus[] listStatus1 = this.fs.listStatus(
new Path(FSUtils.getTableDir(hbaseRootDir, tableName),
new Path(hri.getEncodedName(), "recovered.edits")));
@ -1034,4 +1034,4 @@ public class TestWALReplay {
htd.addFamily(c);
return htd;
}
}
}

View File

@ -24,13 +24,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -143,7 +143,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -219,7 +219,7 @@ public class TestReplicationTrackerZKImpl {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -392,7 +392,7 @@ public class TestReplicationSourceManager {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -34,6 +34,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
@ -43,7 +44,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
@ -154,7 +154,7 @@ public class TestTokenAuthentication {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -22,12 +22,12 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ConsensusProvider;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@ -96,7 +96,7 @@ public class MockServer implements Server {
}
@Override
public ConsensusProvider getConsensusProvider() {
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}

View File

@ -1,110 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestZKTable {
private static final Log LOG = LogFactory.getLog(TestZooKeeperNodeTracker.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
}
@Test
public void testTableStates()
throws ZooKeeperConnectionException, IOException, KeeperException, InterruptedException {
final TableName name =
TableName.valueOf("testDisabled");
Abortable abortable = new Abortable() {
@Override
public void abort(String why, Throwable e) {
LOG.info(why, e);
}
@Override
public boolean isAborted() {
return false;
}
};
ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
name.getNameAsString(), abortable, true);
ZKTable zkt = new ZKTable(zkw);
assertFalse(zkt.isEnabledTable(name));
assertFalse(zkt.isDisablingTable(name));
assertFalse(zkt.isDisabledTable(name));
assertFalse(zkt.isEnablingTable(name));
assertFalse(zkt.isDisablingOrDisabledTable(name));
assertFalse(zkt.isDisabledOrEnablingTable(name));
assertFalse(zkt.isTablePresent(name));
zkt.setDisablingTable(name);
assertTrue(zkt.isDisablingTable(name));
assertTrue(zkt.isDisablingOrDisabledTable(name));
assertFalse(zkt.getDisabledTables().contains(name));
assertTrue(zkt.isTablePresent(name));
zkt.setDisabledTable(name);
assertTrue(zkt.isDisabledTable(name));
assertTrue(zkt.isDisablingOrDisabledTable(name));
assertFalse(zkt.isDisablingTable(name));
assertTrue(zkt.getDisabledTables().contains(name));
assertTrue(zkt.isTablePresent(name));
zkt.setEnablingTable(name);
assertTrue(zkt.isEnablingTable(name));
assertTrue(zkt.isDisabledOrEnablingTable(name));
assertFalse(zkt.isDisabledTable(name));
assertFalse(zkt.getDisabledTables().contains(name));
assertTrue(zkt.isTablePresent(name));
zkt.setEnabledTable(name);
assertTrue(zkt.isEnabledTable(name));
assertFalse(zkt.isEnablingTable(name));
assertTrue(zkt.isTablePresent(name));
zkt.setDeletedTable(name);
assertFalse(zkt.isEnabledTable(name));
assertFalse(zkt.isDisablingTable(name));
assertFalse(zkt.isDisabledTable(name));
assertFalse(zkt.isEnablingTable(name));
assertFalse(zkt.isDisablingOrDisabledTable(name));
assertFalse(zkt.isDisabledOrEnablingTable(name));
assertFalse(zkt.isTablePresent(name));
}
}