HBASE-12597

Signed-off-by: stack <stack@apache.org>
Jurriaan Mous 2014-12-06 13:29:15 +01:00 committed by stack
parent 1a27cb7b0f
commit a8e6461855
29 changed files with 2116 additions and 1814 deletions

View File

@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -619,7 +620,7 @@ class ConnectionManager {
     this.registry = setupRegistry();
     retrieveClusterId();
-    this.rpcClient = new RpcClient(this.conf, this.clusterId);
+    this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     // Do we publish the status?
@@ -639,7 +640,7 @@ class ConnectionManager {
         @Override
         public void newDead(ServerName sn) {
           clearCaches(sn);
-          rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
+          rpcClient.cancelConnections(sn);
         }
       }, conf, listenerClass);
     }
@@ -783,18 +784,6 @@ class ConnectionManager {
     return RegistryFactory.getRegistry(this);
   }

-  /**
-   * For tests only.
-   * @param rpcClient Client we should use instead.
-   * @return Previous rpcClient
-   */
-  @VisibleForTesting
-  RpcClient setRpcClient(final RpcClient rpcClient) {
-    RpcClient oldRpcClient = this.rpcClient;
-    this.rpcClient = rpcClient;
-    return oldRpcClient;
-  }
-
   /**
    * For tests only.
    */
@@ -2336,7 +2325,7 @@ class ConnectionManager {
       clusterStatusListener.close();
     }
     if (rpcClient != null) {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }

View File

@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
-import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.ipc.RemoteException;

View File

@@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.io.compress.CompressionCodec;
import java.net.SocketAddress;
/**
* Provides the basics for an RpcClient implementation, such as configuration and logging.
*/
@InterfaceAudience.Private
public abstract class AbstractRpcClient implements RpcClient {
public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
protected final Configuration conf;
protected String clusterId;
protected final SocketAddress localAddr;
protected UserProvider userProvider;
protected final IPCUtil ipcUtil;
protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
// time (in ms), it will be closed at any moment.
protected final int maxRetries; //the max. no. of retries for socket connections
protected final long failureSleep; // Time to sleep before retry on failure.
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
protected final boolean tcpKeepAlive; // if T then use keepalives
protected final Codec codec;
protected final CompressionCodec compressor;
protected final boolean fallbackAllowed;
protected final int connectTO;
protected final int readTO;
protected final int writeTO;
/**
* Construct an IPC client for the cluster <code>clusterId</code>
*
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
*/
public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
this.userProvider = UserProvider.instantiate(conf);
this.localAddr = localAddr;
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
this.ipcUtil = new IPCUtil(conf);
this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
this.conf = conf;
this.codec = getCodec();
this.compressor = getCompressor(conf);
this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
if (LOG.isDebugEnabled()) {
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
", tcpKeepAlive=" + this.tcpKeepAlive +
", tcpNoDelay=" + this.tcpNoDelay +
", connectTO=" + this.connectTO +
", readTO=" + this.readTO +
", writeTO=" + this.writeTO +
", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
", maxRetries=" + this.maxRetries +
", fallbackAllowed=" + this.fallbackAllowed +
", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
}
}
@VisibleForTesting
public static String getDefaultCodec(final Configuration c) {
// If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
// Configuration will complain -- then no default codec (and we'll pb everything). Else
// default is KeyValueCodec
return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
}
/**
* Encapsulate the ugly casting and RuntimeException conversion in private method.
* @return Codec to use on this client.
*/
Codec getCodec() {
// For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
// "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
if (className == null || className.length() == 0) return null;
try {
return (Codec)Class.forName(className).newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed getting codec " + className, e);
}
}
/**
* Encapsulate the ugly casting and RuntimeException conversion in private method.
* @param conf configuration
* @return The compressor to use on this client.
*/
private static CompressionCodec getCompressor(final Configuration conf) {
String className = conf.get("hbase.client.rpc.compressor", null);
if (className == null || className.isEmpty()) return null;
try {
return (CompressionCodec)Class.forName(className).newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed getting compressor " + className, e);
}
}
/**
* Return the pool type specified in the configuration, which must be set to
* either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
* otherwise default to the former.
*
* For applications with many user threads, use a small round-robin pool. For
* applications with few user threads, you may want to try using a
* thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
* instances should not exceed the operating system's hard limit on the number of
* connections.
*
* @param config configuration
* @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
*/
protected static PoolMap.PoolType getPoolType(Configuration config) {
return PoolMap.PoolType
.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
PoolMap.PoolType.ThreadLocal);
}
/**
* Return the pool size specified in the configuration, which is applicable only if
* the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
*
* @param config configuration
* @return the maximum pool size
*/
protected static int getPoolSize(Configuration config) {
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
}
}
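
The two-key codec lookup above is subtle: "hbase.client.rpc.codec" wins if set, and "hbase.client.default.rpc.codec" only controls the fallback. A minimal sketch (illustrative, not part of this change) of disabling cell-block encoding entirely, using only the keys referenced by getCodec() and getDefaultCodec():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class NoCellBlockCodecExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // The default codec is KeyValueCodec (via "hbase.client.default.rpc.codec").
    // To run with no codec at all -- pure protobuf, no cell blocks -- both keys
    // must be the empty string, since Configuration rejects null values.
    conf.set("hbase.client.rpc.codec", "");
    conf.set("hbase.client.default.rpc.codec", "");
    // With this configuration, getCodec() above returns null and the client
    // pb-encodes everything.
  }
}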

View File

@@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import java.io.IOException;
/** A call waiting for a value. */
@InterfaceAudience.Private
public class Call {
final int id; // call id
final Message param; // rpc request method param object
/**
* Optionally has cells when making a call. Optionally has cells set on the response. Used for
* passing cells to the rpc and receiving the response.
*/
CellScanner cells;
Message response; // value, null if error
// The return type. Used to create the shell into which we deserialize the response, if any.
Message responseDefaultType;
IOException error; // exception, null if value
volatile boolean done; // true when call is done
long startTime;
final Descriptors.MethodDescriptor md;
final int timeout; // timeout in millisecond for this call; 0 means infinite.
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
final CellScanner cells, final Message responseDefaultType, int timeout) {
this.param = param;
this.md = md;
this.cells = cells;
this.startTime = EnvironmentEdgeManager.currentTime();
this.responseDefaultType = responseDefaultType;
this.id = id;
this.timeout = timeout;
}
/**
* Check if the call timed out. If it did, set an exception (this includes a notify).
* @return true if the call timed out, false otherwise.
*/
public boolean checkAndSetTimeout() {
if (timeout == 0){
return false;
}
long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime();
if (waitTime >= timeout) {
IOException ie = new CallTimeoutException("Call id=" + id +
", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
setException(ie); // includes a notify
return true;
} else {
return false;
}
}
public int remainingTime() {
if (timeout == 0) {
return Integer.MAX_VALUE;
}
int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime());
return remaining > 0 ? remaining : 0;
}
@Override
public String toString() {
return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
(this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
}
/** Indicate that the call is complete and the
* value or error is available. Notifies by default. */
protected synchronized void callComplete() {
this.done = true;
notify(); // notify caller
}
/** Set the exception when there is an error.
* Notify the caller that the call is done.
*
* @param error exception thrown by the call; either local or remote
*/
public void setException(IOException error) {
this.error = error;
callComplete();
}
/**
* Set the return value when there is no error.
* Notify the caller that the call is done.
*
* @param response return value of the call.
* @param cells Can be null
*/
public void setResponse(Message response, final CellScanner cells) {
this.response = response;
this.cells = cells;
callComplete();
}
public long getStartTime() {
return this.startTime;
}
}
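
Call relies on plain Object monitor semantics: the reader thread calls setResponse() or setException(), both of which end in callComplete() and its notify(), while the calling thread waits on the Call instance. A hedged sketch of the caller side, assuming a hypothetical helper in the same package (the real loop in RpcClientImpl also handles connection teardown):

package org.apache.hadoop.hbase.ipc; // assumed: needs package access to Call's fields

import java.io.IOException;
import java.io.InterruptedIOException;
import com.google.protobuf.Message;

final class CallWaitExample {
  // Hypothetical helper, for illustration only.
  static Message waitForResponse(Call call) throws IOException {
    synchronized (call) {
      while (!call.done) {
        if (call.checkAndSetTimeout()) {
          break; // checkAndSetTimeout() set a CallTimeoutException and marked the call done
        }
        try {
          // Wake periodically so a timeout is noticed even without a notify().
          call.wait(Math.min(call.remainingTime(), 1000));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new InterruptedIOException("Interrupted waiting on " + call);
        }
      }
    }
    if (call.error != null) {
      throw call.error;
    }
    return call.response;
  }
}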

View File

@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.io.IOException;
/**
* Client-side call timeout
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CallTimeoutException extends IOException {
public CallTimeoutException(final String msg) {
super(msg);
}
}

View File

@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import java.net.InetSocketAddress;
/**
* This class holds the address and the user ticket, etc. The client connections
* to servers are uniquely identified by <remoteAddress, ticket, serviceName>
*/
@InterfaceAudience.Private
public class ConnectionId {
final InetSocketAddress address;
final User ticket;
private static final int PRIME = 16777619;
final String serviceName;
public ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
this.address = address;
this.ticket = ticket;
this.serviceName = serviceName;
}
public String getServiceName() {
return this.serviceName;
}
public InetSocketAddress getAddress() {
return address;
}
public User getTicket() {
return ticket;
}
@Override
public String toString() {
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) &&
((ticket != null && ticket.equals(id.ticket)) ||
(ticket == id.ticket)) &&
this.serviceName == id.serviceName;
}
return false;
}
@Override // simply use the default Object#hashcode() ?
public int hashCode() {
int hashcode = (address.hashCode() +
PRIME * (PRIME * this.serviceName.hashCode() ^
(ticket == null ? 0 : ticket.hashCode())));
return hashcode;
}
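
Connections are pooled keyed on this triple. A small sketch of that usage (the 'connections' map and the Object value are illustrative stand-ins for the real pool inside RpcClientImpl). Note that equals() above compares serviceName with ==, so lookups assume interned service-name strings, which protobuf service names typically are:

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.ipc.ConnectionId;
import org.apache.hadoop.hbase.security.User;

public class ConnectionPoolSketch {
  // Stand-in for the real pool of socket-backed connections; illustrative only.
  static final ConcurrentHashMap<ConnectionId, Object> connections =
      new ConcurrentHashMap<ConnectionId, Object>();

  static Object getConnection(User ticket, String serviceName, InetSocketAddress addr) {
    ConnectionId id = new ConnectionId(ticket, serviceName, addr);
    // Two ids with the same address, ticket and service map to the same pooled connection.
    Object conn = connections.get(id);
    if (conn == null) {
      conn = new Object(); // would be a real connection in the client
      Object existing = connections.putIfAbsent(id, conn);
      if (existing != null) {
        conn = existing;
      }
    }
    return conn;
  }
}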
}

View File

@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Indicates that we're trying to connect to a server that is already known to be dead.
* We will want to retry: we're getting this either because the region location was wrong,
* or because the server just died, in which case the retry loop will help us wait for the
* regions to recover.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class FailedServerException extends HBaseIOException {
public FailedServerException(String s) {
super(s);
}
}

View File

@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
/**
* A class to manage a list of servers that failed recently.
*/
@InterfaceAudience.Private
public class FailedServers {
private final LinkedList<Pair<Long, String>> failedServers = new
LinkedList<Pair<Long, String>>();
private final int recheckServersTimeout;
public FailedServers(Configuration conf) {
this.recheckServersTimeout = conf.getInt(
RpcClient.FAILED_SERVER_EXPIRY_KEY, RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
}
/**
* Add an address to the list of failed servers.
*/
public synchronized void addToFailedServers(InetSocketAddress address) {
final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
}
/**
* Check if the server should be considered failed. Also cleans expired entries from the list.
*
* @return true if the server is in the failed servers list
*/
public synchronized boolean isFailedServer(final InetSocketAddress address) {
if (failedServers.isEmpty()) {
return false;
}
final String lookup = address.toString();
final long now = EnvironmentEdgeManager.currentTime();
// iterate, looking for the search entry and cleaning expired entries
Iterator<Pair<Long, String>> it = failedServers.iterator();
while (it.hasNext()) {
Pair<Long, String> cur = it.next();
if (cur.getFirst() < now) {
it.remove();
} else {
if (lookup.equals(cur.getSecond())) {
return true;
}
}
}
return false;
}
}
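
Expiry is computed against EnvironmentEdgeManager rather than System.currentTimeMillis(), which is what lets TestHBaseClient (further down) drive the clock by hand. A brief sketch of the expected behavior; the 10 s jump assumes the default window of RpcClient.FAILED_SERVER_EXPIRY_DEFAULT (2000 ms, an assumption worth checking against the suppressed RpcClient diff):

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.FailedServers;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;

public class FailedServersSketch {
  public static void main(String[] args) {
    // Inject a manual clock so we can advance time deterministically.
    ManualEnvironmentEdge edge = new ManualEnvironmentEdge();
    EnvironmentEdgeManager.injectEdge(edge);

    FailedServers failed = new FailedServers(new Configuration());
    InetSocketAddress addr = InetSocketAddress.createUnresolved("bad-host", 16020);

    failed.addToFailedServers(addr);
    System.out.println(failed.isFailedServer(addr)); // true: inside the expiry window

    edge.incValue(10000); // jump past the recheck timeout
    System.out.println(failed.isFailedServer(addr)); // false: entry expired and was pruned
  }
}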

View File

@@ -51,7 +51,7 @@ import com.google.protobuf.Message;
  * Utility to help ipc'ing.
  */
 @InterfaceAudience.Private
-class IPCUtil {
+public class IPCUtil {
   public static final Log LOG = LogFactory.getLog(IPCUtil.class);
   /**
    * How much we think the decompressor will expand the original compressed content.
@@ -60,7 +60,7 @@ class IPCUtil {
   private final int cellBlockBuildingInitialBufferSize;
   private final Configuration conf;

-  IPCUtil(final Configuration conf) {
+  public IPCUtil(final Configuration conf) {
     super();
     this.conf = conf;
     this.cellBlockDecompressionMultiplier =
@@ -81,14 +81,14 @@ class IPCUtil {
    * <code>compressor</code>.
    * @param codec
    * @param compressor
-   * @Param cellScanner
+   * @param cellScanner
    * @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
    * passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
    * flipped and is ready for reading. Use limit to find total size.
    * @throws IOException
    */
   @SuppressWarnings("resource")
-  ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
+  public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
     final CellScanner cellScanner)
   throws IOException {
     if (cellScanner == null) return null;
@@ -145,7 +145,7 @@ class IPCUtil {
    * @return CellScanner to work against the content of <code>cellBlock</code>
    * @throws IOException
    */
-  CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
     final byte [] cellBlock)
   throws IOException {
     return createCellScanner(codec, compressor, cellBlock, 0, cellBlock.length);
@@ -159,7 +159,7 @@ class IPCUtil {
    * @return CellScanner to work against the content of <code>cellBlock</code>
    * @throws IOException
    */
-  CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
+  public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
     final byte [] cellBlock, final int offset, final int length)
   throws IOException {
     // If compressed, decompress it first before passing it on else we will leak compression
@@ -200,7 +200,7 @@ class IPCUtil {
    * @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null
    * @throws IOException
    */
-  static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
+  public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
     if (m == null) return null;
     int serializedSize = m.getSerializedSize();
     int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
@@ -223,7 +223,7 @@ class IPCUtil {
    * @return Total number of bytes written.
    * @throws IOException
    */
-  static int write(final OutputStream dos, final Message header, final Message param,
+  public static int write(final OutputStream dos, final Message header, final Message param,
     final ByteBuffer cellBlock)
   throws IOException {
     // Must calculate total size and write that first so other side can read it all in in one
@@ -255,7 +255,7 @@ class IPCUtil {
    * @param len
    * @throws IOException
    */
-  static void readChunked(final DataInput in, byte[] dest, int offset, int len)
+  public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
       throws IOException {
     int maxRead = 8192;
@@ -265,11 +265,9 @@ class IPCUtil {
   }
   /**
-   * @param header
-   * @param body
    * @return Size on the wire when the two messages are written with writeDelimitedTo
    */
-  static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
+  public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
     int totalSize = 0;
     for (Message m: messages) {
       if (m == null) continue;
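
With these members now public, a cell block can be round-tripped from outside the package. A sketch under the assumptions that CellUtil.createCellScanner(...) wraps the cell list and that buildCellBlock returns a heap buffer whose backing array starts at offset 0:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.ipc.IPCUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class CellBlockRoundTrip {
  public static void main(String[] args) throws Exception {
    IPCUtil util = new IPCUtil(HBaseConfiguration.create());
    List<Cell> cells = new ArrayList<Cell>();
    cells.add(new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("value")));

    // Encode: a codec is required, the compressor is optional (null = uncompressed).
    ByteBuffer block = util.buildCellBlock(new KeyValueCodec(), null,
        CellUtil.createCellScanner(cells));

    // Decode: the returned buffer is already flipped; limit() is the total size.
    CellScanner scanner = util.createCellScanner(new KeyValueCodec(), null,
        block.array(), 0, block.limit());
    while (scanner.advance()) {
      System.out.println(scanner.current());
    }
  }
}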

View File

@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import java.net.SocketAddress;
/**
* Factory to create a {@link org.apache.hadoop.hbase.ipc.RpcClient}
*/
@InterfaceAudience.Private
public final class RpcClientFactory {
public static final String CUSTOM_RPC_CLIENT_IMPL_CONF_KEY = "hbase.rpc.client.impl";
/**
* Private Constructor
*/
private RpcClientFactory() {
}
/**
* Creates a new RpcClient using the class defined in the configuration, falling back to
* RpcClientImpl
* @param conf configuration
* @param clusterId the cluster id
* @return newly created RpcClient
*/
public static RpcClient createClient(Configuration conf, String clusterId) {
return createClient(conf, clusterId, null);
}
/**
* Creates a new RpcClient using the class defined in the configuration, falling back to
* RpcClientImpl
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
* @return newly created RpcClient
*/
public static RpcClient createClient(Configuration conf, String clusterId,
SocketAddress localAddr) {
String rpcClientClass =
conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
RpcClientImpl.class.getName());
return ReflectionUtils.instantiateWithCustomCtor(
rpcClientClass,
new Class[] { Configuration.class, String.class, SocketAddress.class },
new Object[] { conf, clusterId, localAddr }
);
}
}
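
This factory is the extension point of the change: any class reachable via "hbase.rpc.client.impl" with a (Configuration, String, SocketAddress) constructor replaces the default client, exactly as TestClientTimeouts below does with RandomTimeoutRpcClient. A sketch with a made-up subclass (MyMetricsRpcClient is hypothetical):

import java.net.SocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcClientImpl;

public class CustomRpcClientExample {
  // Made-up subclass; any class with this three-argument constructor can be plugged in.
  public static class MyMetricsRpcClient extends RpcClientImpl {
    public MyMetricsRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
      super(conf, clusterId, localAddr);
      // e.g. hook up per-connection metrics here
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
        MyMetricsRpcClient.class.getName());
    RpcClient client = RpcClientFactory.createClient(conf, "my-cluster-id");
    try {
      // client is a MyMetricsRpcClient instance, created reflectively by the factory
      System.out.println(client.getClass().getName());
    } finally {
      client.close();
    }
  }
}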

File diff suppressed because it is too large.

View File

@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -316,7 +316,7 @@ public class MetaTableLocator {
       LOG.debug("Exception connecting to " + sn);
     } catch (UnknownHostException e) {
       LOG.debug("Unknown host exception connecting to " + sn);
-    } catch (RpcClient.FailedServerException e) {
+    } catch (FailedServerException e) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server " + sn + " is in failed server list.");
       }

View File

@@ -148,7 +148,7 @@ import com.google.protobuf.TextFormat;
 * CallRunner#run executes the call. When done, asks the included Call to put itself on new
 * queue for Responder to pull from and return result to client.
 *
- * @see RpcClient
+ * @see RpcClientImpl
 */
 @InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
 @InterfaceStability.Evolving

View File

@@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.ipc.FailedServerException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
-import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.master.RegionState.State;
 import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;

View File

@@ -78,7 +78,6 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -91,6 +90,7 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -782,10 +782,11 @@ public class HRegionServer extends HasThread implements
     rsQuotaManager = new RegionServerQuotaManager(this);
     // Setup RPC client for master communication
-    rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
+    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
         rpcServices.isa.getAddress(), 0));
-    int storefileRefreshPeriod = conf.getInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
+    int storefileRefreshPeriod = conf.getInt(
+        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
         , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
     if (storefileRefreshPeriod > 0) {
       this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod, this, this);
@@ -994,7 +995,7 @@ public class HRegionServer extends HasThread implements
       this.rssStub = null;
     }
     if (this.rpcClient != null) {
-      this.rpcClient.stop();
+      this.rpcClient.close();
     }
     if (this.leases != null) {
       this.leases.close();

View File

@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
@@ -65,7 +65,7 @@ public class TestClientScannerRPCTimeout {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     Configuration conf = TEST_UTIL.getConfiguration();
     // Don't report so often so easier to see other rpcs

View File

@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,6 +36,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
+import org.apache.hadoop.hbase.ipc.RpcClientImpl;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -61,6 +64,10 @@ public class TestClientTimeouts {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniCluster(SLAVES);
+    // Set the custom RPC client with random timeouts as the client
+    TEST_UTIL.getConfiguration().set(
+        RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
+        RandomTimeoutRpcClient.class.getName());
   }
   /**
@@ -81,7 +88,9 @@ public class TestClientTimeouts {
     Connection lastConnection = null;
     boolean lastFailed = false;
     int initialInvocations = RandomTimeoutBlockingRpcChannel.invokations.get();
-    RpcClient rpcClient = newRandomTimeoutRpcClient();
+    RandomTimeoutRpcClient rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
+        .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
     try {
       for (int i = 0; i < 5 || (lastFailed && i < 100); ++i) {
         lastFailed = false;
@@ -94,13 +103,6 @@ public class TestClientTimeouts {
           Connection connection = admin.getConnection();
           assertFalse(connection == lastConnection);
           lastConnection = connection;
-          // Override the connection's rpc client for timeout testing
-          RpcClient oldRpcClient =
-              ((ConnectionManager.HConnectionImplementation)connection).setRpcClient(
-                  rpcClient);
-          if (oldRpcClient != null) {
-            oldRpcClient.stop();
-          }
           // run some admin commands
           HBaseAdmin.checkHBaseAvailable(conf);
           admin.setBalancerRunning(false, false);
@@ -111,7 +113,8 @@ public class TestClientTimeouts {
         } finally {
           admin.close();
           if (admin.getConnection().isClosed()) {
-            rpcClient = newRandomTimeoutRpcClient();
+            rpcClient = (RandomTimeoutRpcClient) RpcClientFactory
+                .createClient(TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey());
           }
         }
       }
@@ -119,31 +122,36 @@ public class TestClientTimeouts {
       assertFalse(lastFailed);
       assertTrue(RandomTimeoutBlockingRpcChannel.invokations.get() > initialInvocations);
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }

-  private static RpcClient newRandomTimeoutRpcClient() {
-    return new RpcClient(
-        TEST_UTIL.getConfiguration(), TEST_UTIL.getClusterKey()) {
-      // Return my own instance, one that does random timeouts
-      @Override
-      public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
-          User ticket, int rpcTimeout) {
-        return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
-      }
-    };
+  /**
+   * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
+   */
+  public static class RandomTimeoutRpcClient extends RpcClientImpl {
+    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
+      super(conf, clusterId, localAddr);
+    }
+
+    // Return my own instance, one that does random timeouts
+    @Override
+    public BlockingRpcChannel createBlockingRpcChannel(ServerName sn,
+        User ticket, int rpcTimeout) {
+      return new RandomTimeoutBlockingRpcChannel(this, sn, ticket, rpcTimeout);
+    }
   }

   /**
    * Blocking rpc channel that goes via hbase rpc.
    */
-  static class RandomTimeoutBlockingRpcChannel extends RpcClient.BlockingRpcChannelImplementation {
+  static class RandomTimeoutBlockingRpcChannel
+      extends RpcClientImpl.BlockingRpcChannelImplementation {
     private static final Random RANDOM = new Random(System.currentTimeMillis());
     public static final double CHANCE_OF_TIMEOUT = 0.3;
     private static AtomicInteger invokations = new AtomicInteger();

-    RandomTimeoutBlockingRpcChannel(final RpcClient rpcClient, final ServerName sn,
+    RandomTimeoutBlockingRpcChannel(final RpcClientImpl rpcClient, final ServerName sn,
         final User ticket, final int rpcTimeout) {
       super(rpcClient, sn, ticket, rpcTimeout);
     }

View File

@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -97,7 +97,7 @@ public class TestFromClientSideNoCodec {
   public void testNoCodec() {
     Configuration c = new Configuration();
     c.set("hbase.client.default.rpc.codec", "");
-    String codec = RpcClient.getDefaultCodec(c);
+    String codec = AbstractRpcClient.getDefaultCodec(c);
     assertTrue(codec == null || codec.length() == 0);
   }
 }

View File

@@ -394,7 +394,7 @@ public class TestHCM {
     LOG.info("Going to cancel connections. connection=" + conn.toString() + ", sn=" + sn);
     for (int i = 0; i < 5000; i++) {
-      rpcClient.cancelConnections(sn.getHostname(), sn.getPort());
+      rpcClient.cancelConnections(sn);
       Thread.sleep(5);
     }

View File

@@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ScannerCallable;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.testclassification.FilterTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -115,7 +115,7 @@ public class FilterTestingCluster {
   @BeforeClass
   public static void setUp() throws Exception {
     ((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     TEST_UTIL.startMiniCluster(1);
     initialize(TEST_UTIL.getConfiguration());

View File

@@ -89,7 +89,8 @@
       conf,
       new FifoRpcScheduler(conf, 1));
     rpcServer.start();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    RpcClient rpcClient = RpcClientFactory.createClient(
+        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
@@ -118,7 +119,7 @@
       assertEquals(UNDELAYED, results.get(1).intValue());
       assertEquals(results.get(2).intValue(), delayReturnValue ? DELAYED : 0xDEADBEEF);
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
@@ -170,7 +171,8 @@
       conf,
       new FifoRpcScheduler(conf, 1));
     rpcServer.start();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    RpcClient rpcClient = RpcClientFactory.createClient(
+        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
@@ -200,7 +202,7 @@
       log.removeAppender(listAppender);
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
@@ -293,7 +295,8 @@
       conf,
       new FifoRpcScheduler(conf, 1));
     rpcServer.start();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    RpcClient rpcClient = RpcClientFactory.createClient(
+        conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
@@ -323,7 +326,7 @@
       }
       assertTrue(caughtException);
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }

View File

@@ -36,7 +36,7 @@ public class TestHBaseClient {
   public void testFailedServer(){
     ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
     EnvironmentEdgeManager.injectEdge( ee );
-    RpcClient.FailedServers fs = new RpcClient.FailedServers(new Configuration());
+    FailedServers fs = new FailedServers(new Configuration());
     InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
     InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12); // same server as ia

View File

@@ -178,7 +178,7 @@ public class TestIPC {
   @Test
   public void testNoCodec() throws InterruptedException, IOException {
     Configuration conf = HBaseConfiguration.create();
-    RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT) {
+    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT) {
       @Override
       Codec getCodec() {
         return null;
@@ -197,7 +197,7 @@
       // Silly assertion that the message is in the returned pb.
       assertTrue(r.getFirst().toString().contains(message));
     } finally {
-      client.stop();
+      client.close();
       rpcServer.stop();
     }
   }
@@ -216,10 +216,10 @@
       throws IOException, InterruptedException, SecurityException, NoSuchMethodException {
     Configuration conf = new Configuration(HBaseConfiguration.create());
     conf.set("hbase.client.rpc.compressor", GzipCodec.class.getCanonicalName());
-    doSimpleTest(conf, new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT));
+    doSimpleTest(conf, new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT));
   }

-  private void doSimpleTest(final Configuration conf, final RpcClient client)
+  private void doSimpleTest(final Configuration conf, final RpcClientImpl client)
       throws InterruptedException, IOException {
     TestRpcServer rpcServer = new TestRpcServer();
     List<Cell> cells = new ArrayList<Cell>();
@@ -239,7 +239,7 @@
       }
       assertEquals(count, index);
     } finally {
-      client.stop();
+      client.close();
       rpcServer.stop();
     }
   }
@@ -258,7 +258,7 @@
     }).when(spyFactory).createSocket();
     TestRpcServer rpcServer = new TestRpcServer();
-    RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
+    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT, spyFactory);
     try {
       rpcServer.start();
       InetSocketAddress address = rpcServer.getListenerAddress();
@@ -270,7 +270,7 @@
       LOG.info("Caught expected exception: " + e.toString());
       assertTrue(StringUtils.stringifyException(e).contains("Injected fault"));
     } finally {
-      client.stop();
+      client.close();
       rpcServer.stop();
     }
   }
@@ -281,7 +281,7 @@
     RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
     RpcServer rpcServer = new TestRpcServer(scheduler);
     verify(scheduler).init((RpcScheduler.Context) anyObject());
-    RpcClient client = new RpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT);
+    RpcClientImpl client = new RpcClientImpl(CONF, HConstants.CLUSTER_ID_DEFAULT);
     try {
       rpcServer.start();
       verify(scheduler).start();
@@ -312,7 +312,7 @@
     TestRpcServer rpcServer = new TestRpcServer();
     MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
     EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-    RpcClient client = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    RpcClientImpl client = new RpcClientImpl(conf, HConstants.CLUSTER_ID_DEFAULT);
     KeyValue kv = KeyValueUtil.ensureKeyValue(BIG_CELL);
     Put p = new Put(kv.getRow());
     for (int i = 0; i < cellcount; i++) {
@@ -354,7 +354,7 @@
       LOG.info("Cycled " + cycles + " time(s) with " + cellcount + " cell(s) in " +
         (System.currentTimeMillis() - startTime) + "ms");
     } finally {
-      client.stop();
+      client.close();
       rpcServer.stop();
     }
   }

View File

@@ -112,7 +112,7 @@ public class TestProtoBufRpc {
   @Test
   public void testProtoBufRpc() throws Exception {
-    RpcClient rpcClient = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(this.isa.getHostName(), this.isa.getPort(), System.currentTimeMillis()),
@@ -136,7 +136,7 @@
       } catch (ServiceException e) {
       }
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
 }

View File

@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
@@ -53,7 +54,7 @@ public class TestHMasterRPCException {
     CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager(conf);
     HMaster hm = new HMaster(conf, cp);
     ServerName sm = hm.getServerName();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.CLUSTER_ID_DEFAULT);
+    RpcClient rpcClient = RpcClientFactory.createClient(conf, HConstants.CLUSTER_ID_DEFAULT);
     try {
       int i = 0;
       //retry the RPC a few times; we have seen SocketTimeoutExceptions if we
@@ -88,7 +89,7 @@
       }
       fail();
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
 }

View File

@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
@@ -99,7 +100,8 @@ public class TestSecureRPC {
       Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
       isa, conf, new FifoRpcScheduler(conf, 1));
     rpcServer.start();
-    RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
+    RpcClient rpcClient = RpcClientFactory
+        .createClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
       BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(
         ServerName.valueOf(rpcServer.getListenerAddress().getHostName(),
@@ -115,7 +117,7 @@
       assertEquals(0xDEADBEEF, results.get(0).intValue());
     } finally {
-      rpcClient.stop();
+      rpcClient.close();
     }
   }
 }

View File

@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -400,7 +401,7 @@ public class TestTokenAuthentication {
     testuser.doAs(new PrivilegedExceptionAction<Object>() {
       public Object run() throws Exception {
         Configuration c = server.getConfiguration();
-        RpcClient rpcClient = new RpcClient(c, clusterId.toString());
+        RpcClient rpcClient = RpcClientFactory.createClient(c, clusterId.toString());
         ServerName sn =
             ServerName.valueOf(server.getAddress().getHostName(), server.getAddress().getPort(),
                 System.currentTimeMillis());
@@ -416,7 +417,7 @@
           String authMethod = response.getAuthMethod();
           assertEquals("TOKEN", authMethod);
         } finally {
-          rpcClient.stop();
+          rpcClient.close();
         }
         return null;
       }

View File

@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.ScannerCallable;
-import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.ipc.AbstractRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
@@ -88,7 +88,7 @@ public class TestFlushSnapshotFromClient {
     // Uncomment the following lines if more verbosity is needed for
     // debugging (see HBASE-12285 for details).
     //((Log4JLogger)RpcServer.LOG).getLogger().setLevel(Level.ALL);
-    //((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.ALL);
+    //((Log4JLogger)AbstractRpcClient.LOG).getLogger().setLevel(Level.ALL);
     //((Log4JLogger)ScannerCallable.LOG).getLogger().setLevel(Level.ALL);
     setupConf(UTIL.getConfiguration());
     UTIL.startMiniCluster(NUM_RS);