HBASE-27637 Zero length value would cause value compressor to read nothing and not advance the position of the InputStream (#5025)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
8ba56cccd3
commit
2bbe036e29
|
@ -25,6 +25,7 @@ import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.util.EnumMap;
|
import java.util.EnumMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
|
import org.apache.hadoop.hbase.io.BoundedDelegatingInputStream;
|
||||||
|
@ -105,7 +106,7 @@ public class CompressionContext {
|
||||||
return compressed;
|
return compressed;
|
||||||
}
|
}
|
||||||
|
|
||||||
public int decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
|
public void decompress(InputStream in, int inLength, byte[] outArray, int outOffset,
|
||||||
int outLength) throws IOException {
|
int outLength) throws IOException {
|
||||||
|
|
||||||
// Our input is a sequence of bounded byte ranges (call them segments), with
|
// Our input is a sequence of bounded byte ranges (call them segments), with
|
||||||
|
@ -122,11 +123,16 @@ public class CompressionContext {
|
||||||
} else {
|
} else {
|
||||||
lowerIn.setDelegate(in, inLength);
|
lowerIn.setDelegate(in, inLength);
|
||||||
}
|
}
|
||||||
|
if (outLength == 0) {
|
||||||
// Caller must handle short reads.
|
// The BufferedInputStream will return early and skip reading anything if outLength == 0,
|
||||||
// With current Hadoop compression codecs all 'outLength' bytes are read in here, so not
|
// but in fact for an empty value, the compressed output still contains some metadata so the
|
||||||
// an issue for now.
|
// compressed size is not 0, so here we need to manually skip inLength bytes otherwise the
|
||||||
return compressedIn.read(outArray, outOffset, outLength);
|
// next read on this stream will start from an invalid position and cause critical problems,
|
||||||
|
// such as data loss when splitting wal or replicating wal.
|
||||||
|
IOUtils.skipFully(in, inLength);
|
||||||
|
} else {
|
||||||
|
IOUtils.readFully(compressedIn, outArray, outOffset, outLength);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear() {
|
public void clear() {
|
||||||
|
|
|
@ -382,13 +382,9 @@ public class WALCellCodec implements Codec {
|
||||||
private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
|
private void readCompressedValue(InputStream in, byte[] outArray, int outOffset,
|
||||||
int expectedLength) throws IOException {
|
int expectedLength) throws IOException {
|
||||||
int compressedLen = StreamUtils.readRawVarint32(in);
|
int compressedLen = StreamUtils.readRawVarint32(in);
|
||||||
int read = compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
|
compression.getValueCompressor().decompress(in, compressedLen, outArray, outOffset,
|
||||||
expectedLength);
|
expectedLength);
|
||||||
if (read != expectedLength) {
|
|
||||||
throw new IOException("ValueCompressor state error: short read");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class EnsureKvEncoder extends BaseEncoder {
|
public static class EnsureKvEncoder extends BaseEncoder {
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.wal;
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@ -26,8 +28,9 @@ import java.util.NavigableMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilderFactory;
|
||||||
|
import org.apache.hadoop.hbase.CellBuilderType;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
@ -82,33 +85,42 @@ public class CompressedWALTestBase {
|
||||||
|
|
||||||
for (int i = 0; i < total; i++) {
|
for (int i = 0; i < total; i++) {
|
||||||
WALEdit kvs = new WALEdit();
|
WALEdit kvs = new WALEdit();
|
||||||
kvs.add(new KeyValue(row, family, Bytes.toBytes(i), value));
|
kvs.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
|
||||||
|
.setRow(row).setFamily(family).setQualifier(Bytes.toBytes(i)).setValue(value).build());
|
||||||
|
kvs.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
|
||||||
|
.setType(Cell.Type.DeleteFamily).setRow(row).setFamily(family).build());
|
||||||
wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
wal.appendData(regionInfo, new WALKeyImpl(regionInfo.getEncodedNameAsBytes(), tableName,
|
||||||
System.currentTimeMillis(), mvcc, scopes), kvs);
|
System.currentTimeMillis(), mvcc, scopes), kvs);
|
||||||
|
wal.sync();
|
||||||
}
|
}
|
||||||
wal.sync();
|
|
||||||
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
|
final Path walPath = AbstractFSWALProvider.getCurrentFileName(wal);
|
||||||
wals.shutdown();
|
wals.shutdown();
|
||||||
|
|
||||||
// Confirm the WAL can be read back
|
// Confirm the WAL can be read back
|
||||||
WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath);
|
try (WAL.Reader reader = wals.createReader(TEST_UTIL.getTestFileSystem(), walPath)) {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
WAL.Entry entry = new WAL.Entry();
|
WAL.Entry entry = new WAL.Entry();
|
||||||
while (reader.next(entry) != null) {
|
while (reader.next(entry) != null) {
|
||||||
count++;
|
count++;
|
||||||
List<Cell> cells = entry.getEdit().getCells();
|
List<Cell> cells = entry.getEdit().getCells();
|
||||||
assertTrue("Should be one KV per WALEdit", cells.size() == 1);
|
assertThat("Should be two KVs per WALEdit", cells, hasSize(2));
|
||||||
for (Cell cell : cells) {
|
Cell putCell = cells.get(0);
|
||||||
assertTrue("Incorrect row", Bytes.equals(cell.getRowArray(), cell.getRowOffset(),
|
assertEquals(Cell.Type.Put, putCell.getType());
|
||||||
cell.getRowLength(), row, 0, row.length));
|
assertTrue("Incorrect row", Bytes.equals(putCell.getRowArray(), putCell.getRowOffset(),
|
||||||
assertTrue("Incorrect family", Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(),
|
putCell.getRowLength(), row, 0, row.length));
|
||||||
cell.getFamilyLength(), family, 0, family.length));
|
assertTrue("Incorrect family", Bytes.equals(putCell.getFamilyArray(),
|
||||||
assertTrue("Incorrect value", Bytes.equals(cell.getValueArray(), cell.getValueOffset(),
|
putCell.getFamilyOffset(), putCell.getFamilyLength(), family, 0, family.length));
|
||||||
cell.getValueLength(), value, 0, value.length));
|
assertTrue("Incorrect value", Bytes.equals(putCell.getValueArray(),
|
||||||
}
|
putCell.getValueOffset(), putCell.getValueLength(), value, 0, value.length));
|
||||||
}
|
|
||||||
assertEquals("Should have read back as many KVs as written", total, count);
|
|
||||||
reader.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
Cell deleteCell = cells.get(1);
|
||||||
|
assertEquals(Cell.Type.DeleteFamily, deleteCell.getType());
|
||||||
|
assertTrue("Incorrect row", Bytes.equals(deleteCell.getRowArray(),
|
||||||
|
deleteCell.getRowOffset(), deleteCell.getRowLength(), row, 0, row.length));
|
||||||
|
assertTrue("Incorrect family", Bytes.equals(deleteCell.getFamilyArray(),
|
||||||
|
deleteCell.getFamilyOffset(), deleteCell.getFamilyLength(), family, 0, family.length));
|
||||||
|
}
|
||||||
|
assertEquals("Should have read back as many KVs as written", total, count);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.wal;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.io.compress.Compression;
|
import org.apache.hadoop.hbase.io.compress.Compression;
|
||||||
|
@ -46,9 +45,7 @@ public class TestCompressedWALValueCompression extends CompressedWALTestBase {
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestCompressedWALValueCompression.class);
|
HBaseClassTestRule.forClass(TestCompressedWALValueCompression.class);
|
||||||
|
|
||||||
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
@Parameters(name = "{index}: compression={0}")
|
||||||
|
|
||||||
@Parameters
|
|
||||||
public static List<Object[]> params() {
|
public static List<Object[]> params() {
|
||||||
return HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS_PARAMETERIZED;
|
return HBaseCommonTestingUtil.COMPRESSION_ALGORITHMS_PARAMETERIZED;
|
||||||
}
|
}
|
||||||
|
@ -81,5 +78,4 @@ public class TestCompressedWALValueCompression extends CompressedWALTestBase {
|
||||||
TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
|
TableName tableName = TableName.valueOf(name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_"));
|
||||||
doTest(tableName);
|
doTest(tableName);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue