Revert "HBASE-12597 Add RpcClient interface and enable changing of RpcClient implementation (Jurriaan Mous)"
Revert partial commit of HBASE-12597
This reverts commit 4be2034a26
.
This commit is contained in:
parent
b8ab1b176c
commit
1a27cb7b0f
|
@ -1,177 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.codec.Codec;
|
|
||||||
import org.apache.hadoop.hbase.codec.KeyValueCodec;
|
|
||||||
import org.apache.hadoop.hbase.security.UserProvider;
|
|
||||||
import org.apache.hadoop.hbase.util.PoolMap;
|
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
|
||||||
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Provides the basics for a RpcClient implementation like configuration and Logging.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public abstract class AbstractRpcClient implements RpcClient {
|
|
||||||
public static final Log LOG = LogFactory.getLog(AbstractRpcClient.class);
|
|
||||||
|
|
||||||
protected final Configuration conf;
|
|
||||||
protected String clusterId;
|
|
||||||
protected final SocketAddress localAddr;
|
|
||||||
|
|
||||||
protected UserProvider userProvider;
|
|
||||||
protected final IPCUtil ipcUtil;
|
|
||||||
|
|
||||||
protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
|
|
||||||
// time (in ms), it will be closed at any moment.
|
|
||||||
protected final int maxRetries; //the max. no. of retries for socket connections
|
|
||||||
protected final long failureSleep; // Time to sleep before retry on failure.
|
|
||||||
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
|
|
||||||
protected final boolean tcpKeepAlive; // if T then use keepalives
|
|
||||||
protected final Codec codec;
|
|
||||||
protected final CompressionCodec compressor;
|
|
||||||
protected final boolean fallbackAllowed;
|
|
||||||
|
|
||||||
protected final int connectTO;
|
|
||||||
protected final int readTO;
|
|
||||||
protected final int writeTO;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Construct an IPC client for the cluster <code>clusterId</code>
|
|
||||||
*
|
|
||||||
* @param conf configuration
|
|
||||||
* @param clusterId the cluster id
|
|
||||||
* @param localAddr client socket bind address.
|
|
||||||
*/
|
|
||||||
public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
|
|
||||||
this.userProvider = UserProvider.instantiate(conf);
|
|
||||||
this.localAddr = localAddr;
|
|
||||||
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
|
|
||||||
this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT;
|
|
||||||
this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
|
||||||
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
|
|
||||||
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
|
|
||||||
this.ipcUtil = new IPCUtil(conf);
|
|
||||||
|
|
||||||
this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
|
|
||||||
this.conf = conf;
|
|
||||||
this.codec = getCodec();
|
|
||||||
this.compressor = getCompressor(conf);
|
|
||||||
this.fallbackAllowed = conf.getBoolean(IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
|
||||||
IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
|
||||||
this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
|
|
||||||
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
|
|
||||||
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
|
|
||||||
|
|
||||||
// login the server principal (if using secure Hadoop)
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor +
|
|
||||||
", tcpKeepAlive=" + this.tcpKeepAlive +
|
|
||||||
", tcpNoDelay=" + this.tcpNoDelay +
|
|
||||||
", connectTO=" + this.connectTO +
|
|
||||||
", readTO=" + this.readTO +
|
|
||||||
", writeTO=" + this.writeTO +
|
|
||||||
", minIdleTimeBeforeClose=" + this.minIdleTimeBeforeClose +
|
|
||||||
", maxRetries=" + this.maxRetries +
|
|
||||||
", fallbackAllowed=" + this.fallbackAllowed +
|
|
||||||
", bind address=" + (this.localAddr != null ? this.localAddr : "null"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public static String getDefaultCodec(final Configuration c) {
|
|
||||||
// If "hbase.client.default.rpc.codec" is empty string -- you can't set it to null because
|
|
||||||
// Configuration will complain -- then no default codec (and we'll pb everything). Else
|
|
||||||
// default is KeyValueCodec
|
|
||||||
return c.get(DEFAULT_CODEC_CLASS, KeyValueCodec.class.getCanonicalName());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Encapsulate the ugly casting and RuntimeException conversion in private method.
|
|
||||||
* @return Codec to use on this client.
|
|
||||||
*/
|
|
||||||
Codec getCodec() {
|
|
||||||
// For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
|
|
||||||
// "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
|
|
||||||
String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
|
|
||||||
if (className == null || className.length() == 0) return null;
|
|
||||||
try {
|
|
||||||
return (Codec)Class.forName(className).newInstance();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException("Failed getting codec " + className, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Encapsulate the ugly casting and RuntimeException conversion in private method.
|
|
||||||
* @param conf configuration
|
|
||||||
* @return The compressor to use on this client.
|
|
||||||
*/
|
|
||||||
private static CompressionCodec getCompressor(final Configuration conf) {
|
|
||||||
String className = conf.get("hbase.client.rpc.compressor", null);
|
|
||||||
if (className == null || className.isEmpty()) return null;
|
|
||||||
try {
|
|
||||||
return (CompressionCodec)Class.forName(className).newInstance();
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException("Failed getting compressor " + className, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the pool type specified in the configuration, which must be set to
|
|
||||||
* either {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
|
|
||||||
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal},
|
|
||||||
* otherwise default to the former.
|
|
||||||
*
|
|
||||||
* For applications with many user threads, use a small round-robin pool. For
|
|
||||||
* applications with few user threads, you may want to try using a
|
|
||||||
* thread-local pool. In any case, the number of {@link org.apache.hadoop.hbase.ipc.RpcClient}
|
|
||||||
* instances should not exceed the operating system's hard limit on the number of
|
|
||||||
* connections.
|
|
||||||
*
|
|
||||||
* @param config configuration
|
|
||||||
* @return either a {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin} or
|
|
||||||
* {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#ThreadLocal}
|
|
||||||
*/
|
|
||||||
protected static PoolMap.PoolType getPoolType(Configuration config) {
|
|
||||||
return PoolMap.PoolType
|
|
||||||
.valueOf(config.get(HConstants.HBASE_CLIENT_IPC_POOL_TYPE), PoolMap.PoolType.RoundRobin,
|
|
||||||
PoolMap.PoolType.ThreadLocal);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the pool size specified in the configuration, which is applicable only if
|
|
||||||
* the pool type is {@link org.apache.hadoop.hbase.util.PoolMap.PoolType#RoundRobin}.
|
|
||||||
*
|
|
||||||
* @param config configuration
|
|
||||||
* @return the maximum pool size
|
|
||||||
*/
|
|
||||||
protected static int getPoolSize(Configuration config) {
|
|
||||||
return config.getInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 1);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,127 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import com.google.protobuf.Descriptors;
|
|
||||||
import com.google.protobuf.Message;
|
|
||||||
import org.apache.hadoop.hbase.CellScanner;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/** A call waiting for a value. */
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class Call {
|
|
||||||
final int id; // call id
|
|
||||||
final Message param; // rpc request method param object
|
|
||||||
/**
|
|
||||||
* Optionally has cells when making call. Optionally has cells set on response. Used
|
|
||||||
* passing cells to the rpc and receiving the response.
|
|
||||||
*/
|
|
||||||
CellScanner cells;
|
|
||||||
Message response; // value, null if error
|
|
||||||
// The return type. Used to create shell into which we deserialize the response if any.
|
|
||||||
Message responseDefaultType;
|
|
||||||
IOException error; // exception, null if value
|
|
||||||
volatile boolean done; // true when call is done
|
|
||||||
long startTime;
|
|
||||||
final Descriptors.MethodDescriptor md;
|
|
||||||
final int timeout; // timeout in millisecond for this call; 0 means infinite.
|
|
||||||
|
|
||||||
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
|
|
||||||
final CellScanner cells, final Message responseDefaultType, int timeout) {
|
|
||||||
this.param = param;
|
|
||||||
this.md = md;
|
|
||||||
this.cells = cells;
|
|
||||||
this.startTime = EnvironmentEdgeManager.currentTime();
|
|
||||||
this.responseDefaultType = responseDefaultType;
|
|
||||||
this.id = id;
|
|
||||||
this.timeout = timeout;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if the call did timeout. Set an exception (includes a notify) if it's the case.
|
|
||||||
* @return true if the call is on timeout, false otherwise.
|
|
||||||
*/
|
|
||||||
public boolean checkAndSetTimeout() {
|
|
||||||
if (timeout == 0){
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
long waitTime = EnvironmentEdgeManager.currentTime() - getStartTime();
|
|
||||||
if (waitTime >= timeout) {
|
|
||||||
IOException ie = new CallTimeoutException("Call id=" + id +
|
|
||||||
", waitTime=" + waitTime + ", operationTimeout=" + timeout + " expired.");
|
|
||||||
setException(ie); // includes a notify
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public int remainingTime() {
|
|
||||||
if (timeout == 0) {
|
|
||||||
return Integer.MAX_VALUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
int remaining = timeout - (int) (EnvironmentEdgeManager.currentTime() - getStartTime());
|
|
||||||
return remaining > 0 ? remaining : 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "callId: " + this.id + " methodName: " + this.md.getName() + " param {" +
|
|
||||||
(this.param != null? ProtobufUtil.getShortTextFormat(this.param): "") + "}";
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Indicate when the call is complete and the
|
|
||||||
* value or error are available. Notifies by default. */
|
|
||||||
protected synchronized void callComplete() {
|
|
||||||
this.done = true;
|
|
||||||
notify(); // notify caller
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Set the exception when there is an error.
|
|
||||||
* Notify the caller the call is done.
|
|
||||||
*
|
|
||||||
* @param error exception thrown by the call; either local or remote
|
|
||||||
*/
|
|
||||||
public void setException(IOException error) {
|
|
||||||
this.error = error;
|
|
||||||
callComplete();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the return value when there is no error.
|
|
||||||
* Notify the caller the call is done.
|
|
||||||
*
|
|
||||||
* @param response return value of the call.
|
|
||||||
* @param cells Can be null
|
|
||||||
*/
|
|
||||||
public void setResponse(Message response, final CellScanner cells) {
|
|
||||||
this.response = response;
|
|
||||||
this.cells = cells;
|
|
||||||
callComplete();
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getStartTime() {
|
|
||||||
return this.startTime;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,35 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Client-side call timeout
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("serial")
|
|
||||||
@InterfaceAudience.Public
|
|
||||||
@InterfaceStability.Evolving
|
|
||||||
public class CallTimeoutException extends IOException {
|
|
||||||
public CallTimeoutException(final String msg) {
|
|
||||||
super(msg);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,78 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class holds the address and the user ticket, etc. The client connections
|
|
||||||
* to servers are uniquely identified by <remoteAddress, ticket, serviceName>
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class ConnectionId {
|
|
||||||
final InetSocketAddress address;
|
|
||||||
final User ticket;
|
|
||||||
private static final int PRIME = 16777619;
|
|
||||||
final String serviceName;
|
|
||||||
|
|
||||||
public ConnectionId(User ticket, String serviceName, InetSocketAddress address) {
|
|
||||||
this.address = address;
|
|
||||||
this.ticket = ticket;
|
|
||||||
this.serviceName = serviceName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getServiceName() {
|
|
||||||
return this.serviceName;
|
|
||||||
}
|
|
||||||
|
|
||||||
public InetSocketAddress getAddress() {
|
|
||||||
return address;
|
|
||||||
}
|
|
||||||
|
|
||||||
public User getTicket() {
|
|
||||||
return ticket;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return this.address.toString() + "/" + this.serviceName + "/" + this.ticket;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (obj instanceof ConnectionId) {
|
|
||||||
ConnectionId id = (ConnectionId) obj;
|
|
||||||
return address.equals(id.address) &&
|
|
||||||
((ticket != null && ticket.equals(id.ticket)) ||
|
|
||||||
(ticket == id.ticket)) &&
|
|
||||||
this.serviceName == id.serviceName;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override // simply use the default Object#hashcode() ?
|
|
||||||
public int hashCode() {
|
|
||||||
int hashcode = (address.hashCode() +
|
|
||||||
PRIME * (PRIME * this.serviceName.hashCode() ^
|
|
||||||
(ticket == null ? 0 : ticket.hashCode())));
|
|
||||||
return hashcode;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,37 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseIOException;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Indicates that we're trying to connect to a already known as dead server. We will want to
|
|
||||||
* retry: we're getting this because the region location was wrong, or because
|
|
||||||
* the server just died, in which case the retry loop will help us to wait for the
|
|
||||||
* regions to recover.
|
|
||||||
*/
|
|
||||||
@SuppressWarnings("serial")
|
|
||||||
@InterfaceAudience.Public
|
|
||||||
@InterfaceStability.Evolving
|
|
||||||
public class FailedServerException extends HBaseIOException {
|
|
||||||
public FailedServerException(String s) {
|
|
||||||
super(s);
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,79 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A class to manage a list of servers that failed recently.
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public class FailedServers {
|
|
||||||
private final LinkedList<Pair<Long, String>> failedServers = new
|
|
||||||
LinkedList<Pair<Long, String>>();
|
|
||||||
private final int recheckServersTimeout;
|
|
||||||
|
|
||||||
public FailedServers(Configuration conf) {
|
|
||||||
this.recheckServersTimeout = conf.getInt(
|
|
||||||
RpcClient.FAILED_SERVER_EXPIRY_KEY, RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add an address to the list of the failed servers list.
|
|
||||||
*/
|
|
||||||
public synchronized void addToFailedServers(InetSocketAddress address) {
|
|
||||||
final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
|
|
||||||
failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if the server should be considered as bad. Clean the old entries of the list.
|
|
||||||
*
|
|
||||||
* @return true if the server is in the failed servers list
|
|
||||||
*/
|
|
||||||
public synchronized boolean isFailedServer(final InetSocketAddress address) {
|
|
||||||
if (failedServers.isEmpty()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
final String lookup = address.toString();
|
|
||||||
final long now = EnvironmentEdgeManager.currentTime();
|
|
||||||
|
|
||||||
// iterate, looking for the search entry and cleaning expired entries
|
|
||||||
Iterator<Pair<Long, String>> it = failedServers.iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
Pair<Long, String> cur = it.next();
|
|
||||||
if (cur.getFirst() < now) {
|
|
||||||
it.remove();
|
|
||||||
} else {
|
|
||||||
if (lookup.equals(cur.getSecond())) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,70 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
|
||||||
|
|
||||||
import java.net.SocketAddress;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Factory to create a {@link org.apache.hadoop.hbase.ipc.RpcClient}
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
public final class RpcClientFactory {
|
|
||||||
|
|
||||||
public static final String CUSTOM_RPC_CLIENT_IMPL_CONF_KEY = "hbase.rpc.client.impl";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Private Constructor
|
|
||||||
*/
|
|
||||||
private RpcClientFactory() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new RpcClient by the class defined in the configuration or falls back to
|
|
||||||
* RpcClientImpl
|
|
||||||
* @param conf configuration
|
|
||||||
* @param clusterId the cluster id
|
|
||||||
* @return newly created RpcClient
|
|
||||||
*/
|
|
||||||
public static RpcClient createClient(Configuration conf, String clusterId) {
|
|
||||||
return createClient(conf, clusterId, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new RpcClient by the class defined in the configuration or falls back to
|
|
||||||
* RpcClientImpl
|
|
||||||
* @param conf configuration
|
|
||||||
* @param clusterId the cluster id
|
|
||||||
* @param localAddr client socket bind address.
|
|
||||||
* @return newly created RpcClient
|
|
||||||
*/
|
|
||||||
public static RpcClient createClient(Configuration conf, String clusterId,
|
|
||||||
SocketAddress localAddr) {
|
|
||||||
String rpcClientClass =
|
|
||||||
conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY,
|
|
||||||
RpcClientImpl.class.getName());
|
|
||||||
return ReflectionUtils.instantiateWithCustomCtor(
|
|
||||||
rpcClientClass,
|
|
||||||
new Class[] { Configuration.class, String.class, SocketAddress.class },
|
|
||||||
new Object[] { conf, clusterId, localAddr }
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue