HBASE-16531 Move cell block related code out of IPCUtil

This commit is contained in:
zhangduo 2016-08-30 18:26:42 +08:00
parent ea15522704
commit 647a65ce01
10 changed files with 549 additions and 467 deletions

View File

@ -26,10 +26,8 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -42,7 +40,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec; import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -64,7 +61,7 @@ public abstract class AbstractRpcClient implements RpcClient {
protected final MetricsConnection metrics; protected final MetricsConnection metrics;
protected UserProvider userProvider; protected UserProvider userProvider;
protected final IPCUtil ipcUtil; protected final CellBlockBuilder cellBlockBuilder;
protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this protected final int minIdleTimeBeforeClose; // if the connection is idle for more than this
// time (in ms), it will be closed at any moment. // time (in ms), it will be closed at any moment.
@ -98,7 +95,7 @@ public abstract class AbstractRpcClient implements RpcClient {
HConstants.DEFAULT_HBASE_CLIENT_PAUSE); HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0);
this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true);
this.ipcUtil = new IPCUtil(conf); this.cellBlockBuilder = new CellBlockBuilder(conf);
this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes
this.conf = conf; this.conf = conf;
@ -292,33 +289,6 @@ public abstract class AbstractRpcClient implements RpcClient {
return pcrc; return pcrc;
} }
/**
* Takes an Exception and the address we were trying to connect to and return an IOException with
* the input exception as the cause. The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information. If the exception is
* ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
* an IOException.
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
protected IOException wrapException(InetSocketAddress addr, Exception exception) {
if (exception instanceof ConnectException) {
// connection refused; include the host:port in the error
return (ConnectException) new ConnectException("Call to " + addr
+ " failed on connection exception: " + exception).initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException) new SocketTimeoutException("Call to " + addr
+ " failed because " + exception).initCause(exception);
} else if (exception instanceof ConnectionClosingException) {
return (ConnectionClosingException) new ConnectionClosingException("Call to " + addr
+ " failed on local exception: " + exception).initCause(exception);
} else {
return (IOException) new IOException("Call to " + addr + " failed on local exception: "
+ exception).initCause(exception);
}
}
/** /**
* Blocking rpc channel that goes via hbase rpc. * Blocking rpc channel that goes via hbase rpc.
*/ */

View File

