HBASE-22965 RS Crash due to DBE reference to a reused ByteBuff (#603)
Signed-off-by: huzheng <openinx@gmail.com>
commit be932487e8
parent 9f703fc3b2

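The crash named in the subject line is a lifetime bug: a data block encoder (DBE) keeps a reference to the last Cell it encoded, but that Cell is backed by a pooled ByteBuff which the region server recycles once the RPC response has been shipped, so the encoder later reads someone else's bytes. The patch fixes this by deep-copying the retained cells in beforeShipped(). The following is a minimal, HBase-free sketch of the hazard and of the defensive copy; the Encoder class, its methods and the cell layout are invented for illustration and are not HBase APIs.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReusedBufferDemo {

  // Stand-in for a data block encoder that remembers the previous cell it encoded.
  static class Encoder {
    ByteBuffer prevCell; // may point into a buffer the caller will recycle

    void encode(ByteBuffer cell) {
      // ... encode the cell, diffing against prevCell ...
      prevCell = cell; // dangerous: keeps a view into the shared buffer, no copy
    }

    // What the patch adds: before the shared buffer is handed back to the pool,
    // replace the retained reference with a private copy.
    void beforeShipped() {
      if (prevCell != null) {
        ByteBuffer copy = ByteBuffer.allocate(prevCell.remaining());
        copy.put(prevCell.duplicate()).flip();
        prevCell = copy;
      }
    }
  }

  public static void main(String[] args) {
    ByteBuffer pooled = ByteBuffer.wrap("row1/cf:q/value1".getBytes(StandardCharsets.UTF_8));

    Encoder encoder = new Encoder();
    encoder.encode(pooled.duplicate());

    // Comment this call out and the output below becomes corrupted.
    encoder.beforeShipped();

    // The pool reuses the buffer for an unrelated payload.
    pooled.clear();
    pooled.put("garbage".getBytes(StandardCharsets.UTF_8));

    // Prints "row1/cf:q/value1"; without beforeShipped() it prints "garbage:q/value1",
    // which is the kind of silent corruption that crashed the region server.
    System.out.println(StandardCharsets.UTF_8.decode(encoder.prevCell.duplicate()));
  }
}
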
EncodingState.java

@@ -19,8 +19,8 @@
 package org.apache.hadoop.hbase.io.encoding;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Keeps track of the encoding state.
  */
@@ -31,4 +31,12 @@ public class EncodingState {
    * The previous Cell the encoder encoded.
    */
   protected Cell prevCell = null;
+
+  public void beforeShipped() {
+    if (this.prevCell != null) {
+      // can't use KeyValueUtil#toNewKeyCell, because we need both key and value
+      // from the prevCell in FastDiffDeltaEncoder
+      this.prevCell = KeyValueUtil.copyToNewKeyValue(this.prevCell);
+    }
+  }
 }

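The comment inside the new beforeShipped() above is the key design point: KeyValueUtil.toNewKeyCell(cell) copies only the key portion of a cell, which is enough for trackers that just remember the previous row key, while FastDiffDeltaEncoder also diffs against the previous value, so EncodingState takes a full copy with KeyValueUtil.copyToNewKeyValue(cell). A rough sketch of the difference, using only hbase-common classes and arbitrary row/family/qualifier values:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.util.Bytes;

public class CopyVariantsSketch {
  public static void main(String[] args) {
    KeyValue original = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("value"));

    // Key-only copy: fine for code that only needs the previous key.
    Cell keyOnly = KeyValueUtil.toNewKeyCell(original);

    // Full copy: key and value live in a freshly allocated byte[], independent of the
    // buffer that backed 'original'. This is what EncodingState.beforeShipped() needs,
    // because FastDiffDeltaEncoder reads the previous value as well.
    KeyValue fullCopy = KeyValueUtil.copyToNewKeyValue(original);

    System.out.println(keyOnly.getValueLength());   // 0, the value was not copied
    System.out.println(fullCopy.getValueLength());  // 5, "value" came along
  }
}
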
RowIndexCodecV1.java

@@ -53,6 +53,13 @@ public class RowIndexCodecV1 extends AbstractDataBlockEncoder {
 
   private static class RowIndexEncodingState extends EncodingState {
     RowIndexEncoderV1 encoder = null;
+
+    @Override
+    public void beforeShipped() {
+      if (encoder != null) {
+        encoder.beforeShipped();
+      }
+    }
   }
 
   @Override

RowIndexEncoderV1.java

@@ -15,6 +15,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparatorImpl;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -30,11 +31,9 @@ public class RowIndexEncoderV1 {
   private DataOutputStream out;
   private NoneEncoder encoder;
   private int startOffset = -1;
-  private ByteArrayOutputStream rowsOffsetBAOS = new ByteArrayOutputStream(
-      64 * 4);
+  private ByteArrayOutputStream rowsOffsetBAOS = new ByteArrayOutputStream(64 * 4);
 
-  public RowIndexEncoderV1(DataOutputStream out,
-      HFileBlockDefaultEncodingContext encodingCtx) {
+  public RowIndexEncoderV1(DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) {
     this.out = out;
     this.encoder = new NoneEncoder(out, encodingCtx);
   }
@@ -85,4 +84,9 @@ public class RowIndexEncoderV1 {
     }
   }
 
+  void beforeShipped() {
+    if (this.lastCell != null) {
+      this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);
+    }
+  }
 }

HFileBlock.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.ByteBuffInputStream;
 import org.apache.hadoop.hbase.io.ByteBufferWriterDataOutputStream;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.encoding.EncodingState;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext;
 import org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultEncodingContext;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hbase.io.encoding.HFileBlockEncodingContext;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.nio.MultiByteBuff;
 import org.apache.hadoop.hbase.nio.SingleByteBuff;
+import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -762,7 +764,7 @@ public class HFileBlock implements Cacheable {
    * </ol>
    * <p>
    */
-  static class Writer {
+  static class Writer implements ShipperListener {
     private enum State {
       INIT,
       WRITING,
@@ -841,6 +843,17 @@ public class HFileBlock implements Cacheable {
     /** Meta data that holds information about the hfileblock**/
     private HFileContext fileContext;
 
+    @Override
+    public void beforeShipped() {
+      if (getEncodingState() != null) {
+        getEncodingState().beforeShipped();
+      }
+    }
+
+    EncodingState getEncodingState() {
+      return dataBlockEncodingCtx.getEncodingState();
+    }
+
     /**
      * @param dataBlockEncoder data block encoding algorithm to use
      */

HFileWriterImpl.java

@@ -764,6 +764,7 @@ public class HFileWriterImpl implements HFile.Writer {
 
   @Override
   public void beforeShipped() throws IOException {
+    this.blockWriter.beforeShipped();
     // Add clone methods for every cell
     if (this.lastCell != null) {
       this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell);

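Taken together, the HFileBlock and HFileWriterImpl hunks extend the existing beforeShipped() chain: when a response is about to ship and its pooled ByteBuffs are about to be recycled, HFileWriterImpl.beforeShipped() now also forwards to HFileBlock.Writer (which is why Writer now implements ShipperListener), and the Writer in turn asks the current EncodingState to detach any cells the encoder still references. Below is a stripped-down model of that delegation, using hypothetical class and interface names rather than the real HBase types.

// Hypothetical, simplified model of the delegation added by this patch.
interface BufferReleaseListener {
  void beforeShipped(); // called just before pooled buffers are recycled
}

class EncoderState implements BufferReleaseListener {
  byte[] prevCell; // may point into a pooled buffer

  @Override
  public void beforeShipped() {
    if (prevCell != null) {
      prevCell = prevCell.clone(); // detach from the pooled buffer
    }
  }
}

class BlockWriter implements BufferReleaseListener {
  EncoderState encodingState; // null until the first block is started

  @Override
  public void beforeShipped() {
    if (encodingState != null) { // mirrors the null check in HFileBlock.Writer
      encodingState.beforeShipped();
    }
  }
}

class FileWriter implements BufferReleaseListener {
  final BlockWriter blockWriter = new BlockWriter();

  @Override
  public void beforeShipped() {
    blockWriter.beforeShipped(); // new in this patch: forward to the block writer
    // ...then detach any cells the file writer itself still holds.
  }
}
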
TestHFile.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ArrayBackedTag;
+import org.apache.hadoop.hbase.ByteBufferKeyValue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparatorImpl;
 import org.apache.hadoop.hbase.CellUtil;
@@ -60,12 +61,15 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoder;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
 import org.apache.hadoop.hbase.io.hfile.HFile.Writer;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.junit.Assert;
@@ -759,5 +763,40 @@ public class TestHFile {
         0, expectedArray.length);
     }
 
   }
+  @Test
+  public void testDBEShipped() throws IOException {
+    for (DataBlockEncoding encoding : DataBlockEncoding.values()) {
+      DataBlockEncoder encoder = encoding.getEncoder();
+      if (encoder == null) {
+        continue;
+      }
+      Path f = new Path(ROOT_DIR, testName.getMethodName() + "_" + encoding);
+      HFileContext context = new HFileContextBuilder()
+          .withIncludesTags(false)
+          .withDataBlockEncoding(encoding).build();
+      HFileWriterImpl writer = (HFileWriterImpl) HFile.getWriterFactory(conf, cacheConf)
+          .withPath(fs, f).withFileContext(context).create();
+
+      KeyValue kv = new KeyValue(Bytes.toBytes("testkey1"), Bytes.toBytes("family"),
+          Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));
+      KeyValue kv2 = new KeyValue(Bytes.toBytes("testkey2"), Bytes.toBytes("family"),
+          Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));
+      KeyValue kv3 = new KeyValue(Bytes.toBytes("testkey3"), Bytes.toBytes("family"),
+          Bytes.toBytes("qual"), Bytes.toBytes("testvalue"));
+
+      ByteBuffer buffer = ByteBuffer.wrap(kv.getBuffer());
+      ByteBuffer buffer2 = ByteBuffer.wrap(kv2.getBuffer());
+      ByteBuffer buffer3 = ByteBuffer.wrap(kv3.getBuffer());
+
+      writer.append(new ByteBufferKeyValue(buffer, 0, buffer.remaining()));
+      writer.beforeShipped();
+
+      // pollute first cell's backing ByteBuffer
+      ByteBufferUtils.copyFromBufferToBuffer(buffer3, buffer);
+
+      // write another cell, if DBE not Shipped, test will fail
+      writer.append(new ByteBufferKeyValue(buffer2, 0, buffer2.remaining()));
+      writer.close();
+    }
+  }
 }

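The new test drives the fix end to end for every registered DataBlockEncoding: it appends a cell backed by a ByteBuffer, calls writer.beforeShipped(), deliberately overwrites that buffer with the bytes of a different cell, and then appends the next cell; without the deep copies added above, encoders that diff against the previous cell would read the polluted bytes and the write fails. Assuming a normal Maven checkout, it should be runnable on its own from the hbase-server module with something like:

mvn test -Dtest=TestHFile#testDBEShipped -pl hbase-server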