HBASE-15795 Cleanup all classes in package org.apache.hadoop.hbase.ipc

for code style

Signed-off-by: stack <stack@apache.org>
Jurriaan Mous 2016-05-07 14:53:59 +02:00 committed by stack
parent 99e2deb86f
commit e71ae601f7
18 changed files with 224 additions and 213 deletions

View File

@@ -141,7 +141,9 @@ public abstract class AbstractRpcClient implements RpcClient {
// For NO CODEC, "hbase.client.rpc.codec" must be configured with empty string AND
// "hbase.client.default.rpc.codec" also -- because default is to do cell block encoding.
String className = conf.get(HConstants.RPC_CODEC_CONF_KEY, getDefaultCodec(this.conf));
if (className == null || className.length() == 0) return null;
if (className == null || className.length() == 0) {
return null;
}
try {
return (Codec)Class.forName(className).newInstance();
} catch (Exception e) {
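For context, the two-key convention the comment above describes means that disabling cell blocks requires blanking both settings on the client. A minimal sketch (assuming HBaseConfiguration.create() as the way the caller obtains its Configuration):

Configuration conf = HBaseConfiguration.create();
// Default is cell block encoding; to run with NO codec, both
// "hbase.client.rpc.codec" (HConstants.RPC_CODEC_CONF_KEY) and the
// default-codec key must be set to the empty string.
conf.set(HConstants.RPC_CODEC_CONF_KEY, "");
conf.set("hbase.client.default.rpc.codec", "");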
@@ -161,9 +163,11 @@ public abstract class AbstractRpcClient implements RpcClient {
*/
private static CompressionCodec getCompressor(final Configuration conf) {
String className = conf.get("hbase.client.rpc.compressor", null);
if (className == null || className.isEmpty()) return null;
if (className == null || className.isEmpty()) {
return null;
}
try {
return (CompressionCodec)Class.forName(className).newInstance();
return (CompressionCodec)Class.forName(className).newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed getting compressor " + className, e);
}
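The compressor lookup above follows the same reflective pattern; the value is a Hadoop CompressionCodec class name and is unset by default. An illustrative setting (GzipCodec is just an example value):

conf.set("hbase.client.rpc.compressor", "org.apache.hadoop.io.compress.GzipCodec");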
@@ -252,8 +256,8 @@ public abstract class AbstractRpcClient implements RpcClient {
* will be a
* new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException
* @throws java.io.IOException
* @throws InterruptedException if the call is interrupted
* @throws java.io.IOException if the transport fails
*/
protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,

View File

@@ -17,6 +17,13 @@
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
@@ -25,13 +32,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
import org.apache.hadoop.ipc.RemoteException;
import com.google.protobuf.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* Handles HBase responses
*/

View File

@@ -18,13 +18,13 @@
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.protobuf.RpcCallback;
/**
* Simple {@link RpcCallback} implementation providing a
* {@link java.util.concurrent.Future}-like {@link BlockingRpcCallback#get()} method, which

View File

@@ -19,14 +19,15 @@ package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import java.io.IOException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import java.io.IOException;
/** A call waiting for a value. */
@InterfaceAudience.Private
public class Call {

View File

@@ -17,11 +17,11 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import java.io.IOException;
/**
* Client-side call timeout
*/

View File

@@ -17,11 +17,11 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.net.InetSocketAddress;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import java.net.InetSocketAddress;
/**
* This class holds the address and the user ticket, etc. The client connections
* to servers are uniquely identified by &lt;remoteAddress, ticket, serviceName&gt;
@@ -58,14 +58,14 @@ public class ConnectionId {
@Override
public boolean equals(Object obj) {
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) &&
if (obj instanceof ConnectionId) {
ConnectionId id = (ConnectionId) obj;
return address.equals(id.address) &&
((ticket != null && ticket.equals(id.ticket)) ||
(ticket == id.ticket)) &&
this.serviceName == id.serviceName;
}
return false;
}
return false;
}
@Override // simply use the default Object#hashcode() ?
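The inline comment above questions falling back to Object#hashCode; a hashCode kept consistent with the equals just shown would combine the same three fields. A sketch only, not the class's actual implementation:

@Override
public int hashCode() {
  int result = address.hashCode();
  result = 31 * result + (ticket == null ? 0 : ticket.hashCode());
  // assumes serviceName values are interned, matching the == compare in equals
  result = 31 * result + (serviceName == null ? 0 : serviceName.hashCode());
  return result;
}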

View File

@@ -34,7 +34,7 @@ public final class CoprocessorRpcUtils {
* We assume that all HBase protobuf services share a common package name
* (defined in the .proto files).
*/
private static String hbaseServicePackage;
private static final String hbaseServicePackage;
static {
Descriptors.ServiceDescriptor clientService = ClientProtos.ClientService.getDescriptor();
hbaseServicePackage = clientService.getFullName()

View File

@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
public class DelegatingPayloadCarryingRpcController extends PayloadCarryingRpcController {
private PayloadCarryingRpcController delegate;
private final PayloadCarryingRpcController delegate;
public DelegatingPayloadCarryingRpcController(PayloadCarryingRpcController delegate) {
this.delegate = delegate;

View File

@@ -17,22 +17,21 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
/**
* A class to manage a list of servers that failed recently.
*/
@InterfaceAudience.Private
public class FailedServers {
private final LinkedList<Pair<Long, String>> failedServers = new
LinkedList<Pair<Long, String>>();
private final LinkedList<Pair<Long, String>> failedServers = new LinkedList<>();
private final int recheckServersTimeout;
public FailedServers(Configuration conf) {
@@ -45,7 +44,7 @@ public class FailedServers {
*/
public synchronized void addToFailedServers(InetSocketAddress address) {
final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
failedServers.addFirst(new Pair<Long, String>(expiry, address.toString()));
failedServers.addFirst(new Pair<>(expiry, address.toString()));
}
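For context on how the newest-first (expiry, address) list above is consumed, here is a sketch of the membership check; the real method is outside this hunk, so names and details are illustrative:

public synchronized boolean isFailedServer(final InetSocketAddress address) {
  if (failedServers.isEmpty()) {
    return false;
  }
  final String lookup = address.toString();
  final long now = EnvironmentEdgeManager.currentTime();
  Iterator<Pair<Long, String>> it = failedServers.iterator();
  while (it.hasNext()) {
    Pair<Long, String> entry = it.next();
    if (entry.getFirst() < now) {
      it.remove();                              // recheck window has passed
    } else if (lookup.equals(entry.getSecond())) {
      return true;                              // failed recently, still embargoed
    }
  }
  return false;
}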
/**

View File

@@ -17,21 +17,22 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.io.DataInput;
import com.google.common.base.Preconditions;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.BoundedByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
@@ -45,10 +46,6 @@ import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import com.google.common.base.Preconditions;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message;
/**
* Utility to help ipc'ing.
*/
@@ -83,13 +80,13 @@ public class IPCUtil {
/**
* Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
* <code>compressor</code>.
* @param codec
* @param compressor
* @param cellScanner
* @param codec to use for encoding
* @param compressor to use for encoding
* @param cellScanner to encode
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
* flipped and is ready for reading. Use limit to find total size.
* @throws IOException
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
* flipped and is ready for reading. Use limit to find total size.
* @throws IOException if encoding the cells fails
*/
@SuppressWarnings("resource")
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
@@ -101,26 +98,30 @@ public class IPCUtil {
/**
* Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
* <code>compressor</code>.
* @param codec
* @param compressor
* @param cellScanner
* @param codec to use for encoding
* @param compressor to use for encoding
* @param cellScanner to encode
* @param pool Pool of ByteBuffers to make use of. Can be null and then we'll allocate
* our own ByteBuffer.
* our own ByteBuffer.
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
* flipped and is ready for reading. Use limit to find total size. If <code>pool</code> was not
* null, then this returned ByteBuffer came from there and should be returned to the pool when
* done.
* @throws IOException
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
* flipped and is ready for reading. Use limit to find total size. If <code>pool</code> was not
* null, then this returned ByteBuffer came from there and should be returned to the pool when
* done.
* @throws IOException if encoding the cells fails
*/
@SuppressWarnings("resource")
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner, final BoundedByteBufferPool pool)
throws IOException {
if (cellScanner == null) return null;
if (codec == null) throw new CellScannerButNoCodecException();
if (cellScanner == null) {
return null;
}
if (codec == null) {
throw new CellScannerButNoCodecException();
}
int bufferSize = this.cellBlockBuildingInitialBufferSize;
ByteBufferOutputStream baos = null;
ByteBufferOutputStream baos;
if (pool != null) {
ByteBuffer bb = pool.getBuffer();
bufferSize = bb.capacity();
@@ -137,15 +138,17 @@ public class IPCUtil {
}
baos = new ByteBufferOutputStream(bufferSize);
}
OutputStream os = baos;
Compressor poolCompressor = null;
try {
try (OutputStream os = baos) {
OutputStream os2Compress = os;
if (compressor != null) {
if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
if (compressor instanceof Configurable) {
((Configurable) compressor).setConf(this.conf);
}
poolCompressor = CodecPool.getCompressor(compressor);
os = compressor.createOutputStream(os, poolCompressor);
os2Compress = compressor.createOutputStream(os, poolCompressor);
}
Codec.Encoder encoder = codec.getEncoder(os);
Codec.Encoder encoder = codec.getEncoder(os2Compress);
int count = 0;
while (cellScanner.advance()) {
encoder.write(cellScanner.current());
@@ -154,12 +157,15 @@ public class IPCUtil {
encoder.flush();
// If no cells, don't mess around. Just return null (could be a bunch of existence checking
// gets or something -- stuff that does not return a cell).
if (count == 0) return null;
if (count == 0) {
return null;
}
} catch (BufferOverflowException e) {
throw new DoNotRetryIOException(e);
} finally {
os.close();
if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
if (poolCompressor != null) {
CodecPool.returnCompressor(poolCompressor);
}
}
if (LOG.isTraceEnabled()) {
if (bufferSize < baos.size()) {
@@ -171,10 +177,10 @@
}
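A hedged usage sketch of the buildCellBlock contract documented above (ipcUtil and the codec/compressor/cellScanner variables are assumed to exist at the call site):

ByteBuffer cellBlock = ipcUtil.buildCellBlock(codec, compressor, cellScanner);
if (cellBlock != null) {
  // Buffer comes back flipped: position 0, limit == total encoded size.
  int totalSize = cellBlock.remaining();
}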
/**
* @param codec
* @param cellBlock
* @param codec to use for cellblock
* @param cellBlock to decode
* @return CellScanner to work against the content of <code>cellBlock</code>
* @throws IOException
* @throws IOException if the cell block cannot be decoded
*/
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte[] cellBlock) throws IOException {
@@ -191,12 +197,12 @@
}
/**
* @param codec
* @param codec to use for cellblock
* @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
* position()'ed at the start of the cell block and limit()'ed at the end.
* position()'ed at the start of the cell block and limit()'ed at the end.
* @return CellScanner to work against the content of <code>cellBlock</code>.
* All cells created out of the CellScanner will share the same ByteBuffer being passed.
* @throws IOException
* All cells created out of the CellScanner will share the same ByteBuffer being passed.
* @throws IOException if the cell block cannot be decoded
*/
public CellScanner createCellScannerReusingBuffers(final Codec codec,
final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
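And the receive side, per the factory methods above (a sketch; cellBlockBytes stands in for bytes read off the wire):

CellScanner cells = ipcUtil.createCellScanner(codec, compressor, cellBlockBytes);
while (cells.advance()) {
  Cell cell = cells.current();  // decoded (and decompressed, if configured) cell
}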
@@ -212,11 +218,13 @@
private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
throws IOException {
// GZIPCodec fails w/ NPE if no configuration.
if (compressor instanceof Configurable) ((Configurable) compressor).setConf(this.conf);
if (compressor instanceof Configurable) {
((Configurable) compressor).setConf(this.conf);
}
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
poolDecompressor);
ByteBufferOutputStream bbos = null;
ByteBufferOutputStream bbos;
try {
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
// TODO: Reuse buffers.
@@ -231,34 +239,14 @@
return cellBlock;
}
/**
* @param m Message to serialize delimited; i.e. w/ a vint of its size preceding its
* serialization.
* @return The passed in Message serialized with delimiter. Return null if <code>m</code> is null
* @throws IOException
*/
public static ByteBuffer getDelimitedMessageAsByteBuffer(final Message m) throws IOException {
if (m == null) return null;
int serializedSize = m.getSerializedSize();
int vintSize = CodedOutputStream.computeRawVarint32Size(serializedSize);
byte [] buffer = new byte[serializedSize + vintSize];
// Passing in a byte array saves COS creating a buffer which it does when using streams.
CodedOutputStream cos = CodedOutputStream.newInstance(buffer);
// This will write out the vint preamble and the message serialized.
cos.writeMessageNoTag(m);
cos.flush();
cos.checkNoSpaceLeft();
return ByteBuffer.wrap(buffer);
}
/**
* Write out header, param, and cell block if there is one.
* @param dos
* @param header
* @param param
* @param cellBlock
* @param dos Stream to write into
* @param header to write
* @param param to write
* @param cellBlock to write
* @return Total number of bytes written.
* @throws IOException
* @throws IOException if the write fails
*/
public static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock)
@@ -267,7 +255,9 @@ public class IPCUtil {
// swoop. This is dictated by how the server is currently written. Server needs to change
// if we are to be able to write without the length prefixing.
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) totalSize += cellBlock.remaining();
if (cellBlock != null) {
totalSize += cellBlock.remaining();
}
return write(dos, header, param, cellBlock, totalSize);
}
@@ -278,36 +268,25 @@
dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally.
header.writeDelimitedTo(dos);
if (param != null) param.writeDelimitedTo(dos);
if (cellBlock != null) dos.write(cellBlock.array(), 0, cellBlock.remaining());
if (param != null) {
param.writeDelimitedTo(dos);
}
if (cellBlock != null) {
dos.write(cellBlock.array(), 0, cellBlock.remaining());
}
dos.flush();
return totalSize;
}
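Taken together with the public overload above, the frame written per request is: a 4-byte total length (Bytes.toBytes(totalSize)), a vint-delimited header, an optional vint-delimited param, then the raw cell block. A sketch of a call site, with variables assumed from context:

int written = IPCUtil.write(out, requestHeader, param, cellBlock);
// written == totalSize, i.e. everything after the 4-byte length prefix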
/**
* Read in chunks of 8K (HBASE-7239)
* @param in
* @param dest
* @param offset
* @param len
* @throws IOException
*/
public static void readChunked(final DataInput in, byte[] dest, int offset, int len)
throws IOException {
int maxRead = 8192;
for (; offset < len; offset += maxRead) {
in.readFully(dest, offset, Math.min(len - offset, maxRead));
}
}
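Callers pass the whole destination buffer and the loop above slices one readFully into 8 KB chunks (the HBASE-7239 fix). For example, with in a DataInput and totalLen the expected payload size:

byte[] dest = new byte[totalLen];
IPCUtil.readChunked(in, dest, 0, totalLen);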
/**
* @return Size on the wire when the two messages are written with writeDelimitedTo
*/
public static int getTotalSizeWhenWrittenDelimited(Message ... messages) {
int totalSize = 0;
for (Message m: messages) {
if (m == null) continue;
if (m == null) {
continue;
}
totalSize += m.getSerializedSize();
totalSize += CodedOutputStream.computeRawVarint32Size(m.getSerializedSize());
}
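The arithmetic: each non-null message contributes its serialized size plus the varint that encodes that size. For example:

int size = msg.getSerializedSize();                           // say 300 bytes
int vint = CodedOutputStream.computeRawVarint32Size(size);    // 2: 300 needs two varint bytes
int onWire = size + vint;                                     // 302 bytes when written delimited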

View File

@@ -15,9 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -29,10 +32,6 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
/**
* Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s
* against the active master. An instance of this class may be obtained

View File

@@ -15,9 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import java.io.IOException;
import org.apache.commons.logging.Log;
@@ -32,10 +35,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
/**
* Provides clients with an RPC connection to call coprocessor endpoint {@link com.google.protobuf.Service}s
* against a given table region. An instance of this class may be obtained
@@ -52,10 +51,10 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
private final TableName table;
private final byte[] row;
private byte[] lastRegion;
private int operationTimeout;
private final int operationTimeout;
private RpcRetryingCallerFactory rpcCallerFactory;
private RpcControllerFactory rpcControllerFactory;
private final RpcRetryingCallerFactory rpcCallerFactory;
private final RpcControllerFactory rpcControllerFactory;
/**
* Constructor
@@ -105,7 +104,7 @@ public class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
};
CoprocessorServiceResponse result = rpcCallerFactory.<CoprocessorServiceResponse> newCaller()
.callWithRetries(callable, operationTimeout);
Message response = null;
Message response;
if (result.getValue().hasValue()) {
Message.Builder builder = responsePrototype.newBuilderForType();
ProtobufUtil.mergeFrom(builder, result.getValue().getValue());

View File

@@ -11,28 +11,29 @@
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
/**
* Provides clients with an RPC connection to call coprocessor endpoint
* {@link com.google.protobuf.Service}s against a given region server. An instance of this class may
* be obtained by calling {@link org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService(ServerName)},
* but should normally only be used in creating a new {@link com.google.protobuf.Service} stub to
* call the endpoint methods.
* be obtained by calling {@link org.apache.hadoop.hbase.client.HBaseAdmin#
* coprocessorService(ServerName)}, but should normally only be used in creating a new
* {@link com.google.protobuf.Service} stub to call the endpoint methods.
* @see org.apache.hadoop.hbase.client.HBaseAdmin#coprocessorService(ServerName)
*/
@InterfaceAudience.Private
@@ -59,7 +60,7 @@ public class RegionServerCoprocessorRpcChannel extends SyncCoprocessorRpcChannel
// TODO: Are we retrying here? Does not seem so. We should use RetryingRpcCaller
CoprocessorServiceResponse result =
ProtobufUtil.execRegionServerService(controller, connection.getClient(serverName), call);
Message response = null;
Message response;
if (result.getValue().hasValue()) {
Message.Builder builder = responsePrototype.newBuilderForType();
ProtobufUtil.mergeFrom(builder, result.getValue().getValue());

View File

@@ -18,13 +18,14 @@
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.BlockingRpcChannel;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.security.User;
/**
* Interface for RpcClient implementations so ConnectionManager can handle it.
*/

View File

@@ -18,13 +18,14 @@
package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import java.net.SocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import java.net.SocketAddress;
/**
* Factory to create a {@link org.apache.hadoop.hbase.ipc.RpcClient}
*/
@@ -74,7 +75,7 @@ public final class RpcClientFactory {
return ReflectionUtils.instantiateWithCustomCtor(
rpcClientClass,
new Class[] { Configuration.class, String.class, SocketAddress.class,
MetricsConnection.class },
MetricsConnection.class },
new Object[] { conf, clusterId, localAddr, metrics }
);
}

View File

@@ -1,5 +1,4 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,9 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.RpcCallback;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
@@ -96,12 +100,6 @@ import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
import com.google.protobuf.RpcCallback;
/**
* Does RPC against a cluster. Manages connections per regionserver in the cluster.
* <p>See HBaseServer
@@ -160,25 +158,25 @@ public class RpcClientImpl extends AbstractRpcClient {
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
protected class Connection extends Thread {
private ConnectionHeader header; // connection header
private final ConnectionHeader header; // connection header
protected ConnectionId remoteId;
protected Socket socket = null; // connected socket
protected DataInputStream in;
protected DataOutputStream out;
private Object outLock = new Object();
private InetSocketAddress server; // server ip:port
private final Object outLock = new Object();
private final InetSocketAddress server; // server ip:port
private String serverPrincipal; // server's krb5 principal name
private AuthMethod authMethod; // authentication method
private boolean useSasl;
private Token<? extends TokenIdentifier> token;
private HBaseSaslRpcClient saslRpcClient;
private int reloginMaxBackoff; // max pause before relogin on sasl failure
private final int reloginMaxBackoff; // max pause before relogin on sasl failure
private final Codec codec;
private final CompressionCodec compressor;
// currently active calls
protected final ConcurrentSkipListMap<Integer, Call> calls =
new ConcurrentSkipListMap<Integer, Call>();
new ConcurrentSkipListMap<>();
protected final AtomicBoolean shouldCloseConnection = new AtomicBoolean();
protected final CallSender callSender;
@@ -228,7 +226,7 @@ public class RpcClientImpl extends AbstractRpcClient {
CallSender(String name, Configuration conf) {
int queueSize = conf.getInt("hbase.ipc.client.write.queueSize", 1000);
callsToWrite = new ArrayBlockingQueue<CallFuture>(queueSize);
callsToWrite = new ArrayBlockingQueue<>(queueSize);
setDaemon(true);
setName(name + " - writer");
}
@@ -438,21 +436,27 @@ public class RpcClientImpl extends AbstractRpcClient {
socket.getOutputStream().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
if (LOG.isTraceEnabled()) {
LOG.trace("ignored", ignored);
}
}
try {
if (socket.getInputStream() != null) {
socket.getInputStream().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
if (LOG.isTraceEnabled()) {
LOG.trace("ignored", ignored);
}
}
try {
if (socket.getChannel() != null) {
socket.getChannel().close();
}
} catch (IOException ignored) { // Can happen if the socket is already closed
if (LOG.isTraceEnabled()) LOG.trace("ignored", ignored);
if (LOG.isTraceEnabled()) {
LOG.trace("ignored", ignored);
}
}
try {
socket.close();
@@ -665,8 +669,8 @@
return null;
} else {
String msg = "Couldn't setup connection for " +
UserGroupInformation.getLoginUser().getUserName() +
" to " + serverPrincipal;
UserGroupInformation.getLoginUser().getUserName() +
" to " + serverPrincipal;
LOG.warn(msg);
throw (IOException) new IOException(msg).initCause(ex);
}
@@ -735,7 +739,9 @@
}
}
boolean continueSasl;
if (ticket == null) throw new FatalConnectionException("ticket/user is null");
if (ticket == null) {
throw new FatalConnectionException("ticket/user is null");
}
try {
continueSasl = ticket.doAs(new PrivilegedExceptionAction<Boolean>() {
@Override
@@ -868,11 +874,8 @@
}
protected void tracedWriteRequest(Call call, int priority, Span span) throws IOException {
TraceScope ts = Trace.continueSpan(span);
try {
try (TraceScope ignored = Trace.continueSpan(span)) {
writeRequest(call, priority, span);
} finally {
ts.close();
}
}
@@ -903,7 +906,7 @@
if (priority != PayloadCarryingRpcController.PRIORITY_UNSET) {
builder.setPriority(priority);
}
RequestHeader header = builder.build();
RequestHeader requestHeader = builder.build();
setupIOstreams();
@@ -913,13 +916,15 @@
checkIsOpen();
IOException writeException = null;
synchronized (this.outLock) {
if (Thread.interrupted()) throw new InterruptedIOException();
if (Thread.interrupted()) {
throw new InterruptedIOException();
}
calls.put(call.id, call); // We put first as we don't want the connection to become idle.
checkIsOpen(); // Now we're checking that it didn't became idle in between.
try {
call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, header, call.param,
call.callStats.setRequestSizeBytes(IPCUtil.write(this.out, requestHeader, call.param,
cellBlock));
} catch (IOException e) {
// We set the value inside the synchronized block, this way the next in line
@@ -941,7 +946,9 @@
doNotify();
// Now that we notified, we can rethrow the exception if any. Otherwise we're good.
if (writeException != null) throw writeException;
if (writeException != null) {
throw writeException;
}
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
@@ -956,7 +963,9 @@
* Because only one receiver, so no synchronization on in.
*/
protected void readResponse() {
if (shouldCloseConnection.get()) return;
if (shouldCloseConnection.get()) {
return;
}
Call call = null;
boolean expectedCall = false;
try {
@@ -1015,12 +1024,16 @@
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
}
} catch (IOException e) {
if (expectedCall) call.setException(e);
if (expectedCall) {
call.setException(e);
}
if (e instanceof SocketTimeoutException) {
// Clean up open calls but don't treat this as a fatal condition,
// since we expect certain responses to not make it by the specified
// {@link ConnectionId#rpcTimeout}.
if (LOG.isTraceEnabled()) LOG.trace("ignored", e);
if (LOG.isTraceEnabled()) {
LOG.trace("ignored", e);
}
} else {
// Treat this as a fatal condition and close this connection
markClosed(e);
@@ -1054,7 +1067,9 @@
}
protected synchronized boolean markClosed(IOException e) {
if (e == null) throw new NullPointerException();
if (e == null) {
throw new NullPointerException();
}
boolean ret = shouldCloseConnection.compareAndSet(false, true);
if (ret) {
@@ -1124,7 +1139,7 @@
super(conf, clusterId, localAddr, metrics);
this.socketFactory = factory;
this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));
this.failedServers = new FailedServers(conf);
}
@@ -1156,8 +1171,12 @@
* using this client. */
@Override
public void close() {
if (LOG.isDebugEnabled()) LOG.debug("Stopping rpc client");
if (!running.compareAndSet(true, false)) return;
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping rpc client");
}
if (!running.compareAndSet(true, false)) {
return;
}
Set<Connection> connsToClose = null;
// wake up all connections
@@ -1172,7 +1191,7 @@
// at all (if CallSender has a cancelled Call it can happen). See HBASE-13851
if (!conn.isAlive()) {
if (connsToClose == null) {
connsToClose = new HashSet<Connection>();
connsToClose = new HashSet<>();
}
connsToClose.add(conn);
}
@@ -1207,8 +1226,8 @@
* {@link UserProvider#getCurrent()} makes a new instance of User each time so will be a
* new Connection each time.
* @return A pair with the Message response and the Cell data (if any).
* @throws InterruptedException
* @throws IOException
* @throws InterruptedException if the call is interrupted
* @throws IOException if something fails on the connection
*/
@Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
@@ -1228,17 +1247,17 @@
final CallFuture cts;
if (connection.callSender != null) {
cts = connection.callSender.sendCall(call, pcrc.getPriority(), Trace.currentSpan());
pcrc.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
connection.callSender.remove(cts);
}
});
if (pcrc.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.callComplete();
return new Pair<Message, CellScanner>(call.response, call.cells);
pcrc.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
connection.callSender.remove(cts);
}
});
if (pcrc.isCanceled()) {
// To finish if the call was cancelled before we set the notification (race condition)
call.callComplete();
return new Pair<>(call.response, call.cells);
}
} else {
cts = null;
connection.tracedWriteRequest(call, pcrc.getPriority(), Trace.currentSpan());
@@ -1246,7 +1265,9 @@
while (!call.done) {
if (call.checkAndSetTimeout()) {
if (cts != null) connection.callSender.remove(cts);
if (cts != null) {
connection.callSender.remove(cts);
}
break;
}
if (connection.shouldCloseConnection.get()) {
@@ -1255,12 +1276,16 @@
}
try {
synchronized (call) {
if (call.done) break;
if (call.done) {
break;
}
call.wait(Math.min(call.remainingTime(), 1000) + 1);
}
} catch (InterruptedException e) {
call.setException(new InterruptedIOException());
if (cts != null) connection.callSender.remove(cts);
if (cts != null) {
connection.callSender.remove(cts);
}
throw e;
}
}
@@ -1274,7 +1299,7 @@
throw wrapException(addr, call.error);
}
return new Pair<Message, CellScanner>(call.response, call.cells);
return new Pair<>(call.response, call.cells);
}
@@ -1303,12 +1328,14 @@
}
/**
* Get a connection from the pool, or create a new one and add it to the
* Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given host/port are reused.
*/
protected Connection getConnection(User ticket, Call call, InetSocketAddress addr)
throws IOException {
if (!running.get()) throw new StoppedRpcClientException();
if (!running.get()) {
throw new StoppedRpcClientException();
}
Connection connection;
ConnectionId remoteId =
new ConnectionId(ticket, call.md.getService().getName(), addr);
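The pool lookup that follows (outside this hunk) keys on that ConnectionId, i.e. the <remoteAddress, ticket, serviceName> triple from the ConnectionId javadoc. Illustrative only; createConnection is a hypothetical stand-in for the actual creation path:

Connection connection = connections.get(remoteId);
if (connection == null) {
  connection = createConnection(remoteId);  // hypothetical factory
  connections.put(remoteId, connection);
}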

View File

@@ -18,14 +18,14 @@
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.util.StringUtils;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
/**
* Used for server-side protobuf RPC service invocations. This handler allows
* invocation exceptions to easily be passed through to the RPC server from coprocessor
@@ -55,7 +55,7 @@ public class ServerRpcController implements RpcController {
/**
* The exception thrown within
* {@link com.google.protobuf.Service#callMethod(
* Descriptors.MethodDescriptor, RpcController, Message, RpcCallback)},
* Descriptors.MethodDescriptor, RpcController, Message, RpcCallback)}
* if any.
*/
// TODO: it would be good widen this to just Throwable, but IOException is what we allow now

View File

@@ -18,14 +18,14 @@
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
@InterfaceAudience.Private
public class TimeLimitedRpcController implements RpcController {
@@ -35,10 +35,10 @@ public class TimeLimitedRpcController implements RpcController {
protected volatile Integer callTimeout;
protected volatile boolean cancelled = false;
protected final AtomicReference<RpcCallback<Object>> cancellationCb =
new AtomicReference<RpcCallback<Object>>(null);
new AtomicReference<>(null);
protected final AtomicReference<RpcCallback<IOException>> failureCb =
new AtomicReference<RpcCallback<IOException>>(null);
new AtomicReference<>(null);
private IOException exception;
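Those AtomicReference fields hold set-once callbacks. A minimal sketch of the hand-off they enable, following the protobuf RpcController contract (illustrative, not necessarily this class's exact code):

@Override
public void notifyOnCancel(RpcCallback<Object> callback) {
  cancellationCb.set(callback);
  if (cancelled) {
    callback.run(null);   // already cancelled: fire immediately, per the contract
  }
}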