@ -260,11 +260,11 @@ public class AsyncRpcClient extends AbstractRpcClient {
if (e.getCause() instanceof IOException) { if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause(); throw (IOException) e.getCause();
} else { } else {
throw wrapException(addr, (Exception) e.getCause()); throw IPCUtil.wrapException(addr, (Exception) e.getCause());
} }
} catch (TimeoutException e) { } catch (TimeoutException e) {
CallTimeoutException cte = new CallTimeoutException(promise.toString()); CallTimeoutException cte = new CallTimeoutException(promise.toString());
throw wrapException(addr, cte); throw IPCUtil.wrapException(addr, cte);
} }
} }
@ -359,7 +359,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @throws java.io.IOException on error on creation cell scanner * @throws java.io.IOException on error on creation cell scanner
*/ */
public CellScanner createCellScanner(byte[] cellBlock) throws IOException { public CellScanner createCellScanner(byte[] cellBlock) throws IOException {
return ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); return cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
} }
/** /**
@ -370,7 +370,7 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @throws java.io.IOException if block creation fails * @throws java.io.IOException if block creation fails
*/ */
public ByteBuffer buildCellBlock(CellScanner cells) throws IOException { public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
return ipcUtil.buildCellBlock(this.codec, this.compressor, cells); return cellBlockBuilder.buildCellBlock(this.codec, this.compressor, cells);
} }
/** /**

View File

@ -0,0 +1,229 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
/**
* Helper class for building cell block.
*/
@InterfaceAudience.Private
public class CellBlockBuilder {
// LOG is being used in TestCellBlockBuilder
static final Log LOG = LogFactory.getLog(CellBlockBuilder.class);
private final Configuration conf;
/**
* How much we think the decompressor will expand the original compressed content.
*/
private final int cellBlockDecompressionMultiplier;
private final int cellBlockBuildingInitialBufferSize;
public CellBlockBuilder(final Configuration conf) {
this.conf = conf;
this.cellBlockDecompressionMultiplier = conf
.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
// Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
// #buildCellBlock.
this.cellBlockBuildingInitialBufferSize = ClassSize
.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
}
/**
* Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
* <code>compressor</code>.
* @param codec to use for encoding
* @param compressor to use for encoding
* @param cellScanner to encode
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
* been flipped and is ready for reading. Use limit to find total size.
* @throws IOException if encoding the cells fail
*/
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner) throws IOException {
if (cellScanner == null) {
return null;
}
if (codec == null) {
throw new CellScannerButNoCodecException();
}
int bufferSize = this.cellBlockBuildingInitialBufferSize;
ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
encodeCellsTo(baos, cellScanner, codec, compressor);
if (LOG.isTraceEnabled()) {
if (bufferSize < baos.size()) {
LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
+ "; up hbase.ipc.cellblock.building.initial.buffersize?");
}
}
ByteBuffer bb = baos.getByteBuffer();
// If no cells, don't mess around. Just return null (could be a bunch of existence checking
// gets or something -- stuff that does not return a cell).
if (!bb.hasRemaining()) return null;
return bb;
}
private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
CompressionCodec compressor) throws IOException {
OutputStream os = bbos;
Compressor poolCompressor = null;
try {
if (compressor != null) {
if (compressor instanceof Configurable) {
((Configurable) compressor).setConf(this.conf);
}
poolCompressor = CodecPool.getCompressor(compressor);
os = compressor.createOutputStream(os, poolCompressor);
}
Codec.Encoder encoder = codec.getEncoder(os);
while (cellScanner.advance()) {
encoder.write(cellScanner.current());
}
encoder.flush();
} catch (BufferOverflowException e) {
throw new DoNotRetryIOException(e);
} finally {
os.close();
if (poolCompressor != null) {
CodecPool.returnCompressor(poolCompressor);
}
}
}
/**
* Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
* <code>compressor</code>.
* @param codec to use for encoding
* @param compressor to use for encoding
* @param cellScanner to encode
* @param pool Pool of ByteBuffers to make use of.
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has
* been flipped and is ready for reading. Use limit to find total size. If
* <code>pool</code> was not null, then this returned ByteBuffer came from there and
* should be returned to the pool when done.
* @throws IOException if encoding the cells fail
*/
public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
CellScanner cellScanner, ByteBufferPool pool) throws IOException {
if (cellScanner == null) {
return null;
}
if (codec == null) {
throw new CellScannerButNoCodecException();
}
assert pool != null;
ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
encodeCellsTo(bbos, cellScanner, codec, compressor);
if (bbos.size() == 0) {
bbos.releaseResources();
return null;
}
return bbos;
}
/**
* @param codec to use for cellblock
* @param cellBlock to encode
* @return CellScanner to work against the content of <code>cellBlock</code>
* @throws IOException if encoding fails
*/
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte[] cellBlock) throws IOException {
// Use this method from Client side to create the CellScanner
ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
if (compressor != null) {
cellBlockBuf = decompress(compressor, cellBlockBuf);
}
// Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
// make Cells directly over the passed BB. This method is called at client side and we don't
// want the Cells to share the same byte[] where the RPC response is being read. Caching of any
// of the Cells at user's app level will make it not possible to GC the response byte[]
return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
}
/**
* @param codec to use for cellblock
* @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
* position()'ed at the start of the cell block and limit()'ed at the end.
* @return CellScanner to work against the content of <code>cellBlock</code>. All cells created
* out of the CellScanner will share the same ByteBuffer being passed.
* @throws IOException if cell encoding fails
*/
public CellScanner createCellScannerReusingBuffers(final Codec codec,
final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
// Use this method from HRS to create the CellScanner
// If compressed, decompress it first before passing it on else we will leak compression
// resources if the stream is not closed properly after we let it out.
if (compressor != null) {
cellBlock = decompress(compressor, cellBlock);
}
return codec.getDecoder(cellBlock);
}
private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
throws IOException {
// GZIPCodec fails w/ NPE if no configuration.
if (compressor instanceof Configurable) {
((Configurable) compressor).setConf(this.conf);
}
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
poolDecompressor);
ByteBufferOutputStream bbos;
try {
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
// TODO: Reuse buffers.
bbos = new ByteBufferOutputStream(
cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
IOUtils.copy(cis, bbos);
bbos.close();
cellBlock = bbos.getByteBuffer();
} finally {
CodecPool.returnDecompressor(poolDecompressor);
}
return cellBlock;
}
}

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Thrown if a cellscanner but no codec to encode it with.
*/
@SuppressWarnings("serial")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class CellScannerButNoCodecException extends HBaseIOException {
}

View File

@ -20,221 +20,25 @@ package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.protobuf.CodedOutputStream; import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.nio.BufferOverflowException; import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.io.ByteBufferInputStream; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
import org.apache.hadoop.hbase.io.ByteBufferPool;
import org.apache.hadoop.hbase.io.ByteBufferListOutputStream;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
/** /**
* Utility to help ipc'ing. * Utility to help ipc'ing.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class IPCUtil { public class IPCUtil {
// LOG is being used in TestIPCUtil
public static final Log LOG = LogFactory.getLog(IPCUtil.class);
/**
* How much we think the decompressor will expand the original compressed content.
*/
private final int cellBlockDecompressionMultiplier;
private final int cellBlockBuildingInitialBufferSize;
private final Configuration conf;
public IPCUtil(final Configuration conf) {
super();
this.conf = conf;
this.cellBlockDecompressionMultiplier =
conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
// Guess that 16k is a good size for rpc buffer. Could go bigger. See the TODO below in
// #buildCellBlock.
this.cellBlockBuildingInitialBufferSize =
ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
}
/**
* Thrown if a cellscanner but no codec to encode it with.
*/
public static class CellScannerButNoCodecException extends HBaseIOException {};
/**
* Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
* <code>compressor</code>.
* @param codec to use for encoding
* @param compressor to use for encoding
* @param cellScanner to encode
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
* flipped and is ready for reading. Use limit to find total size.
* @throws IOException if encoding the cells fail
*/
@SuppressWarnings("resource")
public ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
final CellScanner cellScanner) throws IOException {
if (cellScanner == null) {
return null;
}
if (codec == null) {
throw new CellScannerButNoCodecException();
}
int bufferSize = this.cellBlockBuildingInitialBufferSize;
ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
encodeCellsTo(baos, cellScanner, codec, compressor);
if (LOG.isTraceEnabled()) {
if (bufferSize < baos.size()) {
LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size()
+ "; up hbase.ipc.cellblock.building.initial.buffersize?");
}
}
ByteBuffer bb = baos.getByteBuffer();
// If no cells, don't mess around. Just return null (could be a bunch of existence checking
// gets or something -- stuff that does not return a cell).
if (!bb.hasRemaining()) return null;
return bb;
}
private void encodeCellsTo(ByteBufferOutputStream bbos, CellScanner cellScanner, Codec codec,
CompressionCodec compressor) throws IOException {
OutputStream os = bbos;
Compressor poolCompressor = null;
try {
if (compressor != null) {
if (compressor instanceof Configurable) {
((Configurable) compressor).setConf(this.conf);
}
poolCompressor = CodecPool.getCompressor(compressor);
os = compressor.createOutputStream(os, poolCompressor);
}
Codec.Encoder encoder = codec.getEncoder(os);
while (cellScanner.advance()) {
encoder.write(cellScanner.current());
}
encoder.flush();
} catch (BufferOverflowException e) {
throw new DoNotRetryIOException(e);
} finally {
os.close();
if (poolCompressor != null) {
CodecPool.returnCompressor(poolCompressor);
}
}
}
/**
* Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
* <code>compressor</code>.
* @param codec to use for encoding
* @param compressor to use for encoding
* @param cellScanner to encode
* @param pool Pool of ByteBuffers to make use of.
* @return Null or byte buffer filled with a cellblock filled with passed-in Cells encoded using
* passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
* flipped and is ready for reading. Use limit to find total size. If <code>pool</code> was not
* null, then this returned ByteBuffer came from there and should be returned to the pool when
* done.
* @throws IOException if encoding the cells fail
*/
@SuppressWarnings("resource")
public ByteBufferListOutputStream buildCellBlockStream(Codec codec, CompressionCodec compressor,
CellScanner cellScanner, ByteBufferPool pool) throws IOException {
if (cellScanner == null) {
return null;
}
if (codec == null) {
throw new CellScannerButNoCodecException();
}
assert pool != null;
ByteBufferListOutputStream bbos = new ByteBufferListOutputStream(pool);
encodeCellsTo(bbos, cellScanner, codec, compressor);
if (bbos.size() == 0) {
bbos.releaseResources();
return null;
}
return bbos;
}
/**
* @param codec to use for cellblock
* @param cellBlock to encode
* @return CellScanner to work against the content of <code>cellBlock</code>
* @throws IOException if encoding fails
*/
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte[] cellBlock) throws IOException {
// Use this method from Client side to create the CellScanner
ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
if (compressor != null) {
cellBlockBuf = decompress(compressor, cellBlockBuf);
}
// Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
// make Cells directly over the passed BB. This method is called at client side and we don't
// want the Cells to share the same byte[] where the RPC response is being read. Caching of any
// of the Cells at user's app level will make it not possible to GC the response byte[]
return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
}
/**
* @param codec to use for cellblock
* @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
* position()'ed at the start of the cell block and limit()'ed at the end.
* @return CellScanner to work against the content of <code>cellBlock</code>.
* All cells created out of the CellScanner will share the same ByteBuffer being passed.
* @throws IOException if cell encoding fails
*/
public CellScanner createCellScannerReusingBuffers(final Codec codec,
final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
// Use this method from HRS to create the CellScanner
// If compressed, decompress it first before passing it on else we will leak compression
// resources if the stream is not closed properly after we let it out.
if (compressor != null) {
cellBlock = decompress(compressor, cellBlock);
}
return codec.getDecoder(cellBlock);
}
private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
throws IOException {
// GZIPCodec fails w/ NPE if no configuration.
if (compressor instanceof Configurable) {
((Configurable) compressor).setConf(this.conf);
}
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
poolDecompressor);
ByteBufferOutputStream bbos;
try {
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
// TODO: Reuse buffers.
bbos = new ByteBufferOutputStream(
cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
IOUtils.copy(cis, bbos);
bbos.close();
cellBlock = bbos.getByteBuffer();
} finally {
CodecPool.returnDecompressor(poolDecompressor);
}
return cellBlock;
}
/** /**
* Write out header, param, and cell block if there is one. * Write out header, param, and cell block if there is one.
@ -246,10 +50,9 @@ public class IPCUtil {
* @throws IOException if write action fails * @throws IOException if write action fails
*/ */
public static int write(final OutputStream dos, final Message header, final Message param, public static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock) final ByteBuffer cellBlock) throws IOException {
throws IOException {
// Must calculate total size and write that first so other side can read it all in in one // Must calculate total size and write that first so other side can read it all in in one
// swoop. This is dictated by how the server is currently written. Server needs to change // swoop. This is dictated by how the server is currently written. Server needs to change
// if we are to be able to write without the length prefixing. // if we are to be able to write without the length prefixing.
int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param); int totalSize = IPCUtil.getTotalSizeWhenWrittenDelimited(header, param);
if (cellBlock != null) { if (cellBlock != null) {
@ -259,8 +62,7 @@ public class IPCUtil {
} }
private static int write(final OutputStream dos, final Message header, final Message param, private static int write(final OutputStream dos, final Message header, final Message param,
final ByteBuffer cellBlock, final int totalSize) final ByteBuffer cellBlock, final int totalSize) throws IOException {
throws IOException {
// I confirmed toBytes does same as DataOutputStream#writeInt. // I confirmed toBytes does same as DataOutputStream#writeInt.
dos.write(Bytes.toBytes(totalSize)); dos.write(Bytes.toBytes(totalSize));
// This allocates a buffer that is the size of the message internally. // This allocates a buffer that is the size of the message internally.
@ -278,9 +80,9 @@ public class IPCUtil {
/** /**
* @return Size on the wire when the two messages are written with writeDelimitedTo * @return Size on the wire when the two messages are written with writeDelimitedTo
*/ */
public static int getTotalSizeWhenWrittenDelimited(Message ... messages) { public static int getTotalSizeWhenWrittenDelimited(Message... messages) {
int totalSize = 0; int totalSize = 0;
for (Message m: messages) { for (Message m : messages) {
if (m == null) { if (m == null) {
continue; continue;
} }
@ -290,4 +92,52 @@ public class IPCUtil {
Preconditions.checkArgument(totalSize < Integer.MAX_VALUE); Preconditions.checkArgument(totalSize < Integer.MAX_VALUE);
return totalSize; return totalSize;
} }
/**
* @return True if the exception is a fatal connection exception.
*/
public static boolean isFatalConnectionException(final ExceptionResponse e) {
return e.getExceptionClassName().equals(FatalConnectionException.class.getName());
}
/**
* @param e exception to be wrapped
* @return RemoteException made from passed <code>e</code>
*/
public static RemoteException createRemoteException(final ExceptionResponse e) {
String innerExceptionClassName = e.getExceptionClassName();
boolean doNotRetry = e.getDoNotRetry();
return e.hasHostname() ?
// If a hostname then add it to the RemoteWithExtrasException
new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), e.getHostname(),
e.getPort(), doNotRetry)
: new RemoteWithExtrasException(innerExceptionClassName, e.getStackTrace(), doNotRetry);
}
/**
* Takes an Exception and the address we were trying to connect to and return an IOException with
* the input exception as the cause. The new exception provides the stack trace of the place where
* the exception is thrown and some extra diagnostics information. If the exception is
* ConnectException or SocketTimeoutException, return a new one of the same type; Otherwise return
* an IOException.
* @param addr target address
* @param exception the relevant exception
* @return an exception to throw
*/
public static IOException wrapException(InetSocketAddress addr, Exception exception) {
if (exception instanceof ConnectException) {
// connection refused; include the host:port in the error
return (ConnectException) new ConnectException(
"Call to " + addr + " failed on connection exception: " + exception).initCause(exception);
} else if (exception instanceof SocketTimeoutException) {
return (SocketTimeoutException) new SocketTimeoutException(
"Call to " + addr + " failed because " + exception).initCause(exception);
} else if (exception instanceof ConnectionClosingException) {
return (ConnectionClosingException) new ConnectionClosingException(
"Call to " + addr + " failed on local exception: " + exception).initCause(exception);
} else {
return (IOException) new IOException(
"Call to " + addr + " failed on local exception: " + exception).initCause(exception);
}
}
} }

View File

@ -896,7 +896,7 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
builder.setMethodName(call.md.getName()); builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null); builder.setRequestParam(call.param != null);
ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells); ByteBuffer cellBlock = cellBlockBuilder.buildCellBlock(this.codec, this.compressor, call.cells);
if (cellBlock != null) { if (cellBlock != null) {
CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder(); CellBlockMeta.Builder cellBlockBuilder = CellBlockMeta.newBuilder();
cellBlockBuilder.setLength(cellBlock.limit()); cellBlockBuilder.setLength(cellBlock.limit());
@ -997,12 +997,12 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
if (responseHeader.hasException()) { if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException(); ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse); RemoteException re = IPCUtil.createRemoteException(exceptionResponse);
call.setException(re); call.setException(re);
call.callStats.setResponseSizeBytes(totalSize); call.callStats.setResponseSizeBytes(totalSize);
call.callStats.setCallTimeMs( call.callStats.setCallTimeMs(
EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime()); EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
if (isFatalConnectionException(exceptionResponse)) { if (IPCUtil.isFatalConnectionException(exceptionResponse)) {
markClosed(re); markClosed(re);
} }
} else { } else {
@ -1017,7 +1017,7 @@ public class RpcClientImpl extends AbstractRpcClient {
int size = responseHeader.getCellBlockMeta().getLength(); int size = responseHeader.getCellBlockMeta().getLength();
byte [] cellBlock = new byte[size]; byte [] cellBlock = new byte[size];
IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length);
cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock); cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock);
} }
call.setResponse(value, cellBlockScanner); call.setResponse(value, cellBlockScanner);
call.callStats.setResponseSizeBytes(totalSize); call.callStats.setResponseSizeBytes(totalSize);
@ -1044,29 +1044,6 @@ public class RpcClientImpl extends AbstractRpcClient {
} }
} }
/**
* @return True if the exception is a fatal connection exception.
*/
private boolean isFatalConnectionException(final ExceptionResponse e) {
return e.getExceptionClassName().
equals(FatalConnectionException.class.getName());
}
/**
* @param e exception to be wrapped
* @return RemoteException made from passed <code>e</code>
*/
private RemoteException createRemoteException(final ExceptionResponse e) {
String innerExceptionClassName = e.getExceptionClassName();
boolean doNotRetry = e.getDoNotRetry();
return e.hasHostname()?
// If a hostname then add it to the RemoteWithExtrasException
new RemoteWithExtrasException(innerExceptionClassName,
e.getStackTrace(), e.getHostname(), e.getPort(), doNotRetry):
new RemoteWithExtrasException(innerExceptionClassName,
e.getStackTrace(), doNotRetry);
}
protected synchronized boolean markClosed(IOException e) { protected synchronized boolean markClosed(IOException e) {
if (e == null){ if (e == null){
throw new NullPointerException(); throw new NullPointerException();
@ -1322,7 +1299,7 @@ public class RpcClientImpl extends AbstractRpcClient {
throw call.error; throw call.error;
} }
// local exception // local exception
throw wrapException(addr, call.error); throw IPCUtil.wrapException(addr, call.error);
} }
return call; return call;

View File

@ -0,0 +1,196 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.commons.lang.time.StopWatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ ClientTests.class, SmallTests.class })
public class TestCellBlockBuilder {
private static final Log LOG = LogFactory.getLog(TestCellBlockBuilder.class);
CellBlockBuilder builder;
@Before
public void before() {
this.builder = new CellBlockBuilder(new Configuration());
}
@Test
public void testBuildCellBlock() throws IOException {
doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), null);
doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new DefaultCodec());
doBuildCellBlockUndoCellBlock(this.builder, new KeyValueCodec(), new GzipCodec());
}
static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder util, final Codec codec,
final CompressionCodec compressor) throws IOException {
doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false);
}
static void doBuildCellBlockUndoCellBlock(final CellBlockBuilder util, final Codec codec,
final CompressionCodec compressor, final int count, final int size, final boolean sized)
throws IOException {
Cell[] cells = getCells(count, size);
CellScanner cellScanner = sized ? getSizedCellScanner(cells)
: CellUtil.createCellScanner(Arrays.asList(cells).iterator());
ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb);
int i = 0;
while (cellScanner.advance()) {
i++;
}
assertEquals(count, i);
}
static CellScanner getSizedCellScanner(final Cell[] cells) {
int size = -1;
for (Cell cell : cells) {
size += CellUtil.estimatedSerializedSizeOf(cell);
}
final int totalSize = ClassSize.align(size);
final CellScanner cellScanner = CellUtil.createCellScanner(cells);
return new SizedCellScanner() {
@Override
public long heapSize() {
return totalSize;
}
@Override
public Cell current() {
return cellScanner.current();
}
@Override
public boolean advance() throws IOException {
return cellScanner.advance();
}
};
}
static Cell[] getCells(final int howMany) {
return getCells(howMany, 1024);
}
static Cell[] getCells(final int howMany, final int valueSize) {
Cell[] cells = new Cell[howMany];
byte[] value = new byte[valueSize];
for (int i = 0; i < howMany; i++) {
byte[] index = Bytes.toBytes(i);
KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
cells[i] = kv;
}
return cells;
}
private static final String COUNT = "--count=";
private static final String SIZE = "--size=";
/**
* Prints usage and then exits w/ passed <code>errCode</code>
* @param errCode
*/
private static void usage(final int errCode) {
System.out.println("Usage: IPCUtil [options]");
System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing");
System.out.println(" --count Count of Cells");
System.out.println(" --size Size of Cell values");
System.out.println("Example: IPCUtil --count=1024 --size=1024");
System.exit(errCode);
}
private static void timerTests(final CellBlockBuilder util, final int count, final int size,
final Codec codec, final CompressionCodec compressor) throws IOException {
final int cycles = 1000;
StopWatch timer = new StopWatch();
timer.start();
for (int i = 0; i < cycles; i++) {
timerTest(util, timer, count, size, codec, compressor, false);
}
timer.stop();
LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false + ", count="
+ count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
timer.reset();
timer.start();
for (int i = 0; i < cycles; i++) {
timerTest(util, timer, count, size, codec, compressor, true);
}
timer.stop();
LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true + ", count="
+ count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
}
private static void timerTest(final CellBlockBuilder util, final StopWatch timer, final int count,
final int size, final Codec codec, final CompressionCodec compressor, final boolean sized)
throws IOException {
doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized);
}
/**
* For running a few tests of methods herein.
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int count = 1024;
int size = 10240;
for (String arg : args) {
if (arg.startsWith(COUNT)) {
count = Integer.parseInt(arg.replace(COUNT, ""));
} else if (arg.startsWith(SIZE)) {
size = Integer.parseInt(arg.replace(SIZE, ""));
} else {
usage(1);
}
}
CellBlockBuilder util = new CellBlockBuilder(HBaseConfiguration.create());
((Log4JLogger) CellBlockBuilder.LOG).getLogger().setLevel(Level.ALL);
timerTests(util, count, size, new KeyValueCodec(), null);
timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec());
timerTests(util, count, size, new KeyValueCodec(), new GzipCodec());
}
}

View File

@ -17,181 +17,28 @@
*/ */
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import static org.junit.Assert.assertEquals; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.net.ConnectException;
import java.nio.ByteBuffer; import java.net.InetSocketAddress;
import java.util.Arrays; import java.net.SocketTimeoutException;
import org.apache.commons.lang.time.StopWatch; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.log4j.Level;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ClientTests.class, SmallTests.class})
public class TestIPCUtil { public class TestIPCUtil {
private static final Log LOG = LogFactory.getLog(TestIPCUtil.class);
IPCUtil util;
@Before
public void before() {
this.util = new IPCUtil(new Configuration());
}
@Test @Test
public void testBuildCellBlock() throws IOException { public void testWrapException() throws Exception {
doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null); final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new DefaultCodec()); assertTrue(wrapException(address, new ConnectException()) instanceof ConnectException);
doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new GzipCodec()); assertTrue(
} wrapException(address, new SocketTimeoutException()) instanceof SocketTimeoutException);
assertTrue(wrapException(address, new ConnectionClosingException(
static void doBuildCellBlockUndoCellBlock(final IPCUtil util, "Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
final Codec codec, final CompressionCodec compressor) assertTrue(
throws IOException { wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false); .getCause() instanceof CallTimeoutException);
}
static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec codec,
final CompressionCodec compressor, final int count, final int size, final boolean sized)
throws IOException {
Cell [] cells = getCells(count, size);
CellScanner cellScanner = sized? getSizedCellScanner(cells):
CellUtil.createCellScanner(Arrays.asList(cells).iterator());
ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb);
int i = 0;
while (cellScanner.advance()) {
i++;
}
assertEquals(count, i);
}
static CellScanner getSizedCellScanner(final Cell [] cells) {
int size = -1;
for (Cell cell: cells) {
size += CellUtil.estimatedSerializedSizeOf(cell);
}
final int totalSize = ClassSize.align(size);
final CellScanner cellScanner = CellUtil.createCellScanner(cells);
return new SizedCellScanner() {
@Override
public long heapSize() {
return totalSize;
}
@Override
public Cell current() {
return cellScanner.current();
}
@Override
public boolean advance() throws IOException {
return cellScanner.advance();
}
};
}
static Cell [] getCells(final int howMany) {
return getCells(howMany, 1024);
}
static Cell [] getCells(final int howMany, final int valueSize) {
Cell [] cells = new Cell[howMany];
byte [] value = new byte[valueSize];
for (int i = 0; i < howMany; i++) {
byte [] index = Bytes.toBytes(i);
KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
cells[i] = kv;
}
return cells;
}
private static final String COUNT = "--count=";
private static final String SIZE = "--size=";
/**
* Prints usage and then exits w/ passed <code>errCode</code>
* @param errCode
*/
private static void usage(final int errCode) {
System.out.println("Usage: IPCUtil [options]");
System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing");
System.out.println(" --count Count of Cells");
System.out.println(" --size Size of Cell values");
System.out.println("Example: IPCUtil --count=1024 --size=1024");
System.exit(errCode);
}
private static void timerTests(final IPCUtil util, final int count, final int size,
final Codec codec, final CompressionCodec compressor)
throws IOException {
final int cycles = 1000;
StopWatch timer = new StopWatch();
timer.start();
for (int i = 0; i < cycles; i++) {
timerTest(util, timer, count, size, codec, compressor, false);
}
timer.stop();
LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false +
", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
timer.reset();
timer.start();
for (int i = 0; i < cycles; i++) {
timerTest(util, timer, count, size, codec, compressor, true);
}
timer.stop();
LOG.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true +
", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
}
private static void timerTest(final IPCUtil util, final StopWatch timer, final int count,
final int size, final Codec codec, final CompressionCodec compressor, final boolean sized)
throws IOException {
doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized);
}
/**
* For running a few tests of methods herein.
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
int count = 1024;
int size = 10240;
for (String arg: args) {
if (arg.startsWith(COUNT)) {
count = Integer.parseInt(arg.replace(COUNT, ""));
} else if (arg.startsWith(SIZE)) {
size = Integer.parseInt(arg.replace(SIZE, ""));
} else {
usage(1);
}
}
IPCUtil util = new IPCUtil(HBaseConfiguration.create());
((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL);
timerTests(util, count, size, new KeyValueCodec(), null);
timerTests(util, count, size, new KeyValueCodec(), new DefaultCodec());
timerTests(util, count, size, new KeyValueCodec(), new GzipCodec());
} }
} }

View File

@ -183,7 +183,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
*/ */
static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10; static final int DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER = 10;
private final IPCUtil ipcUtil; private final CellBlockBuilder cellBlockBuilder;
private static final String AUTH_FAILED_FOR = "Auth failed for "; private static final String AUTH_FAILED_FOR = "Auth failed for ";
private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for "; private static final String AUTH_SUCCESSFUL_FOR = "Auth successful for ";
@ -434,14 +434,14 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
List<ByteBuffer> cellBlock = null; List<ByteBuffer> cellBlock = null;
int cellBlockSize = 0; int cellBlockSize = 0;
if (reservoir != null) { if (reservoir != null) {
this.cellBlockStream = ipcUtil.buildCellBlockStream(this.connection.codec, this.cellBlockStream = cellBlockBuilder.buildCellBlockStream(this.connection.codec,
this.connection.compressionCodec, cells, reservoir); this.connection.compressionCodec, cells, reservoir);
if (this.cellBlockStream != null) { if (this.cellBlockStream != null) {
cellBlock = this.cellBlockStream.getByteBuffers(); cellBlock = this.cellBlockStream.getByteBuffers();
cellBlockSize = this.cellBlockStream.size(); cellBlockSize = this.cellBlockStream.size();
} }
} else { } else {
ByteBuffer b = ipcUtil.buildCellBlock(this.connection.codec, ByteBuffer b = cellBlockBuilder.buildCellBlock(this.connection.codec,
this.connection.compressionCodec, cells); this.connection.compressionCodec, cells);
if (b != null) { if (b != null) {
cellBlockSize = b.remaining(); cellBlockSize = b.remaining();
@ -1861,7 +1861,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
} }
if (header.hasCellBlockMeta()) { if (header.hasCellBlockMeta()) {
buf.position(offset); buf.position(offset);
cellScanner = ipcUtil.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf); cellScanner = cellBlockBuilder.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
} }
} catch (Throwable t) { } catch (Throwable t) {
InetSocketAddress address = getListenerAddress(); InetSocketAddress address = getListenerAddress();
@ -2058,7 +2058,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true);
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true);
this.ipcUtil = new IPCUtil(conf); this.cellBlockBuilder = new CellBlockBuilder(conf);
// Create the responder here // Create the responder here

View File

@ -35,10 +35,8 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import java.io.IOException; import java.io.IOException;
import java.net.ConnectException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -54,7 +52,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto; import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@ -394,19 +391,4 @@ public abstract class AbstractTestIPC {
rpcServer.stop(); rpcServer.stop();
} }
} }
@Test
public void testWrapException() throws Exception {
AbstractRpcClient client =
(AbstractRpcClient) RpcClientFactory.createClient(CONF, "AbstractTestIPC");
final InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", 0);
assertTrue(client.wrapException(address, new ConnectException()) instanceof ConnectException);
assertTrue(client.wrapException(address,
new SocketTimeoutException()) instanceof SocketTimeoutException);
assertTrue(client.wrapException(address, new ConnectionClosingException(
"Test AbstractRpcClient#wrapException")) instanceof ConnectionClosingException);
assertTrue(client
.wrapException(address, new CallTimeoutException("Test AbstractRpcClient#wrapException"))
.getCause() instanceof CallTimeoutException);
}
} }