HBASE-13252 Get rid of managed connections and connection caching
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
61f4ce6880
commit
578df6dfc0
|
@ -83,7 +83,7 @@ log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
|
|||
#log4j.logger.org.apache.hadoop.dfs=DEBUG
|
||||
# Set this class to log INFO only otherwise its OTT
|
||||
# Enable this to get detailed connection error/retry logging.
|
||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=TRACE
|
||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
|
||||
|
||||
|
||||
# Uncomment this line to enable tracing on _every_ RPC call (this can be a lot of output)
|
||||
|
@ -91,4 +91,4 @@ log4j.logger.org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher=INFO
|
|||
|
||||
# Uncomment the below if you want to remove logging of client region caching'
|
||||
# and scan of hbase:meta messages
|
||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=INFO
|
||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=INFO
|
||||
|
|
|
@ -40,14 +40,12 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Consistency;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Mutation;
|
||||
import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
|
||||
|
@ -254,19 +252,6 @@ public class MetaTableAccessor {
|
|||
if (connection == null || connection.isClosed()) {
|
||||
throw new NullPointerException("No connection");
|
||||
}
|
||||
// If the passed in 'connection' is 'managed' -- i.e. every second test uses
|
||||
// a Table or an HBaseAdmin with managed connections -- then doing
|
||||
// connection.getTable will throw an exception saying you are NOT to use
|
||||
// managed connections getting tables. Leaving this as it is for now. Will
|
||||
// revisit when inclined to change all tests. User code probaby makes use of
|
||||
// managed connections too so don't change it till post hbase 1.0.
|
||||
//
|
||||
// There should still be a way to use this method with an unmanaged connection.
|
||||
if (connection instanceof ClusterConnection) {
|
||||
if (((ClusterConnection) connection).isManaged()) {
|
||||
throw new NeedUnmanagedConnectionException();
|
||||
}
|
||||
}
|
||||
return connection.getTable(TableName.META_TABLE_NAME);
|
||||
}
|
||||
|
||||
|
|
|
@ -744,7 +744,7 @@ class AsyncProcess {
|
|||
|
||||
private final Batch.Callback<CResult> callback;
|
||||
private final BatchErrors errors;
|
||||
private final ConnectionManager.ServerErrorTracker errorsByServer;
|
||||
private final ConnectionImplementation.ServerErrorTracker errorsByServer;
|
||||
private final ExecutorService pool;
|
||||
private final Set<MultiServerCallable<Row>> callsInProgress;
|
||||
|
||||
|
@ -1743,8 +1743,8 @@ class AsyncProcess {
|
|||
* We may benefit from connection-wide tracking of server errors.
|
||||
* @return ServerErrorTracker to use, null if there is no ServerErrorTracker on this connection
|
||||
*/
|
||||
protected ConnectionManager.ServerErrorTracker createServerErrorTracker() {
|
||||
return new ConnectionManager.ServerErrorTracker(
|
||||
protected ConnectionImplementation.ServerErrorTracker createServerErrorTracker() {
|
||||
return new ConnectionImplementation.ServerErrorTracker(
|
||||
this.serverTrackerTimeout, this.numTries);
|
||||
}
|
||||
|
||||
|
|
|
@ -287,12 +287,6 @@ public interface ClusterConnection extends HConnection {
|
|||
*/
|
||||
RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf);
|
||||
|
||||
/**
|
||||
*
|
||||
* @return true if this is a managed connection.
|
||||
*/
|
||||
boolean isManaged();
|
||||
|
||||
/**
|
||||
* @return the current statistics tracker associated with this connection
|
||||
*/
|
||||
|
|
|
@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
|
|||
* A convenience to override when customizing method implementations.
|
||||
*
|
||||
*
|
||||
* @see ConnectionUtils#createShortCircuitHConnection(HConnection, ServerName,
|
||||
* @see ConnectionUtils#createShortCircuitHConnection(Connection, ServerName,
|
||||
* AdminService.BlockingInterface, ClientService.BlockingInterface) for case where we make
|
||||
* Connections skip RPC if request is to local server.
|
||||
*/
|
||||
|
@ -455,11 +455,6 @@ abstract class ConnectionAdapter implements ClusterConnection {
|
|||
return wrappedConnection.getNewRpcRetryingCallerFactory(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged() {
|
||||
return wrappedConnection.isManaged();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerStatisticTracker getStatisticsTracker() {
|
||||
return wrappedConnection.getStatisticsTracker();
|
||||
|
|
|
@ -214,15 +214,9 @@ public class ConnectionFactory {
|
|||
user = provider.getCurrent();
|
||||
}
|
||||
|
||||
return createConnection(conf, false, pool, user);
|
||||
}
|
||||
|
||||
static Connection createConnection(final Configuration conf, final boolean managed,
|
||||
final ExecutorService pool, final User user)
|
||||
throws IOException {
|
||||
String className = conf.get(HConnection.HBASE_CLIENT_CONNECTION_IMPL,
|
||||
ConnectionImplementation.class.getName());
|
||||
Class<?> clazz = null;
|
||||
Class<?> clazz;
|
||||
try {
|
||||
clazz = Class.forName(className);
|
||||
} catch (ClassNotFoundException e) {
|
||||
|
@ -232,9 +226,9 @@ public class ConnectionFactory {
|
|||
// Default HCM#HCI is not accessible; make it so before invoking.
|
||||
Constructor<?> constructor =
|
||||
clazz.getDeclaredConstructor(Configuration.class,
|
||||
boolean.class, ExecutorService.class, User.class);
|
||||
ExecutorService.class, User.class);
|
||||
constructor.setAccessible(true);
|
||||
return (Connection) constructor.newInstance(conf, managed, pool, user);
|
||||
return (Connection) constructor.newInstance(conf, pool, user);
|
||||
} catch (Exception e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
|
|
@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsBalancerEnabled
|
|||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
@ -76,8 +77,10 @@ import java.lang.reflect.UndeclaredThrowableException;
|
|||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
@ -93,6 +96,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
justification="Access to the conncurrent hash map is under a lock so should be fine.")
|
||||
@InterfaceAudience.Private
|
||||
class ConnectionImplementation implements ClusterConnection, Closeable {
|
||||
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
|
||||
static final Log LOG = LogFactory.getLog(ConnectionImplementation.class);
|
||||
private static final String CLIENT_NONCES_ENABLED_KEY = "hbase.client.nonces.enabled";
|
||||
|
||||
|
@ -152,9 +156,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
|
||||
private int refCount;
|
||||
|
||||
// indicates whether this connection's life cycle is managed (by us)
|
||||
private boolean managed;
|
||||
|
||||
private User user;
|
||||
|
||||
private RpcRetryingCallerFactory rpcCallerFactory;
|
||||
|
@ -170,27 +171,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
|
||||
private final ClientBackoffPolicy backoffPolicy;
|
||||
|
||||
ConnectionImplementation(Configuration conf, boolean managed) throws IOException {
|
||||
this(conf, managed, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* constructor
|
||||
* @param conf Configuration object
|
||||
* @param managed If true, does not do full shutdown on close; i.e. cleanup of connection
|
||||
* to zk and shutdown of all services; we just close down the resources this connection was
|
||||
* responsible for and decrement usage counters. It is up to the caller to do the full
|
||||
* cleanup. It is set when we want have connection sharing going on -- reuse of zk connection,
|
||||
* and cached region locations, established regionserver connections, etc. When connections
|
||||
* are shared, we have reference counting going on and will only do full cleanup when no more
|
||||
* users of an ConnectionImplementation instance.
|
||||
*/
|
||||
ConnectionImplementation(Configuration conf, boolean managed,
|
||||
ConnectionImplementation(Configuration conf,
|
||||
ExecutorService pool, User user) throws IOException {
|
||||
this(conf);
|
||||
this.user = user;
|
||||
this.batchPool = pool;
|
||||
this.managed = managed;
|
||||
this.registry = setupRegistry();
|
||||
retrieveClusterId();
|
||||
|
||||
|
@ -243,7 +232,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
nonceGenerator = new ConnectionManager.NoNonceGenerator();
|
||||
nonceGenerator = new NoNonceGenerator();
|
||||
}
|
||||
stats = ServerStatisticTracker.create(conf);
|
||||
this.asyncProcess = createAsyncProcess(this.conf);
|
||||
|
@ -262,12 +251,52 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
ClusterConnection conn, NonceGenerator cnm) {
|
||||
ConnectionImplementation connImpl = (ConnectionImplementation)conn;
|
||||
NonceGenerator ng = connImpl.getNonceGenerator();
|
||||
ConnectionManager.LOG.warn("Nonce generator is being replaced by test code for "
|
||||
LOG.warn("Nonce generator is being replaced by test code for "
|
||||
+ cnm.getClass().getName());
|
||||
nonceGenerator = cnm;
|
||||
return ng;
|
||||
}
|
||||
|
||||
/**
|
||||
* Look for an exception we know in the remote exception:
|
||||
* - hadoop.ipc wrapped exceptions
|
||||
* - nested exceptions
|
||||
*
|
||||
* Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException
|
||||
* @return null if we didn't find the exception, the exception otherwise.
|
||||
*/
|
||||
public static Throwable findException(Object exception) {
|
||||
if (exception == null || !(exception instanceof Throwable)) {
|
||||
return null;
|
||||
}
|
||||
Throwable cur = (Throwable) exception;
|
||||
while (cur != null) {
|
||||
if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||
|| cur instanceof RegionTooBusyException) {
|
||||
return cur;
|
||||
}
|
||||
if (cur instanceof RemoteException) {
|
||||
RemoteException re = (RemoteException) cur;
|
||||
cur = re.unwrapRemoteException(
|
||||
RegionOpeningException.class, RegionMovedException.class,
|
||||
RegionTooBusyException.class);
|
||||
if (cur == null) {
|
||||
cur = re.unwrapRemoteException();
|
||||
}
|
||||
// unwrapRemoteException can return the exception given as a parameter when it cannot
|
||||
// unwrap it. In this case, there is no need to look further
|
||||
// noinspection ObjectEquality
|
||||
if (cur == re) {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
cur = cur.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HTableInterface getTable(String tableName) throws IOException {
|
||||
return getTable(TableName.valueOf(tableName));
|
||||
|
@ -295,9 +324,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
|
||||
@Override
|
||||
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
|
||||
if (managed) {
|
||||
throw new NeedUnmanagedConnectionException();
|
||||
}
|
||||
return new HTable(tableName, this, tableConfig, rpcCallerFactory, rpcControllerFactory, pool);
|
||||
}
|
||||
|
||||
|
@ -330,9 +356,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
|
||||
@Override
|
||||
public Admin getAdmin() throws IOException {
|
||||
if (managed) {
|
||||
throw new NeedUnmanagedConnectionException();
|
||||
}
|
||||
return new HBaseAdmin(this);
|
||||
}
|
||||
|
||||
|
@ -543,14 +566,15 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
@Override
|
||||
public boolean isTableAvailable(final TableName tableName, @Nullable final byte[][] splitKeys)
|
||||
throws IOException {
|
||||
if (this.closed) throw new IOException(toString() + " closed");
|
||||
try {
|
||||
if (!isTableEnabled(tableName)) {
|
||||
LOG.debug("Table " + tableName + " not enabled");
|
||||
return false;
|
||||
}
|
||||
ClusterConnection connection = ConnectionManager.getConnectionInternal(getConfiguration());
|
||||
List<Pair<HRegionInfo, ServerName>> locations = MetaTableAccessor
|
||||
.getTableRegionsAndLocations(connection, tableName, true);
|
||||
List<Pair<HRegionInfo, ServerName>> locations =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(this, tableName, true);
|
||||
|
||||
int notDeployed = 0;
|
||||
int regionCount = 0;
|
||||
for (Pair<HRegionInfo, ServerName> pair : locations) {
|
||||
|
@ -1007,6 +1031,99 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/** Dummy nonce generator for disabled nonces. */
|
||||
static class NoNonceGenerator implements NonceGenerator {
|
||||
@Override
|
||||
public long getNonceGroup() {
|
||||
return HConstants.NO_NONCE;
|
||||
}
|
||||
@Override
|
||||
public long newNonce() {
|
||||
return HConstants.NO_NONCE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The record of errors for servers.
|
||||
*/
|
||||
static class ServerErrorTracker {
|
||||
// We need a concurrent map here, as we could have multiple threads updating it in parallel.
|
||||
private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
|
||||
new ConcurrentHashMap<ServerName, ServerErrors>();
|
||||
private final long canRetryUntil;
|
||||
private final int maxRetries;
|
||||
private final long startTrackingTime;
|
||||
|
||||
public ServerErrorTracker(long timeout, int maxRetries) {
|
||||
this.maxRetries = maxRetries;
|
||||
this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
|
||||
this.startTrackingTime = new Date().getTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* We stop to retry when we have exhausted BOTH the number of retries and the time allocated.
|
||||
*/
|
||||
boolean canRetryMore(int numRetry) {
|
||||
// If there is a single try we must not take into account the time.
|
||||
return numRetry < maxRetries || (maxRetries > 1 &&
|
||||
EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the back-off time for a retrying request to a particular server.
|
||||
*
|
||||
* @param server The server in question.
|
||||
* @param basePause The default hci pause.
|
||||
* @return The time to wait before sending next request.
|
||||
*/
|
||||
long calculateBackoffTime(ServerName server, long basePause) {
|
||||
long result;
|
||||
ServerErrors errorStats = errorsByServer.get(server);
|
||||
if (errorStats != null) {
|
||||
result = ConnectionUtils.getPauseTime(basePause, errorStats.getCount());
|
||||
} else {
|
||||
result = 0; // yes, if the server is not in our list we don't wait before retrying.
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reports that there was an error on the server to do whatever bean-counting necessary.
|
||||
*
|
||||
* @param server The server in question.
|
||||
*/
|
||||
void reportServerError(ServerName server) {
|
||||
ServerErrors errors = errorsByServer.get(server);
|
||||
if (errors != null) {
|
||||
errors.addError();
|
||||
} else {
|
||||
errors = errorsByServer.putIfAbsent(server, new ServerErrors());
|
||||
if (errors != null){
|
||||
errors.addError();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
long getStartTrackingTime() {
|
||||
return startTrackingTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* The record of errors for a server.
|
||||
*/
|
||||
private static class ServerErrors {
|
||||
private final AtomicInteger retries = new AtomicInteger(0);
|
||||
|
||||
public int getCount() {
|
||||
return retries.get();
|
||||
}
|
||||
|
||||
public void addError() {
|
||||
retries.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Makes a client-side stub for master services. Sub-class to specialize.
|
||||
* Depends on hosting class so not static. Exists so we avoid duplicating a bunch of code
|
||||
|
@ -1710,7 +1827,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
}
|
||||
|
||||
HRegionInfo regionInfo = oldLocation.getRegionInfo();
|
||||
Throwable cause = ConnectionManager.findException(exception);
|
||||
Throwable cause = findException(exception);
|
||||
if (cause != null) {
|
||||
if (cause instanceof RegionTooBusyException || cause instanceof RegionOpeningException) {
|
||||
// We know that the region is still on this region server
|
||||
|
@ -1936,16 +2053,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return if this client has no reference
|
||||
*
|
||||
* @return true if this client has no reference; false otherwise
|
||||
*/
|
||||
boolean isZeroReference() {
|
||||
return refCount == 0;
|
||||
}
|
||||
|
||||
void internalClose() {
|
||||
@Override
|
||||
public void close() {
|
||||
if (this.closed) {
|
||||
return;
|
||||
}
|
||||
|
@ -1962,19 +2071,6 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (managed) {
|
||||
if (aborted) {
|
||||
ConnectionManager.deleteStaleConnection(this);
|
||||
} else {
|
||||
ConnectionManager.deleteConnection(this, false);
|
||||
}
|
||||
} else {
|
||||
internalClose();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the connection for good, regardless of what the current value of
|
||||
* {@link #refCount} is. Ideally, {@link #refCount} should be zero at this
|
||||
|
@ -2125,8 +2221,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
|
||||
@Override
|
||||
public TableState getTableState(TableName tableName) throws IOException {
|
||||
ClusterConnection conn = ConnectionManager.getConnectionInternal(getConfiguration());
|
||||
TableState tableState = MetaTableAccessor.getTableState(conn, tableName);
|
||||
if (this.closed) throw new IOException(toString() + " closed");
|
||||
|
||||
TableState tableState = MetaTableAccessor.getTableState(this, tableName);
|
||||
if (tableState == null)
|
||||
throw new TableNotFoundException(tableName);
|
||||
return tableState;
|
||||
|
@ -2137,9 +2234,4 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
|
|||
return RpcRetryingCallerFactory
|
||||
.instantiate(conf, this.interceptor, this.getStatisticsTracker());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged() {
|
||||
return managed;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,465 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.RegionTooBusyException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
|
||||
import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ExceptionUtil;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
/**
|
||||
* An internal, non-instantiable class that manages creation of {@link HConnection}s.
|
||||
*/
|
||||
@SuppressWarnings("serial")
|
||||
@InterfaceAudience.Private
|
||||
// NOTE: DO NOT make this class public. It was made package-private on purpose.
|
||||
final class ConnectionManager {
|
||||
static final Log LOG = LogFactory.getLog(ConnectionManager.class);
|
||||
|
||||
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
|
||||
|
||||
// An LRU Map of HConnectionKey -> HConnection (TableServer). All
|
||||
// access must be synchronized. This map is not private because tests
|
||||
// need to be able to tinker with it.
|
||||
static final Map<HConnectionKey, ConnectionImplementation> CONNECTION_INSTANCES;
|
||||
|
||||
public static final int MAX_CACHED_CONNECTION_INSTANCES;
|
||||
|
||||
static {
|
||||
// We set instances to one more than the value specified for {@link
|
||||
// HConstants#ZOOKEEPER_MAX_CLIENT_CNXNS}. By default, the zk default max
|
||||
// connections to the ensemble from the one client is 30, so in that case we
|
||||
// should run into zk issues before the LRU hit this value of 31.
|
||||
MAX_CACHED_CONNECTION_INSTANCES = HBaseConfiguration.create().getInt(
|
||||
HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS, HConstants.DEFAULT_ZOOKEPER_MAX_CLIENT_CNXNS) + 1;
|
||||
CONNECTION_INSTANCES = new LinkedHashMap<HConnectionKey, ConnectionImplementation>(
|
||||
(int) (MAX_CACHED_CONNECTION_INSTANCES / 0.75F) + 1, 0.75F, true) {
|
||||
@Override
|
||||
protected boolean removeEldestEntry(
|
||||
Map.Entry<HConnectionKey, ConnectionImplementation> eldest) {
|
||||
return size() > MAX_CACHED_CONNECTION_INSTANCES;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/** Dummy nonce generator for disabled nonces. */
|
||||
static class NoNonceGenerator implements NonceGenerator {
|
||||
@Override
|
||||
public long getNonceGroup() {
|
||||
return HConstants.NO_NONCE;
|
||||
}
|
||||
@Override
|
||||
public long newNonce() {
|
||||
return HConstants.NO_NONCE;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Non-instantiable.
|
||||
*/
|
||||
private ConnectionManager() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the connection that goes with the passed <code>conf</code> configuration instance.
|
||||
* If no current connection exists, method creates a new connection and keys it using
|
||||
* connection-specific properties from the passed {@link Configuration}; see
|
||||
* {@link HConnectionKey}.
|
||||
* @param conf configuration
|
||||
* @return HConnection object for <code>conf</code>
|
||||
* @throws ZooKeeperConnectionException
|
||||
* @deprecated connection caching is going away.
|
||||
*/
|
||||
@Deprecated
|
||||
public static HConnection getConnection(final Configuration conf) throws IOException {
|
||||
return getConnectionInternal(conf);
|
||||
}
|
||||
|
||||
|
||||
static ClusterConnection getConnectionInternal(final Configuration conf)
|
||||
throws IOException {
|
||||
HConnectionKey connectionKey = new HConnectionKey(conf);
|
||||
synchronized (CONNECTION_INSTANCES) {
|
||||
ConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
|
||||
if (connection == null) {
|
||||
connection = (ConnectionImplementation) ConnectionFactory.createConnection(conf);
|
||||
CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||
} else if (connection.isClosed()) {
|
||||
ConnectionManager.deleteConnection(connectionKey, true);
|
||||
connection = (ConnectionImplementation) ConnectionFactory.createConnection(conf);
|
||||
CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||
}
|
||||
connection.incCount();
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new HConnection instance using the passed <code>conf</code> instance.
|
||||
* <p>Note: This bypasses the usual HConnection life cycle management done by
|
||||
* {@link #getConnection(Configuration)}. The caller is responsible for
|
||||
* calling {@link HConnection#close()} on the returned connection instance.
|
||||
*
|
||||
* This is the recommended way to create HConnections.
|
||||
* {@code
|
||||
* HConnection connection = ConnectionManagerInternal.createConnection(conf);
|
||||
* HTableInterface table = connection.getTable("mytable");
|
||||
* table.get(...);
|
||||
* ...
|
||||
* table.close();
|
||||
* connection.close();
|
||||
* }
|
||||
*
|
||||
* @param conf configuration
|
||||
* @return HConnection object for <code>conf</code>
|
||||
* @throws ZooKeeperConnectionException
|
||||
*/
|
||||
public static HConnection createConnection(Configuration conf) throws IOException {
|
||||
return createConnectionInternal(conf);
|
||||
}
|
||||
|
||||
static ClusterConnection createConnectionInternal(Configuration conf) throws IOException {
|
||||
UserProvider provider = UserProvider.instantiate(conf);
|
||||
return createConnection(conf, false, null, provider.getCurrent());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new HConnection instance using the passed <code>conf</code> instance.
|
||||
* <p>Note: This bypasses the usual HConnection life cycle management done by
|
||||
* {@link #getConnection(Configuration)}. The caller is responsible for
|
||||
* calling {@link HConnection#close()} on the returned connection instance.
|
||||
* This is the recommended way to create HConnections.
|
||||
* {@code
|
||||
* ExecutorService pool = ...;
|
||||
* HConnection connection = ConnectionManager.createConnection(conf, pool);
|
||||
* HTableInterface table = connection.getTable("mytable");
|
||||
* table.get(...);
|
||||
* ...
|
||||
* table.close();
|
||||
* connection.close();
|
||||
* }
|
||||
* @param conf configuration
|
||||
* @param pool the thread pool to use for batch operation in HTables used via this HConnection
|
||||
* @return HConnection object for <code>conf</code>
|
||||
* @throws ZooKeeperConnectionException
|
||||
*/
|
||||
public static HConnection createConnection(Configuration conf, ExecutorService pool)
|
||||
throws IOException {
|
||||
UserProvider provider = UserProvider.instantiate(conf);
|
||||
return createConnection(conf, false, pool, provider.getCurrent());
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new HConnection instance using the passed <code>conf</code> instance.
|
||||
* <p>Note: This bypasses the usual HConnection life cycle management done by
|
||||
* {@link #getConnection(Configuration)}. The caller is responsible for
|
||||
* calling {@link HConnection#close()} on the returned connection instance.
|
||||
* This is the recommended way to create HConnections.
|
||||
* {@code
|
||||
* ExecutorService pool = ...;
|
||||
* HConnection connection = ConnectionManager.createConnection(conf, pool);
|
||||
* HTableInterface table = connection.getTable("mytable");
|
||||
* table.get(...);
|
||||
* ...
|
||||
* table.close();
|
||||
* connection.close();
|
||||
* }
|
||||
* @param conf configuration
|
||||
* @param user the user the connection is for
|
||||
* @return HConnection object for <code>conf</code>
|
||||
* @throws ZooKeeperConnectionException
|
||||
*/
|
||||
public static HConnection createConnection(Configuration conf, User user)
|
||||
throws IOException {
|
||||
return createConnection(conf, false, null, user);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new HConnection instance using the passed <code>conf</code> instance.
|
||||
* <p>Note: This bypasses the usual HConnection life cycle management done by
|
||||
* {@link #getConnection(Configuration)}. The caller is responsible for
|
||||
* calling {@link HConnection#close()} on the returned connection instance.
|
||||
* This is the recommended way to create HConnections.
|
||||
* {@code
|
||||
* ExecutorService pool = ...;
|
||||
* HConnection connection = ConnectionManager.createConnection(conf, pool);
|
||||
* HTableInterface table = connection.getTable("mytable");
|
||||
* table.get(...);
|
||||
* ...
|
||||
* table.close();
|
||||
* connection.close();
|
||||
* }
|
||||
* @param conf configuration
|
||||
* @param pool the thread pool to use for batch operation in HTables used via this HConnection
|
||||
* @param user the user the connection is for
|
||||
* @return HConnection object for <code>conf</code>
|
||||
* @throws ZooKeeperConnectionException
|
||||
*/
|
||||
public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
|
||||
throws IOException {
|
||||
return createConnection(conf, false, pool, user);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated instead use one of the {@link ConnectionFactory#createConnection()} methods.
|
||||
*/
|
||||
@Deprecated
|
||||
static HConnection createConnection(final Configuration conf, final boolean managed)
|
||||
throws IOException {
|
||||
UserProvider provider = UserProvider.instantiate(conf);
|
||||
return createConnection(conf, managed, null, provider.getCurrent());
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated instead use one of the {@link ConnectionFactory#createConnection()} methods.
|
||||
*/
|
||||
@Deprecated
|
||||
static ClusterConnection createConnection(final Configuration conf, final boolean managed,
|
||||
final ExecutorService pool, final User user)
|
||||
throws IOException {
|
||||
return (ClusterConnection) ConnectionFactory.createConnection(conf, managed, pool, user);
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup a known stale connection.
|
||||
* This will then close connection to the zookeeper ensemble and let go of all resources.
|
||||
*
|
||||
* @param connection
|
||||
* @deprecated connection caching is going away.
|
||||
*/
|
||||
@Deprecated
|
||||
public static void deleteStaleConnection(HConnection connection) {
|
||||
deleteConnection(connection, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated connection caching is going away.
|
||||
*/
|
||||
@Deprecated
|
||||
static void deleteConnection(HConnection connection, boolean staleConnection) {
|
||||
synchronized (CONNECTION_INSTANCES) {
|
||||
for (Entry<HConnectionKey, ConnectionImplementation> e: CONNECTION_INSTANCES.entrySet()) {
|
||||
if (e.getValue() == connection) {
|
||||
deleteConnection(e.getKey(), staleConnection);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated connection caching is going away.
|
||||
˙ */
|
||||
@Deprecated
|
||||
private static void deleteConnection(HConnectionKey connectionKey, boolean staleConnection) {
|
||||
synchronized (CONNECTION_INSTANCES) {
|
||||
ConnectionImplementation connection = CONNECTION_INSTANCES.get(connectionKey);
|
||||
if (connection != null) {
|
||||
connection.decCount();
|
||||
if (connection.isZeroReference() || staleConnection) {
|
||||
CONNECTION_INSTANCES.remove(connectionKey);
|
||||
connection.internalClose();
|
||||
}
|
||||
} else {
|
||||
LOG.error("Connection not found in the list, can't delete it "+
|
||||
"(connection key=" + connectionKey + "). May be the key was modified?", new Exception());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This convenience method invokes the given {@link HConnectable#connect}
|
||||
* implementation using a {@link HConnection} instance that lasts just for the
|
||||
* duration of the invocation.
|
||||
*
|
||||
* @param <T> the return type of the connect method
|
||||
* @param connectable the {@link HConnectable} instance
|
||||
* @return the value returned by the connect method
|
||||
* @throws IOException
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public static <T> T execute(HConnectable<T> connectable) throws IOException {
|
||||
if (connectable == null || connectable.conf == null) {
|
||||
return null;
|
||||
}
|
||||
Configuration conf = connectable.conf;
|
||||
HConnection connection = getConnection(conf);
|
||||
boolean connectSucceeded = false;
|
||||
try {
|
||||
T returnValue = connectable.connect(connection);
|
||||
connectSucceeded = true;
|
||||
return returnValue;
|
||||
} finally {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (Exception e) {
|
||||
ExceptionUtil.rethrowIfInterrupt(e);
|
||||
if (connectSucceeded) {
|
||||
throw new IOException("The connection to " + connection
|
||||
+ " could not be deleted.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The record of errors for servers.
|
||||
*/
|
||||
static class ServerErrorTracker {
|
||||
// We need a concurrent map here, as we could have multiple threads updating it in parallel.
|
||||
private final ConcurrentMap<ServerName, ServerErrors> errorsByServer =
|
||||
new ConcurrentHashMap<ServerName, ServerErrors>();
|
||||
private final long canRetryUntil;
|
||||
private final int maxRetries;
|
||||
private final long startTrackingTime;
|
||||
|
||||
public ServerErrorTracker(long timeout, int maxRetries) {
|
||||
this.maxRetries = maxRetries;
|
||||
this.canRetryUntil = EnvironmentEdgeManager.currentTime() + timeout;
|
||||
this.startTrackingTime = new Date().getTime();
|
||||
}
|
||||
|
||||
/**
|
||||
* We stop to retry when we have exhausted BOTH the number of retries and the time allocated.
|
||||
*/
|
||||
boolean canRetryMore(int numRetry) {
|
||||
// If there is a single try we must not take into account the time.
|
||||
return numRetry < maxRetries || (maxRetries > 1 &&
|
||||
EnvironmentEdgeManager.currentTime() < this.canRetryUntil);
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculates the back-off time for a retrying request to a particular server.
|
||||
*
|
||||
* @param server The server in question.
|
||||
* @param basePause The default hci pause.
|
||||
* @return The time to wait before sending next request.
|
||||
*/
|
||||
long calculateBackoffTime(ServerName server, long basePause) {
|
||||
long result;
|
||||
ServerErrors errorStats = errorsByServer.get(server);
|
||||
if (errorStats != null) {
|
||||
result = ConnectionUtils.getPauseTime(basePause, errorStats.getCount());
|
||||
} else {
|
||||
result = 0; // yes, if the server is not in our list we don't wait before retrying.
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Reports that there was an error on the server to do whatever bean-counting necessary.
|
||||
*
|
||||
* @param server The server in question.
|
||||
*/
|
||||
void reportServerError(ServerName server) {
|
||||
ServerErrors errors = errorsByServer.get(server);
|
||||
if (errors != null) {
|
||||
errors.addError();
|
||||
} else {
|
||||
errors = errorsByServer.putIfAbsent(server, new ServerErrors());
|
||||
if (errors != null){
|
||||
errors.addError();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
long getStartTrackingTime() {
|
||||
return startTrackingTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* The record of errors for a server.
|
||||
*/
|
||||
private static class ServerErrors {
|
||||
private final AtomicInteger retries = new AtomicInteger(0);
|
||||
|
||||
public int getCount() {
|
||||
return retries.get();
|
||||
}
|
||||
|
||||
public void addError() {
|
||||
retries.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Look for an exception we know in the remote exception:
|
||||
* - hadoop.ipc wrapped exceptions
|
||||
* - nested exceptions
|
||||
*
|
||||
* Looks for: RegionMovedException / RegionOpeningException / RegionTooBusyException
|
||||
* @return null if we didn't find the exception, the exception otherwise.
|
||||
*/
|
||||
public static Throwable findException(Object exception) {
|
||||
if (exception == null || !(exception instanceof Throwable)) {
|
||||
return null;
|
||||
}
|
||||
Throwable cur = (Throwable) exception;
|
||||
while (cur != null) {
|
||||
if (cur instanceof RegionMovedException || cur instanceof RegionOpeningException
|
||||
|| cur instanceof RegionTooBusyException) {
|
||||
return cur;
|
||||
}
|
||||
if (cur instanceof RemoteException) {
|
||||
RemoteException re = (RemoteException) cur;
|
||||
cur = re.unwrapRemoteException(
|
||||
RegionOpeningException.class, RegionMovedException.class,
|
||||
RegionTooBusyException.class);
|
||||
if (cur == null) {
|
||||
cur = re.unwrapRemoteException();
|
||||
}
|
||||
// unwrapRemoteException can return the exception given as a parameter when it cannot
|
||||
// unwrap it. In this case, there is no need to look further
|
||||
// noinspection ObjectEquality
|
||||
if (cur == re) {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
cur = cur.getCause();
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -147,9 +147,9 @@ public final class ConnectionUtils {
|
|||
* region re-lookups.
|
||||
*/
|
||||
static class MasterlessConnection extends ConnectionImplementation {
|
||||
MasterlessConnection(Configuration conf, boolean managed,
|
||||
MasterlessConnection(Configuration conf,
|
||||
ExecutorService pool, User user) throws IOException {
|
||||
super(conf, managed, pool, user);
|
||||
super(conf, pool, user);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -204,9 +204,7 @@ public class HBaseAdmin implements Admin {
|
|||
@Deprecated
|
||||
public HBaseAdmin(Configuration c)
|
||||
throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
|
||||
// Will not leak connections, as the new implementation of the constructor
|
||||
// does not throw exceptions anymore.
|
||||
this(ConnectionManager.getConnectionInternal(new Configuration(c)));
|
||||
this(ConnectionFactory.createConnection(new Configuration(c)));
|
||||
this.cleanupConnectionOnClose = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -46,14 +46,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
|
|||
*
|
||||
* <p>HConnections are used by {@link HTable} mostly but also by
|
||||
* {@link HBaseAdmin}, and {@link org.apache.hadoop.hbase.zookeeper.MetaTableLocator}.
|
||||
* HConnection instances can be shared. Sharing
|
||||
* is usually what you want because rather than each HConnection instance
|
||||
* having to do its own discovery of regions out on the cluster, instead, all
|
||||
* clients get to share the one cache of locations. {@link ConnectionManager} does the
|
||||
* sharing for you if you go by it getting connections. Sharing makes cleanup of
|
||||
* HConnections awkward. See {@link ConnectionFactory} for cleanup discussion.
|
||||
*
|
||||
* @see ConnectionManager
|
||||
* @see ConnectionFactory
|
||||
* @deprecated in favor of {@link Connection} and {@link ConnectionFactory}
|
||||
*/
|
||||
|
@ -79,7 +72,6 @@ public interface HConnection extends Connection {
|
|||
* be created for each using thread.
|
||||
* This is a lightweight operation, pooling or caching of the returned HTableInterface
|
||||
* is neither required nor desired.
|
||||
* Note that the HConnection needs to be unmanaged
|
||||
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
|
||||
* @param tableName
|
||||
* @return an HTable to use for interactions with this table
|
||||
|
@ -92,7 +84,6 @@ public interface HConnection extends Connection {
|
|||
* be created for each using thread.
|
||||
* This is a lightweight operation, pooling or caching of the returned HTableInterface
|
||||
* is neither required nor desired.
|
||||
* Note that the HConnection needs to be unmanaged
|
||||
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
|
||||
* @param tableName
|
||||
* @return an HTable to use for interactions with this table
|
||||
|
@ -105,7 +96,6 @@ public interface HConnection extends Connection {
|
|||
* be created for each using thread.
|
||||
* This is a lightweight operation, pooling or caching of the returned HTableInterface
|
||||
* is neither required nor desired.
|
||||
* Note that the HConnection needs to be unmanaged
|
||||
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
|
||||
* @param tableName
|
||||
* @return an HTable to use for interactions with this table
|
||||
|
@ -119,7 +109,6 @@ public interface HConnection extends Connection {
|
|||
* be created for each using thread.
|
||||
* This is a lightweight operation, pooling or caching of the returned HTableInterface
|
||||
* is neither required nor desired.
|
||||
* Note that the HConnection needs to be unmanaged
|
||||
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
|
||||
* @param tableName
|
||||
* @param pool The thread pool to use for batch operations, null to use a default pool.
|
||||
|
@ -133,7 +122,6 @@ public interface HConnection extends Connection {
|
|||
* be created for each using thread.
|
||||
* This is a lightweight operation, pooling or caching of the returned HTableInterface
|
||||
* is neither required nor desired.
|
||||
* Note that the HConnection needs to be unmanaged
|
||||
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
|
||||
* @param tableName
|
||||
* @param pool The thread pool to use for batch operations, null to use a default pool.
|
||||
|
@ -147,9 +135,8 @@ public interface HConnection extends Connection {
|
|||
* be created for each using thread.
|
||||
* This is a lightweight operation, pooling or caching of the returned HTableInterface
|
||||
* is neither required nor desired.
|
||||
* Note that the HConnection needs to be unmanaged
|
||||
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
|
||||
* @param tableName
|
||||
* @param tableName table to get interface for
|
||||
* @param pool The thread pool to use for batch operations, null to use a default pool.
|
||||
* @return an HTable to use for interactions with this table
|
||||
*/
|
||||
|
@ -162,10 +149,6 @@ public interface HConnection extends Connection {
|
|||
*
|
||||
* This is a lightweight operation. Pooling or caching of the returned RegionLocator is neither
|
||||
* required nor desired.
|
||||
*
|
||||
* RegionLocator needs to be unmanaged
|
||||
* (created with {@link ConnectionFactory#createConnection(Configuration)}).
|
||||
*
|
||||
* @param tableName Name of the table who's region is to be examined
|
||||
* @return A RegionLocator instance
|
||||
*/
|
||||
|
@ -176,7 +159,7 @@ public interface HConnection extends Connection {
|
|||
* Retrieve an Admin implementation to administer an HBase cluster.
|
||||
* The returned Admin is not guaranteed to be thread-safe. A new instance should be created for
|
||||
* each using thread. This is a lightweight operation. Pooling or caching of the returned
|
||||
* Admin is not recommended. Note that HConnection needs to be unmanaged
|
||||
* Admin is not recommended.
|
||||
*
|
||||
* @return an Admin instance for cluster administration
|
||||
*/
|
||||
|
|
|
@ -1,146 +0,0 @@
|
|||
/**
|
||||
* Copyright The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
|
||||
/**
|
||||
* Denotes a unique key to an {@link HConnection} instance.
|
||||
*
|
||||
* In essence, this class captures the properties in {@link Configuration}
|
||||
* that may be used in the process of establishing a connection. In light of
|
||||
* that, if any new such properties are introduced into the mix, they must be
|
||||
* added to the {@link HConnectionKey#properties} list.
|
||||
*
|
||||
*/
|
||||
class HConnectionKey {
|
||||
final static String[] CONNECTION_PROPERTIES = new String[] {
|
||||
HConstants.ZOOKEEPER_QUORUM, HConstants.ZOOKEEPER_ZNODE_PARENT,
|
||||
HConstants.ZOOKEEPER_CLIENT_PORT,
|
||||
HConstants.ZOOKEEPER_RECOVERABLE_WAITTIME,
|
||||
HConstants.HBASE_CLIENT_PAUSE, HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
HConstants.HBASE_RPC_TIMEOUT_KEY,
|
||||
HConstants.HBASE_META_SCANNER_CACHING,
|
||||
HConstants.HBASE_CLIENT_INSTANCE_ID,
|
||||
HConstants.RPC_CODEC_CONF_KEY,
|
||||
HConstants.USE_META_REPLICAS,
|
||||
RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY};
|
||||
|
||||
private Map<String, String> properties;
|
||||
private String username;
|
||||
|
||||
HConnectionKey(Configuration conf) {
|
||||
Map<String, String> m = new HashMap<String, String>();
|
||||
if (conf != null) {
|
||||
for (String property : CONNECTION_PROPERTIES) {
|
||||
String value = conf.get(property);
|
||||
if (value != null) {
|
||||
m.put(property, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
this.properties = Collections.unmodifiableMap(m);
|
||||
|
||||
try {
|
||||
UserProvider provider = UserProvider.instantiate(conf);
|
||||
User currentUser = provider.getCurrent();
|
||||
if (currentUser != null) {
|
||||
username = currentUser.getName();
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
ConnectionManager.LOG.warn(
|
||||
"Error obtaining current user, skipping username in HConnectionKey", ioe);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
final int prime = 31;
|
||||
int result = 1;
|
||||
if (username != null) {
|
||||
result = username.hashCode();
|
||||
}
|
||||
for (String property : CONNECTION_PROPERTIES) {
|
||||
String value = properties.get(property);
|
||||
if (value != null) {
|
||||
result = prime * result + value.hashCode();
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@edu.umd.cs.findbugs.annotations.SuppressWarnings (value="ES_COMPARING_STRINGS_WITH_EQ",
|
||||
justification="Optimization")
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj)
|
||||
return true;
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
HConnectionKey that = (HConnectionKey) obj;
|
||||
if (this.username != null && !this.username.equals(that.username)) {
|
||||
return false;
|
||||
} else if (this.username == null && that.username != null) {
|
||||
return false;
|
||||
}
|
||||
if (this.properties == null) {
|
||||
if (that.properties != null) {
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
if (that.properties == null) {
|
||||
return false;
|
||||
}
|
||||
for (String property : CONNECTION_PROPERTIES) {
|
||||
String thisValue = this.properties.get(property);
|
||||
String thatValue = that.properties.get(property);
|
||||
//noinspection StringEquality
|
||||
if (thisValue == thatValue) {
|
||||
continue;
|
||||
}
|
||||
if (thisValue == null || !thisValue.equals(thatValue)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HConnectionKey{" +
|
||||
"properties=" + properties +
|
||||
", username='" + username + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -319,12 +319,9 @@ public class HTable implements HTableInterface {
|
|||
@Deprecated
|
||||
public static boolean isTableEnabled(Configuration conf,
|
||||
final TableName tableName) throws IOException {
|
||||
return ConnectionManager.execute(new HConnectable<Boolean>(conf) {
|
||||
@Override
|
||||
public Boolean connect(HConnection connection) throws IOException {
|
||||
return connection.isTableEnabled(tableName);
|
||||
try(Connection conn = ConnectionFactory.createConnection(conf)) {
|
||||
return conn.getAdmin().isTableEnabled(tableName);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
|
||||
/**
|
||||
* Container for Actions (i.e. Get, Delete, or Put), which are grouped by
|
||||
* regionName. Intended to be used with ConnectionManager.processBatch()
|
||||
* regionName. Intended to be used with {@link AsyncProcess}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class MultiAction<R> {
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Used for internal signalling that a Connection implementation needs to be
|
||||
* user-managed to be used for particular request types.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class NeedUnmanagedConnectionException extends DoNotRetryIOException {
|
||||
private static final long serialVersionUID = 1876775844L;
|
||||
|
||||
public NeedUnmanagedConnectionException() {
|
||||
super("The connection has to be unmanaged.");
|
||||
}
|
||||
}
|
|
@ -129,8 +129,6 @@ public class ReversedScannerCallable extends ScannerCallable {
|
|||
}
|
||||
|
||||
// check how often we retry.
|
||||
// ConnectionManager will call instantiateServer with reload==true
|
||||
// if and only if for retries.
|
||||
if (reload && this.scanMetrics != null) {
|
||||
this.scanMetrics.countOfRPCRetries.incrementAndGet();
|
||||
if (isRegionServerRemote) {
|
||||
|
|
|
@ -155,8 +155,6 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
|
|||
}
|
||||
|
||||
// check how often we retry.
|
||||
// ConnectionManager will call instantiateServer with reload==true
|
||||
// if and only if for retries.
|
||||
if (reload && this.scanMetrics != null) {
|
||||
this.scanMetrics.countOfRPCRetries.incrementAndGet();
|
||||
if (isRegionServerRemote) {
|
||||
|
|
|
@ -798,7 +798,7 @@ public class TestAsyncProcess {
|
|||
ClusterConnection conn = new MyConnectionImpl(configuration);
|
||||
BufferedMutatorImpl mutator =
|
||||
new BufferedMutatorImpl(conn, null, null, new BufferedMutatorParams(DUMMY_TABLE));
|
||||
configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true);
|
||||
configuration.setBoolean(ConnectionImplementation.RETRIES_BY_SERVER_KEY, true);
|
||||
|
||||
MyAsyncProcess ap = new MyAsyncProcess(conn, configuration, true);
|
||||
mutator.ap = ap;
|
||||
|
|
|
@ -258,38 +258,6 @@ public class TestClientNoCluster extends Configured implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to shutdown going to zookeeper for cluster id and meta location.
|
||||
*/
|
||||
static class ScanOpenNextThenExceptionThenRecoverConnection
|
||||
extends ConnectionImplementation {
|
||||
final ClientService.BlockingInterface stub;
|
||||
|
||||
ScanOpenNextThenExceptionThenRecoverConnection(Configuration conf,
|
||||
boolean managed, ExecutorService pool) throws IOException {
|
||||
super(conf, managed);
|
||||
// Mock up my stub so open scanner returns a scanner id and then on next, we throw
|
||||
// exceptions for three times and then after that, we return no more to scan.
|
||||
this.stub = Mockito.mock(ClientService.BlockingInterface.class);
|
||||
long sid = 12345L;
|
||||
try {
|
||||
Mockito.when(stub.scan((RpcController)Mockito.any(),
|
||||
(ClientProtos.ScanRequest)Mockito.any())).
|
||||
thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).build()).
|
||||
thenThrow(new ServiceException(new RegionServerStoppedException("From Mockito"))).
|
||||
thenReturn(ClientProtos.ScanResponse.newBuilder().setScannerId(sid).
|
||||
setMoreResults(false).build());
|
||||
} catch (ServiceException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockingInterface getClient(ServerName sn) throws IOException {
|
||||
return this.stub;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Override to shutdown going to zookeeper for cluster id and meta location.
|
||||
*/
|
||||
|
@ -297,9 +265,9 @@ public class TestClientNoCluster extends Configured implements Tool {
|
|||
extends ConnectionImplementation {
|
||||
final ClientService.BlockingInterface stub;
|
||||
|
||||
RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
|
||||
RegionServerStoppedOnScannerOpenConnection(Configuration conf,
|
||||
ExecutorService pool, User user) throws IOException {
|
||||
super(conf, managed);
|
||||
super(conf, pool, user);
|
||||
// Mock up my stub so open scanner returns a scanner id and then on next, we throw
|
||||
// exceptions for three times and then after that, we return no more to scan.
|
||||
this.stub = Mockito.mock(ClientService.BlockingInterface.class);
|
||||
|
@ -329,9 +297,9 @@ public class TestClientNoCluster extends Configured implements Tool {
|
|||
extends ConnectionImplementation {
|
||||
final ClientService.BlockingInterface stub;
|
||||
|
||||
RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
|
||||
RpcTimeoutConnection(Configuration conf, ExecutorService pool, User user)
|
||||
throws IOException {
|
||||
super(conf, managed);
|
||||
super(conf, pool, user);
|
||||
// Mock up my stub so an exists call -- which turns into a get -- throws an exception
|
||||
this.stub = Mockito.mock(ClientService.BlockingInterface.class);
|
||||
try {
|
||||
|
@ -364,10 +332,10 @@ public class TestClientNoCluster extends Configured implements Tool {
|
|||
final AtomicLong sequenceids = new AtomicLong(0);
|
||||
private final Configuration conf;
|
||||
|
||||
ManyServersManyRegionsConnection(Configuration conf, boolean managed,
|
||||
ManyServersManyRegionsConnection(Configuration conf,
|
||||
ExecutorService pool, User user)
|
||||
throws IOException {
|
||||
super(conf, managed, pool, user);
|
||||
super(conf, pool, user);
|
||||
int serverCount = conf.getInt("hbase.test.servers", 10);
|
||||
this.serversByClient =
|
||||
new HashMap<ServerName, ClientService.BlockingInterface>(serverCount);
|
||||
|
|
|
@ -63,4 +63,4 @@ log4j.logger.org.apache.hadoop.hbase=DEBUG
|
|||
log4j.org.apache.hadoop.metrics2.impl.MetricsSystemImpl=ERROR
|
||||
log4j.org.apache.hadoop.metrics2.util.MBeans=ERROR
|
||||
# Enable this to get detailed connection error/retry logging.
|
||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation=TRACE
|
||||
# log4j.logger.org.apache.hadoop.hbase.client.ConnectionImplementation=TRACE
|
||||
|
|
|
@ -40,14 +40,14 @@ import org.apache.hadoop.hbase.security.UserProvider;
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class CoprocessorHConnection extends ConnectionImplementation {
|
||||
private static final NonceGenerator NO_NONCE_GEN = new ConnectionManager.NoNonceGenerator();
|
||||
private static final NonceGenerator NO_NONCE_GEN = new NoNonceGenerator();
|
||||
|
||||
/**
|
||||
* Create an unmanaged {@link HConnection} based on the environment in which we are running the
|
||||
* Create an {@link HConnection} based on the environment in which we are running the
|
||||
* coprocessor. The {@link HConnection} must be externally cleaned up (we bypass the usual HTable
|
||||
* cleanup mechanisms since we own everything).
|
||||
* @param env environment hosting the {@link HConnection}
|
||||
* @return an unmanaged {@link HConnection}.
|
||||
* @return instance of {@link HConnection}.
|
||||
* @throws IOException if we cannot create the connection
|
||||
*/
|
||||
public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
|
||||
|
@ -60,7 +60,7 @@ public class CoprocessorHConnection extends ConnectionImplementation {
|
|||
return new CoprocessorHConnection((HRegionServer) services);
|
||||
}
|
||||
}
|
||||
return ConnectionManager.createConnectionInternal(env.getConfiguration());
|
||||
return (ClusterConnection) ConnectionFactory.createConnection(env.getConfiguration());
|
||||
}
|
||||
|
||||
private final ServerName serverName;
|
||||
|
@ -95,7 +95,7 @@ public class CoprocessorHConnection extends ConnectionImplementation {
|
|||
* @throws IOException if we cannot create the connection
|
||||
*/
|
||||
public CoprocessorHConnection(Configuration conf, HRegionServer server) throws IOException {
|
||||
super(conf, false, null, UserProvider.instantiate(conf).getCurrent());
|
||||
super(conf, null, UserProvider.instantiate(conf).getCurrent());
|
||||
this.server = server;
|
||||
this.serverName = server.getServerName();
|
||||
}
|
||||
|
|
|
@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.RegionServerCallable;
|
||||
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
|
||||
|
@ -288,19 +287,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
*/
|
||||
@SuppressWarnings("deprecation")
|
||||
public void doBulkLoad(Path hfofDir, final HTable table)
|
||||
throws TableNotFoundException, IOException
|
||||
{
|
||||
Admin admin = null;
|
||||
try {
|
||||
try {
|
||||
admin = table.getConnection().getAdmin();
|
||||
} catch (NeedUnmanagedConnectionException ex) {
|
||||
admin = new HBaseAdmin(table.getConfiguration());
|
||||
}
|
||||
doBulkLoad(hfofDir, admin, table, table.getRegionLocator());
|
||||
} finally {
|
||||
admin.close();
|
||||
}
|
||||
throws TableNotFoundException, IOException {
|
||||
doBulkLoad(hfofDir, table.getConnection().getAdmin(), table, table.getRegionLocator());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -39,9 +39,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.NeedUnmanagedConnectionException;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
|
@ -613,24 +611,8 @@ extends InputFormat<ImmutableBytesWritable, Result> {
|
|||
protected void setHTable(HTable table) throws IOException {
|
||||
this.table = table;
|
||||
this.connection = table.getConnection();
|
||||
try {
|
||||
this.regionLocator = table.getRegionLocator();
|
||||
this.admin = this.connection.getAdmin();
|
||||
} catch (NeedUnmanagedConnectionException exception) {
|
||||
LOG.warn("You are using an HTable instance that relies on an HBase-managed Connection. " +
|
||||
"This is usually due to directly creating an HTable, which is deprecated. Instead, you " +
|
||||
"should create a Connection object and then request a Table instance from it. If you " +
|
||||
"don't need the Table instance for your own use, you should instead use the " +
|
||||
"TableInputFormatBase.initalizeTable method directly.");
|
||||
LOG.info("Creating an additional unmanaged connection because user provided one can't be " +
|
||||
"used for administrative actions. We'll close it when we close out the table.");
|
||||
LOG.debug("Details about our failure to request an administrative interface.", exception);
|
||||
// Do we need a "copy the settings from this Connection" method? are things like the User
|
||||
// properly maintained by just looking again at the Configuration?
|
||||
this.connection = ConnectionFactory.createConnection(this.connection.getConfiguration());
|
||||
this.regionLocator = this.connection.getRegionLocator(table.getName());
|
||||
this.admin = this.connection.getAdmin();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -19,24 +19,9 @@
|
|||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.hadoop.hbase.ResourceChecker.Phase;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
|
||||
/**
|
||||
* Monitor the resources. use by the tests All resources in {@link ResourceCheckerJUnitListener}
|
||||
* plus the number of connection.
|
||||
*/
|
||||
public class ServerResourceCheckerJUnitListener extends ResourceCheckerJUnitListener {
|
||||
|
||||
static class ConnectionCountResourceAnalyzer extends ResourceChecker.ResourceAnalyzer {
|
||||
@Override
|
||||
public int getVal(Phase phase) {
|
||||
return HConnectionTestingUtility.getConnectionCount();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addResourceAnalyzer(ResourceChecker rc) {
|
||||
rc.addResourceAnalyzer(new ConnectionCountResourceAnalyzer());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,10 +183,8 @@ public class TestMetaTableAccessorNoCluster {
|
|||
// Return the RegionLocations object when locateRegion
|
||||
// The ugly format below comes of 'Important gotcha on spying real objects!' from
|
||||
// http://mockito.googlecode.com/svn/branches/1.6/javadoc/org/mockito/Mockito.html
|
||||
ClusterConnection cConnection =
|
||||
HConnectionTestingUtility.getSpiedClusterConnection(UTIL.getConfiguration());
|
||||
Mockito.doReturn(rl).when
|
||||
(cConnection).locateRegion((TableName)Mockito.any(), (byte[])Mockito.any(),
|
||||
(connection).locateRegion((TableName)Mockito.any(), (byte[])Mockito.any(),
|
||||
Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyInt());
|
||||
|
||||
// Now shove our HRI implementation into the spied-upon connection.
|
||||
|
|
|
@ -38,7 +38,7 @@ public class HConnectionTestingUtility {
|
|||
/*
|
||||
* Not part of {@link HBaseTestingUtility} because this class is not
|
||||
* in same package as {@link HConnection}. Would have to reveal ugly
|
||||
* {@link ConnectionManager} innards to HBaseTestingUtility to give it access.
|
||||
* {@link ConnectionImplementation} innards to HBaseTestingUtility to give it access.
|
||||
*/
|
||||
/**
|
||||
* Get a Mocked {@link HConnection} that goes with the passed <code>conf</code>
|
||||
|
@ -52,18 +52,10 @@ public class HConnectionTestingUtility {
|
|||
*/
|
||||
public static ClusterConnection getMockedConnection(final Configuration conf)
|
||||
throws ZooKeeperConnectionException {
|
||||
HConnectionKey connectionKey = new HConnectionKey(conf);
|
||||
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
||||
ConnectionImplementation connection =
|
||||
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
||||
if (connection == null) {
|
||||
connection = Mockito.mock(ConnectionImplementation.class);
|
||||
ConnectionImplementation connection = Mockito.mock(ConnectionImplementation.class);
|
||||
Mockito.when(connection.getConfiguration()).thenReturn(conf);
|
||||
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link #getMockedConnection(Configuration)} and then mocks a few
|
||||
|
@ -99,7 +91,6 @@ public class HConnectionTestingUtility {
|
|||
throws IOException {
|
||||
ConnectionImplementation c = Mockito.mock(ConnectionImplementation.class);
|
||||
Mockito.when(c.getConfiguration()).thenReturn(conf);
|
||||
ConnectionManager.CONNECTION_INSTANCES.put(new HConnectionKey(conf), c);
|
||||
Mockito.doNothing().when(c).close();
|
||||
// Make it so we return a particular location when asked.
|
||||
final HRegionLocation loc = new HRegionLocation(hri, sn);
|
||||
|
@ -151,38 +142,8 @@ public class HConnectionTestingUtility {
|
|||
*/
|
||||
public static ClusterConnection getSpiedConnection(final Configuration conf)
|
||||
throws IOException {
|
||||
HConnectionKey connectionKey = new HConnectionKey(conf);
|
||||
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
||||
ConnectionImplementation connection =
|
||||
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
||||
if (connection == null) {
|
||||
connection = Mockito.spy(new ConnectionImplementation(conf, false));
|
||||
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||
}
|
||||
Mockito.spy(new ConnectionImplementation(conf, null, null));
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
||||
public static ClusterConnection getSpiedClusterConnection(final Configuration conf)
|
||||
throws IOException {
|
||||
HConnectionKey connectionKey = new HConnectionKey(conf);
|
||||
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
||||
ConnectionImplementation connection =
|
||||
ConnectionManager.CONNECTION_INSTANCES.get(connectionKey);
|
||||
if (connection == null) {
|
||||
connection = Mockito.spy(new ConnectionImplementation(conf, false));
|
||||
ConnectionManager.CONNECTION_INSTANCES.put(connectionKey, connection);
|
||||
}
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Count of extant connection instances
|
||||
*/
|
||||
public static int getConnectionCount() {
|
||||
synchronized (ConnectionManager.CONNECTION_INSTANCES) {
|
||||
return ConnectionManager.CONNECTION_INSTANCES.size();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
@ -629,20 +628,6 @@ public class TestAdmin2 {
|
|||
return regionServer;
|
||||
}
|
||||
|
||||
/**
|
||||
* HBASE-4417 checkHBaseAvailable() doesn't close zk connections
|
||||
*/
|
||||
@Test (timeout=300000)
|
||||
public void testCheckHBaseAvailableClosesConnection() throws Exception {
|
||||
Configuration conf = TEST_UTIL.getConfiguration();
|
||||
|
||||
int initialCount = HConnectionTestingUtility.getConnectionCount();
|
||||
HBaseAdmin.checkHBaseAvailable(conf);
|
||||
int finalCount = HConnectionTestingUtility.getConnectionCount();
|
||||
|
||||
Assert.assertEquals(initialCount, finalCount) ;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that we have an exception if the cluster is not there.
|
||||
*/
|
||||
|
@ -654,8 +639,6 @@ public class TestAdmin2 {
|
|||
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT,
|
||||
conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT, 9999)+10);
|
||||
|
||||
int initialCount = HConnectionTestingUtility.getConnectionCount();
|
||||
|
||||
long start = System.currentTimeMillis();
|
||||
try {
|
||||
HBaseAdmin.checkHBaseAvailable(conf);
|
||||
|
@ -667,10 +650,6 @@ public class TestAdmin2 {
|
|||
}
|
||||
long end = System.currentTimeMillis();
|
||||
|
||||
int finalCount = HConnectionTestingUtility.getConnectionCount();
|
||||
|
||||
Assert.assertEquals(initialCount, finalCount) ;
|
||||
|
||||
LOG.info("It took "+(end-start)+" ms to find out that" +
|
||||
" HBase was not available");
|
||||
}
|
||||
|
|
|
@ -4033,7 +4033,7 @@ public class TestFromClientSide {
|
|||
*/
|
||||
HTable createUnmangedHConnectionHTable(final TableName tableName) throws IOException {
|
||||
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
|
||||
HConnection conn = ConnectionManager.createConnection(TEST_UTIL.getConfiguration());
|
||||
Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
return (HTable)conn.getTable(tableName);
|
||||
}
|
||||
|
||||
|
|
|
@ -23,15 +23,12 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Modifier;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
|
@ -46,7 +43,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
|
@ -73,7 +69,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.jboss.netty.util.internal.DetectionUtil;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
|
@ -141,11 +136,6 @@ public class TestHCM {
|
|||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
|
||||
private static int getHConnectionManagerCacheSize(){
|
||||
return HConnectionTestingUtility.getConnectionCount();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterConnection() throws IOException {
|
||||
ThreadPoolExecutor otherPool = new ThreadPoolExecutor(1, 1,
|
||||
|
@ -153,26 +143,26 @@ public class TestHCM {
|
|||
new SynchronousQueue<Runnable>(),
|
||||
Threads.newDaemonThreadFactory("test-hcm"));
|
||||
|
||||
HConnection con1 = ConnectionManager.createConnection(TEST_UTIL.getConfiguration());
|
||||
HConnection con2 = ConnectionManager.createConnection(TEST_UTIL.getConfiguration(), otherPool);
|
||||
Connection con1 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
Connection con2 = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration(), otherPool);
|
||||
// make sure the internally created ExecutorService is the one passed
|
||||
assertTrue(otherPool == ((ConnectionImplementation)con2).getCurrentBatchPool());
|
||||
|
||||
String tableName = "testClusterConnection";
|
||||
TEST_UTIL.createTable(tableName.getBytes(), FAM_NAM).close();
|
||||
HTable t = (HTable)con1.getTable(tableName, otherPool);
|
||||
HTable t = (HTable)con1.getTable(TableName.valueOf(tableName), otherPool);
|
||||
// make sure passing a pool to the getTable does not trigger creation of an internal pool
|
||||
assertNull("Internal Thread pool should be null", ((ConnectionImplementation)con1).getCurrentBatchPool());
|
||||
// table should use the pool passed
|
||||
assertTrue(otherPool == t.getPool());
|
||||
t.close();
|
||||
|
||||
t = (HTable)con2.getTable(tableName);
|
||||
t = (HTable)con2.getTable(TableName.valueOf(tableName));
|
||||
// table should use the connectin's internal pool
|
||||
assertTrue(otherPool == t.getPool());
|
||||
t.close();
|
||||
|
||||
t = (HTable)con2.getTable(Bytes.toBytes(tableName));
|
||||
t = (HTable)con2.getTable(TableName.valueOf(tableName));
|
||||
// try other API too
|
||||
assertTrue(otherPool == t.getPool());
|
||||
t.close();
|
||||
|
@ -182,7 +172,7 @@ public class TestHCM {
|
|||
assertTrue(otherPool == t.getPool());
|
||||
t.close();
|
||||
|
||||
t = (HTable)con1.getTable(tableName);
|
||||
t = (HTable)con1.getTable(TableName.valueOf(tableName));
|
||||
ExecutorService pool = ((ConnectionImplementation)con1).getCurrentBatchPool();
|
||||
// make sure an internal pool was created
|
||||
assertNotNull("An internal Thread pool should have been created", pool);
|
||||
|
@ -190,7 +180,7 @@ public class TestHCM {
|
|||
assertTrue(t.getPool() == pool);
|
||||
t.close();
|
||||
|
||||
t = (HTable)con1.getTable(tableName);
|
||||
t = (HTable)con1.getTable(TableName.valueOf(tableName));
|
||||
// still using the *same* internal pool
|
||||
assertTrue(t.getPool() == pool);
|
||||
t.close();
|
||||
|
@ -535,7 +525,6 @@ public class TestHCM {
|
|||
} finally {
|
||||
syncBlockingFilter.set(true);
|
||||
t.join();
|
||||
ConnectionManager.getConnection(c2).close();
|
||||
TEST_UTIL.getHBaseAdmin().setBalancerRunning(previousBalance, true);
|
||||
}
|
||||
|
||||
|
@ -568,28 +557,6 @@ public class TestHCM {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void abortingHConnectionRemovesItselfFromHCM() throws Exception {
|
||||
// Save off current HConnections
|
||||
Map<HConnectionKey, ConnectionImplementation> oldHBaseInstances =
|
||||
new HashMap<HConnectionKey, ConnectionImplementation>();
|
||||
oldHBaseInstances.putAll(ConnectionManager.CONNECTION_INSTANCES);
|
||||
|
||||
ConnectionManager.CONNECTION_INSTANCES.clear();
|
||||
|
||||
try {
|
||||
HConnection connection = ConnectionManager.getConnection(TEST_UTIL.getConfiguration());
|
||||
connection.abort("test abortingHConnectionRemovesItselfFromHCM", new Exception(
|
||||
"test abortingHConnectionRemovesItselfFromHCM"));
|
||||
Assert.assertNotSame(connection,
|
||||
ConnectionManager.getConnection(TEST_UTIL.getConfiguration()));
|
||||
} finally {
|
||||
// Put original HConnections back
|
||||
ConnectionManager.CONNECTION_INSTANCES.clear();
|
||||
ConnectionManager.CONNECTION_INSTANCES.putAll(oldHBaseInstances);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that when we delete a location using the first row of a region
|
||||
* that we really delete it.
|
||||
|
@ -710,7 +677,7 @@ public class TestHCM {
|
|||
Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
|
||||
|
||||
// Check that we unserialized the exception as expected
|
||||
Throwable cause = ConnectionManager.findException(e.getCause(0));
|
||||
Throwable cause = ConnectionImplementation.findException(e.getCause(0));
|
||||
Assert.assertNotNull(cause);
|
||||
Assert.assertTrue(cause instanceof RegionMovedException);
|
||||
}
|
||||
|
@ -846,35 +813,6 @@ public class TestHCM {
|
|||
table.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Make sure that {@link Configuration} instances that are essentially the
|
||||
* same map to the same {@link HConnection} instance.
|
||||
*/
|
||||
@Test
|
||||
public void testConnectionSameness() throws Exception {
|
||||
Connection previousConnection = null;
|
||||
for (int i = 0; i < 2; i++) {
|
||||
// set random key to differentiate the connection from previous ones
|
||||
Configuration configuration = TEST_UTIL.getConfiguration();
|
||||
configuration.set("some_key", String.valueOf(_randy.nextInt()));
|
||||
LOG.info("The hash code of the current configuration is: "
|
||||
+ configuration.hashCode());
|
||||
Connection currentConnection = ConnectionManager
|
||||
.getConnection(configuration);
|
||||
if (previousConnection != null) {
|
||||
assertTrue(
|
||||
"Did not get the same connection even though its key didn't change",
|
||||
previousConnection == currentConnection);
|
||||
}
|
||||
previousConnection = currentConnection;
|
||||
// change the configuration, so that it is no longer reachable from the
|
||||
// client's perspective. However, since its part of the LRU doubly linked
|
||||
// list, it will eventually get thrown out, at which time it should also
|
||||
// close the corresponding {@link HConnection}.
|
||||
configuration.set("other_key", String.valueOf(_randy.nextInt()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosing() throws Exception {
|
||||
Configuration configuration =
|
||||
|
@ -911,13 +849,8 @@ public class TestHCM {
|
|||
// created from the same configuration, yet they are different
|
||||
assertTrue(c1 != c2);
|
||||
assertTrue(c1.getConfiguration() == c2.getConfiguration());
|
||||
// make sure these were not cached
|
||||
Connection c3 = ConnectionManager.getConnection(configuration);
|
||||
assertTrue(c1 != c3);
|
||||
assertTrue(c2 != c3);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This test checks that one can connect to the cluster with only the
|
||||
* ZooKeeper quorum set. Other stuff like master address will be read
|
||||
|
@ -929,12 +862,12 @@ public class TestHCM {
|
|||
Configuration c = new Configuration();
|
||||
c.set(HConstants.ZOOKEEPER_QUORUM,
|
||||
TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM));
|
||||
c.set(HConstants.ZOOKEEPER_CLIENT_PORT ,
|
||||
c.set(HConstants.ZOOKEEPER_CLIENT_PORT,
|
||||
TEST_UTIL.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT));
|
||||
|
||||
// This should be enough to connect
|
||||
HConnection conn = ConnectionManager.getConnection(c);
|
||||
assertTrue( conn.isMasterRunning() );
|
||||
HConnection conn = (HConnection) ConnectionFactory.createConnection(c);
|
||||
assertTrue(conn.isMasterRunning());
|
||||
conn.close();
|
||||
}
|
||||
|
||||
|
@ -1074,8 +1007,8 @@ public class TestHCM {
|
|||
try {
|
||||
long timeBase = timeMachine.currentTime();
|
||||
long largeAmountOfTime = ANY_PAUSE * 1000;
|
||||
ConnectionManager.ServerErrorTracker tracker =
|
||||
new ConnectionManager.ServerErrorTracker(largeAmountOfTime, 100);
|
||||
ConnectionImplementation.ServerErrorTracker tracker =
|
||||
new ConnectionImplementation.ServerErrorTracker(largeAmountOfTime, 100);
|
||||
|
||||
// The default backoff is 0.
|
||||
assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));
|
||||
|
@ -1130,85 +1063,6 @@ public class TestHCM {
|
|||
Math.abs(actual - expected) <= (0.01f * jitterBase));
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that a destroyed connection does not have a live zookeeper.
|
||||
* Below is timing based. We put up a connection to a table and then close the connection while
|
||||
* having a background thread running that is forcing close of the connection to try and
|
||||
* provoke a close catastrophe; we are hoping for a car crash so we can see if we are leaking
|
||||
* zk connections.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Ignore ("Flakey test: See HBASE-8996")@Test
|
||||
public void testDeleteForZKConnLeak() throws Exception {
|
||||
TEST_UTIL.createTable(TABLE_NAME4, FAM_NAM);
|
||||
final Configuration config = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||
config.setInt("zookeeper.recovery.retry", 1);
|
||||
config.setInt("zookeeper.recovery.retry.intervalmill", 1000);
|
||||
config.setInt("hbase.rpc.timeout", 2000);
|
||||
config.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
|
||||
|
||||
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10,
|
||||
5, TimeUnit.SECONDS,
|
||||
new SynchronousQueue<Runnable>(),
|
||||
Threads.newDaemonThreadFactory("test-hcm-delete"));
|
||||
|
||||
pool.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
while (!Thread.interrupted()) {
|
||||
try {
|
||||
HConnection conn = ConnectionManager.getConnection(config);
|
||||
LOG.info("Connection " + conn);
|
||||
ConnectionManager.deleteStaleConnection(conn);
|
||||
LOG.info("Connection closed " + conn);
|
||||
// TODO: This sleep time should be less than the time that it takes to open and close
|
||||
// a table. Ideally we would do a few runs first to measure. For now this is
|
||||
// timing based; hopefully we hit the bad condition.
|
||||
Threads.sleep(10);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Use connection multiple times.
|
||||
for (int i = 0; i < 30; i++) {
|
||||
Connection c1 = null;
|
||||
try {
|
||||
c1 = ConnectionManager.getConnectionInternal(config);
|
||||
LOG.info("HTable connection " + i + " " + c1);
|
||||
Table table = c1.getTable(TABLE_NAME4, pool);
|
||||
table.close();
|
||||
LOG.info("HTable connection " + i + " closed " + c1);
|
||||
} catch (Exception e) {
|
||||
LOG.info("We actually want this to happen!!!! So we can see if we are leaking zk", e);
|
||||
} finally {
|
||||
if (c1 != null) {
|
||||
if (c1.isClosed()) {
|
||||
// cannot use getZooKeeper as method instantiates watcher if null
|
||||
Field zkwField = c1.getClass().getDeclaredField("keepAliveZookeeper");
|
||||
zkwField.setAccessible(true);
|
||||
Object watcher = zkwField.get(c1);
|
||||
|
||||
if (watcher != null) {
|
||||
if (((ZooKeeperWatcher)watcher).getRecoverableZooKeeper().getState().isAlive()) {
|
||||
// non-synchronized access to watcher; sleep and check again in case zk connection
|
||||
// hasn't been cleaned up yet.
|
||||
Thread.sleep(1000);
|
||||
if (((ZooKeeperWatcher) watcher).getRecoverableZooKeeper().getState().isAlive()) {
|
||||
pool.shutdownNow();
|
||||
fail("Live zookeeper in closed connection");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
c1.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
pool.shutdownNow();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testConnectionRideOverClusterRestart() throws IOException, InterruptedException {
|
||||
Configuration config = new Configuration(TEST_UTIL.getConfiguration());
|
||||
|
|
|
@ -409,7 +409,9 @@ public class TestMetaWithReplicas {
|
|||
public void testShutdownOfReplicaHolder() throws Exception {
|
||||
// checks that the when the server holding meta replica is shut down, the meta replica
|
||||
// can be recovered
|
||||
RegionLocations rl = ConnectionManager.getConnectionInternal(TEST_UTIL.getConfiguration()).
|
||||
ClusterConnection conn = (ClusterConnection)
|
||||
ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
|
||||
RegionLocations rl = conn.
|
||||
locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true);
|
||||
HRegionLocation hrl = rl.getRegionLocation(1);
|
||||
ServerName oldServer = hrl.getServerName();
|
||||
|
@ -418,12 +420,12 @@ public class TestMetaWithReplicas {
|
|||
do {
|
||||
LOG.debug("Waiting for the replica " + hrl.getRegionInfo() + " to come up");
|
||||
Thread.sleep(30000); //wait for the detection/recovery
|
||||
rl = ConnectionManager.getConnectionInternal(TEST_UTIL.getConfiguration()).
|
||||
locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true);
|
||||
rl = conn.locateRegion(TableName.META_TABLE_NAME, Bytes.toBytes(""), false, true);
|
||||
hrl = rl.getRegionLocation(1);
|
||||
i++;
|
||||
} while ((hrl == null || hrl.getServerName().equals(oldServer)) && i < 3);
|
||||
assertTrue(i != 3);
|
||||
conn.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue