HBASE-15180 Reduce garbage created while reading Cells from Codec Decoder.

This commit is contained in:
anoopsjohn 2016-03-11 13:58:41 +05:30
parent a979d85582
commit eea8b38dfa
12 changed files with 147 additions and 37 deletions

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.ipc;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
@ -178,50 +177,58 @@ public class IPCUtil {
* @throws IOException
*/
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final byte [] cellBlock)
throws IOException {
return createCellScanner(codec, compressor, ByteBuffer.wrap(cellBlock));
final byte[] cellBlock) throws IOException {
// Use this method from Client side to create the CellScanner
ByteBuffer cellBlockBuf = ByteBuffer.wrap(cellBlock);
if (compressor != null) {
cellBlockBuf = decompress(compressor, cellBlockBuf);
}
// Not making the Decoder over the ByteBuffer purposefully. The Decoder over the BB will
// make Cells directly over the passed BB. This method is called at client side and we don't
// want the Cells to share the same byte[] where the RPC response is being read. Caching of any
// of the Cells at user's app level will make it not possible to GC the response byte[]
return codec.getDecoder(new ByteBufferInputStream(cellBlockBuf));
}
/**
* @param codec
* @param cellBlock ByteBuffer containing the cells written by the Codec. The buffer should be
* position()'ed at the start of the cell block and limit()'ed at the end.
* @return CellScanner to work against the content of <code>cellBlock</code>
* @return CellScanner to work against the content of <code>cellBlock</code>.
* All cells created out of the CellScanner will share the same ByteBuffer being passed.
* @throws IOException
*/
public CellScanner createCellScanner(final Codec codec, final CompressionCodec compressor,
final ByteBuffer cellBlock)
throws IOException {
public CellScanner createCellScannerReusingBuffers(final Codec codec,
final CompressionCodec compressor, ByteBuffer cellBlock) throws IOException {
// Use this method from HRS to create the CellScanner
// If compressed, decompress it first before passing it on else we will leak compression
// resources if the stream is not closed properly after we let it out.
InputStream is = null;
if (compressor != null) {
// GZIPCodec fails w/ NPE if no configuration.
if (compressor instanceof Configurable) ((Configurable)compressor).setConf(this.conf);
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
CompressionInputStream cis =
compressor.createInputStream(new ByteBufferInputStream(cellBlock), poolDecompressor);
ByteBufferOutputStream bbos = null;
try {
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
// TODO: Reuse buffers.
bbos = new ByteBufferOutputStream(cellBlock.remaining() *
this.cellBlockDecompressionMultiplier);
IOUtils.copy(cis, bbos);
bbos.close();
ByteBuffer bb = bbos.getByteBuffer();
is = new ByteBufferInputStream(bb);
} finally {
if (is != null) is.close();
if (bbos != null) bbos.close();
CodecPool.returnDecompressor(poolDecompressor);
}
} else {
is = new ByteBufferInputStream(cellBlock);
cellBlock = decompress(compressor, cellBlock);
}
return codec.getDecoder(is);
return codec.getDecoder(cellBlock);
}
private ByteBuffer decompress(CompressionCodec compressor, ByteBuffer cellBlock)
throws IOException {
// GZIPCodec fails w/ NPE if no configuration.
if (compressor instanceof Configurable) ((Configurable) compressor).setConf(this.conf);
Decompressor poolDecompressor = CodecPool.getDecompressor(compressor);
CompressionInputStream cis = compressor.createInputStream(new ByteBufferInputStream(cellBlock),
poolDecompressor);
ByteBufferOutputStream bbos = null;
try {
// TODO: This is ugly. The buffer will be resized on us if we guess wrong.
// TODO: Reuse buffers.
bbos = new ByteBufferOutputStream(
cellBlock.remaining() * this.cellBlockDecompressionMultiplier);
IOUtils.copy(cis, bbos);
bbos.close();
cellBlock = bbos.getByteBuffer();
} finally {
CodecPool.returnDecompressor(poolDecompressor);
}
return cellBlock;
}
/**

View File

@ -79,7 +79,7 @@ public class TestIPCUtil {
CellScanner cellScanner = sized? getSizedCellScanner(cells):
CellUtil.createCellScanner(Arrays.asList(cells).iterator());
ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
cellScanner = util.createCellScanner(codec, compressor, bb);
cellScanner = util.createCellScannerReusingBuffers(codec, compressor, bb);
int i = 0;
while (cellScanner.advance()) {
i++;

View File

@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -115,6 +117,11 @@ public class CellCodec implements Codec {
return new CellDecoder(is);
}
@Override
public Decoder getDecoder(ByteBuffer buf) {
return getDecoder(new ByteBufferInputStream(buf));
}
@Override
public Encoder getEncoder(OutputStream os) {
return new CellEncoder(os);

View File

@ -20,12 +20,14 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -116,6 +118,11 @@ public class CellCodecWithTags implements Codec {
return new CellDecoder(is);
}
@Override
public Decoder getDecoder(ByteBuffer buf) {
return getDecoder(new ByteBufferInputStream(buf));
}
@Override
public Encoder getEncoder(OutputStream os) {
return new CellEncoder(os);

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.codec;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -50,5 +51,6 @@ public interface Codec {
interface Decoder extends CellScanner {};
Decoder getDecoder(InputStream is);
Decoder getDecoder(ByteBuffer buf);
Encoder getEncoder(OutputStream os);
}

View File

@ -20,11 +20,14 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NoTagsKeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ByteBufferUtils;
/**
* Codec that does KeyValue version 1 serialization.
@ -69,6 +72,37 @@ public class KeyValueCodec implements Codec {
}
}
public static class ByteBufferedKeyValueDecoder implements Codec.Decoder {
protected final ByteBuffer buf;
protected Cell current = null;
public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
this.buf = buf;
}
@Override
public boolean advance() throws IOException {
if (this.buf.remaining() <= 0) {
return false;
}
int len = ByteBufferUtils.toInt(buf);
assert buf.hasArray();
this.current = createCell(buf.array(), buf.arrayOffset() + buf.position(), len);
buf.position(buf.position() + len);
return true;
}
@Override
public Cell current() {
return this.current;
}
protected Cell createCell(byte[] buf, int offset, int len) {
return new NoTagsKeyValue(buf, offset, len);
}
}
/**
* Implementation depends on {@link InputStream#available()}
*/
@ -77,6 +111,11 @@ public class KeyValueCodec implements Codec {
return new KeyValueDecoder(is);
}
@Override
public Decoder getDecoder(ByteBuffer buf) {
return new ByteBufferedKeyValueDecoder(buf);
}
@Override
public Encoder getEncoder(OutputStream os) {
return new KeyValueEncoder(os);

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -75,6 +77,18 @@ public class KeyValueCodecWithTags implements Codec {
}
}
public static class ByteBufferedKeyValueDecoder
extends KeyValueCodec.ByteBufferedKeyValueDecoder {
public ByteBufferedKeyValueDecoder(ByteBuffer buf) {
super(buf);
}
protected Cell createCell(byte[] buf, int offset, int len) {
return new KeyValue(buf, offset, len);
}
}
/**
* Implementation depends on {@link InputStream#available()}
*/
@ -87,4 +101,9 @@ public class KeyValueCodecWithTags implements Codec {
public Encoder getEncoder(OutputStream os) {
return new KeyValueEncoder(os);
}
@Override
public Decoder getDecoder(ByteBuffer buf) {
return new ByteBufferedKeyValueDecoder(buf);
}
}

View File

@ -781,6 +781,19 @@ public final class ByteBufferUtils {
}
}
/**
* Reads an int value at the given buffer's current position. Also advances the buffer's position
*/
public static int toInt(ByteBuffer buffer) {
if (UNSAFE_UNALIGNED) {
int i = UnsafeAccess.toInt(buffer, buffer.position());
buffer.position(buffer.position() + Bytes.SIZEOF_INT);
return i;
} else {
return buffer.getInt();
}
}
/**
* Reads an int value at the given buffer's offset.
* @param buffer

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.hbase.codec;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@ -79,6 +81,11 @@ public class MessageCodec implements Codec {
return new MessageDecoder(is);
}
@Override
public Decoder getDecoder(ByteBuffer buf) {
return getDecoder(new ByteBufferInputStream(buf));
}
@Override
public Encoder getEncoder(OutputStream os) {
return new MessageEncoder(os);

View File

@ -1884,7 +1884,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
}
if (header.hasCellBlockMeta()) {
buf.position(offset);
cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, buf);
cellScanner = ipcUtil.createCellScannerReusingBuffers(this.codec, this.compressionCodec, buf);
}
} catch (Throwable t) {
InetSocketAddress address = getListenerAddress();

View File

@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.codec.BaseDecoder;
import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
import org.apache.hadoop.hbase.util.Bytes;
@ -347,6 +349,11 @@ public class WALCellCodec implements Codec {
? new KeyValueCodecWithTags.KeyValueDecoder(is) : new CompressedKvDecoder(is, compression);
}
@Override
public Decoder getDecoder(ByteBuffer buf) {
return getDecoder(new ByteBufferInputStream(buf));
}
@Override
public Encoder getEncoder(OutputStream os) {
return (compression == null)

View File

@ -221,7 +221,8 @@ public class TestTags {
CellScanner cellScanner = result.cellScanner();
cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
assertTrue(current.getValueOffset() + current.getValueLength() == current.getOffset()
+ current.getLength());
}
} finally {
if (scanner != null)
@ -239,7 +240,8 @@ public class TestTags {
CellScanner cellScanner = result.cellScanner();
cellScanner.advance();
KeyValue current = (KeyValue) cellScanner.current();
assertTrue(current.getValueOffset() + current.getValueLength() == current.getLength());
assertTrue(current.getValueOffset() + current.getValueLength() == current.getOffset()
+ current.getLength());
}
} finally {
if (scanner != null) {