HBASE-27621 Also clear the Dictionary when resetting when reading compressed WAL file (#5016)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
parent a854cba59f
commit 833b10e8ba
TagCompressionContext.java

@@ -107,7 +107,7 @@ public class TagCompressionContext {
     throws IOException {
     int endOffset = offset + length;
     while (offset < endOffset) {
-      byte status = (byte) src.read();
+      byte status = StreamUtils.readByte(src);
       if (status == Dictionary.NOT_IN_DICTIONARY) {
         int tagLen = StreamUtils.readRawVarint32(src);
         offset = Bytes.putAsShort(dest, offset, tagLen);
@@ -115,7 +115,7 @@ public class TagCompressionContext {
         tagDict.addEntry(dest, offset, tagLen);
         offset += tagLen;
       } else {
        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
-        short dictIdx = StreamUtils.toShort(status, (byte) src.read());
+        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
        byte[] entry = tagDict.getEntry(dictIdx);
        if (entry == null) {
          throw new IOException("Missing dictionary entry for index " + dictIdx);
StreamUtils.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io.util;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -206,6 +207,22 @@ public class StreamUtils {
     return new Pair<>(result, newOffset - offset);
   }
 
+  /**
+   * Read a byte from the given stream using the read method, and throw EOFException if it returns
+   * -1, like the implementation in {@code DataInputStream}.
+   * <p/>
+   * This is useful because casting the return value of the read method to byte directly loses the
+   * ability to distinguish a byte whose value is -1 from reaching EOF, as casting the int -1 to
+   * byte also yields -1.
+   */
+  public static byte readByte(InputStream in) throws IOException {
+    int r = in.read();
+    if (r < 0) {
+      throw new EOFException();
+    }
+    return (byte) r;
+  }
+
   public static short toShort(byte hi, byte lo) {
     short s = (short) (((hi & 0xFF) << 8) | (lo & 0xFF));
     Preconditions.checkArgument(s >= 0);
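Editor's note: the reason for the readByte helper, and for the call-site changes in TagCompressionContext above and WALCellCodec below, is that (byte) in.read() maps both end-of-stream and a genuine 0xFF byte to -1, so a truncated compressed WAL can be misparsed instead of failing fast. A minimal, self-contained, JDK-only sketch of the ambiguity; class and variable names here are illustrative, not part of the patch:

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ReadByteDemo {

  // Same idea as the readByte helper added to StreamUtils above:
  // surface end-of-stream as an exception instead of a byte value.
  static byte readByte(InputStream in) throws IOException {
    int r = in.read(); // returns 0..255 for a byte, or -1 at EOF
    if (r < 0) {
      throw new EOFException();
    }
    return (byte) r;
  }

  public static void main(String[] args) throws IOException {
    InputStream empty = new ByteArrayInputStream(new byte[0]);
    InputStream ff = new ByteArrayInputStream(new byte[] { (byte) 0xFF });

    // Both casts print -1: EOF and a real 0xFF byte are indistinguishable.
    System.out.println((byte) empty.read()); // -1 (end of stream)
    System.out.println((byte) ff.read()); // -1 (legitimate byte)

    // With the helper, end-of-stream becomes an EOFException instead.
    try {
      readByte(new ByteArrayInputStream(new byte[0]));
    } catch (EOFException e) {
      System.out.println("EOF detected as exception");
    }
  }
}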
Compressor.java

@@ -108,7 +108,9 @@ public class Compressor {
       // if this isn't in the dictionary, we need to add to the dictionary.
       byte[] arr = new byte[length];
       in.readFully(arr);
-      if (dict != null) dict.addEntry(arr, 0, length);
+      if (dict != null) {
+        dict.addEntry(arr, 0, length);
+      }
       return arr;
     } else {
       // Status here is the higher-order byte of index of the dictionary entry
ProtobufLogReader.java

@@ -98,6 +98,9 @@ public class ProtobufLogReader extends ReaderBase {
   // cell codec classname
   private String codecClsName = null;
 
+  // a flag indicating whether we need to reset the compression context when seeking back
+  private boolean resetCompression;
+
   @InterfaceAudience.Private
   public long trailerSize() {
     if (trailerPresent) {
@@ -160,6 +163,9 @@ public class ProtobufLogReader extends ReaderBase {
   @Override
   public void reset() throws IOException {
     String clsName = initInternal(null, false);
+    if (resetCompression) {
+      resetCompression();
+    }
     initAfterCompression(clsName); // We need a new decoder (at least).
   }
 
@@ -361,6 +367,8 @@ public class ProtobufLogReader extends ReaderBase {
     WALKey.Builder builder = WALKey.newBuilder();
     long size = 0;
     boolean resetPosition = false;
+    // by default, we should reset the compression context when seeking back after reading something
+    resetCompression = true;
     try {
       long available = -1;
       try {
@@ -372,6 +380,14 @@ public class ProtobufLogReader extends ReaderBase {
           // available may be < 0 on local fs for instance. If so, can't depend on it.
           available = this.inputStream.available();
           if (available > 0 && available < size) {
+            // if we quit here, we have only read the entry length, no actual data yet, which means
+            // we haven't put anything into the compression dictionary yet, so when seeking back to
+            // the last good position, we do not need to reset the compression context.
+            // This saves the extra effort of reconstructing the compression dictionary, where we
+            // would need to read from the beginning instead of just seeking to the position. Since
+            // DFSInputStream implements the available method, in most cases we will reach here
+            // when there is not enough data.
+            resetCompression = false;
             throw new EOFException("Available stream not enough for edit, "
               + "inputStream.available()= " + this.inputStream.available() + ", " + "entry size= "
              + size + " at offset = " + this.inputStream.getPos());
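Editor's note: the resetCompression flag above implements a small protocol: every entry read starts by assuming the dictionary may have been partially fed, and the one early-exit path that provably consumed only the entry length clears the flag before throwing, so the subsequent reset() can keep the dictionary and do a cheap seek. A compilable toy sketch of that control flow, with hypothetical names (none of these are the actual HBase classes):

import java.io.EOFException;
import java.io.IOException;

class ResetAwareReader {

  private boolean resetCompression; // mirrors ProtobufLogReader.resetCompression
  private long available = 4; // pretend only 4 bytes are buffered so far

  void readEntry() throws IOException {
    // by default, assume the failed read may have fed the dictionary
    resetCompression = true;
    long size = 10; // pretend the entry header says 10 bytes of payload follow
    if (available > 0 && available < size) {
      // only the length was consumed, nothing entered the dictionary,
      // so seeking back does not require rebuilding it
      resetCompression = false;
      throw new EOFException("not enough bytes for a full entry");
    }
    // ... otherwise decode the payload, which feeds the dictionary ...
  }

  void reset() {
    if (resetCompression) {
      System.out.println("clear dictionary, replay from the beginning (expensive)");
    } else {
      System.out.println("plain seek back, dictionary kept (cheap)");
    }
  }

  public static void main(String[] args) {
    ResetAwareReader reader = new ResetAwareReader();
    try {
      reader.readEntry();
    } catch (IOException e) {
      reader.reset(); // prints the cheap path: the dictionary was never touched
    }
  }
}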
ReaderBase.java

@@ -45,7 +45,7 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
    * Compression context to use reading. Can be null if no compression.
    */
   protected CompressionContext compressionContext = null;
-  protected boolean emptyCompressionContext = true;
+  private boolean emptyCompressionContext = true;
 
   /**
    * Default constructor.
@@ -130,6 +130,17 @@ public abstract class ReaderBase implements AbstractFSWALProvider.Reader {
     seekOnFs(pos);
   }
 
+  /**
+   * Clear the {@link ReaderBase#compressionContext}, and also set {@link #emptyCompressionContext}
+   * to true, so when seeking, we will try to skip to the position and reconstruct the dictionary.
+   */
+  protected final void resetCompression() {
+    if (compressionContext != null) {
+      compressionContext.clear();
+      emptyCompressionContext = true;
+    }
+  }
+
   /**
    * Initializes the log reader with a particular stream (may be null). Reader assumes ownership of
    * the stream if not null and may use it. Called once.
WALCellCodec.java

@@ -197,18 +197,20 @@ public class WALCellCodec implements Codec {
 
   private static byte[] uncompressByteString(ByteString bs, Dictionary dict) throws IOException {
     InputStream in = bs.newInput();
-    byte status = (byte) in.read();
+    byte status = StreamUtils.readByte(in);
     if (status == Dictionary.NOT_IN_DICTIONARY) {
       byte[] arr = new byte[StreamUtils.readRawVarint32(in)];
       int bytesRead = in.read(arr);
       if (bytesRead != arr.length) {
         throw new IOException("Cannot read; wanted " + arr.length + ", but got " + bytesRead);
       }
-      if (dict != null) dict.addEntry(arr, 0, arr.length);
+      if (dict != null) {
+        dict.addEntry(arr, 0, arr.length);
+      }
       return arr;
     } else {
       // Status here is the higher-order byte of index of the dictionary entry.
-      short dictIdx = StreamUtils.toShort(status, (byte) in.read());
+      short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
       byte[] entry = dict.getEntry(dictIdx);
       if (entry == null) {
         throw new IOException("Missing dictionary entry for index " + dictIdx);
@@ -350,7 +352,7 @@ public class WALCellCodec implements Codec {
     }
 
     private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOException {
-      byte status = (byte) in.read();
+      byte status = StreamUtils.readByte(in);
       if (status == Dictionary.NOT_IN_DICTIONARY) {
         // status byte indicating that data to be read is not in dictionary.
         // if this isn't in the dictionary, we need to add to the dictionary.
@@ -360,7 +362,7 @@ public class WALCellCodec implements Codec {
         return length;
       } else {
         // the status byte also acts as the higher order byte of the dictionary entry.
-        short dictIdx = StreamUtils.toShort(status, (byte) in.read());
+        short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(in));
         byte[] entry = dict.getEntry(dictIdx);
         if (entry == null) {
           throw new IOException("Missing dictionary entry for index " + dictIdx);
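Editor's note: all the changed call sites decode the same dictionary scheme: a status byte that is either Dictionary.NOT_IN_DICTIONARY (followed by a length and the literal bytes, which are then added to the dictionary) or the high byte of a two-byte dictionary index. A self-contained, simplified decoder illustrating the scheme; this is a sketch, not the HBase implementation (single-byte lengths instead of varints, a plain list instead of HBase's dictionary, and all names are illustrative):

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

class DictDecodeDemo {

  static final byte NOT_IN_DICTIONARY = -1;

  static byte readByte(InputStream in) throws IOException {
    int r = in.read();
    if (r < 0) {
      throw new EOFException();
    }
    return (byte) r;
  }

  static byte[] readEntry(InputStream in, List<byte[]> dict) throws IOException {
    byte status = readByte(in);
    if (status == NOT_IN_DICTIONARY) {
      int len = readByte(in); // simplified: single-byte length, not a varint
      byte[] arr = new byte[len];
      if (in.read(arr) != len) {
        throw new EOFException();
      }
      dict.add(arr); // literal entries become future dictionary hits
      return arr;
    }
    // otherwise the status byte is the high byte of a short dictionary index
    short idx = (short) (((status & 0xFF) << 8) | (readByte(in) & 0xFF));
    return dict.get(idx);
  }

  public static void main(String[] args) throws IOException {
    // a "row" literal, then a reference to dictionary slot 0
    byte[] encoded = { NOT_IN_DICTIONARY, 3, 'r', 'o', 'w', 0, 0 };
    InputStream in = new ByteArrayInputStream(encoded);
    List<byte[]> dict = new ArrayList<>();
    System.out.println(new String(readEntry(in, dict))); // row (literal)
    System.out.println(new String(readEntry(in, dict))); // row (from dictionary)
  }
}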
TestWALEntryStreamCompressionReset.java (new file)

@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.OptionalLong;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.ByteStreams;
+
+/**
+ * Enable compression and reset the WALEntryStream while reading in ReplicationSourceWALReader.
+ * <p/>
+ * This is used to confirm that we can recover when hitting an EOFException in the middle of
+ * reading a WAL entry, when compression is enabled. See HBASE-27621 for more details.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestWALEntryStreamCompressionReset {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALEntryStreamCompressionReset.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static TableName TABLE_NAME = TableName.valueOf("reset");
+
+  private static RegionInfo REGION_INFO = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
+
+  private static byte[] FAMILY = Bytes.toBytes("family");
+
+  private static MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();
+
+  private static NavigableMap<byte[], Integer> SCOPE;
+
+  private static String GROUP_ID = "group";
+
+  private static FileSystem FS;
+
+  private static ReplicationSource SOURCE;
+
+  private static MetricsSource METRICS_SOURCE;
+
+  private static ReplicationSourceLogQueue LOG_QUEUE;
+
+  private static Path TEMPLATE_WAL_FILE;
+
+  private static int END_OFFSET_OF_WAL_ENTRIES;
+
+  private static Path WAL_FILE;
+
+  private static volatile long WAL_LENGTH;
+
+  private static ReplicationSourceWALReader READER;
+
+  // return the wal path, and also the end offset of the last wal entry
+  private static Pair<Path, Long> generateWAL() throws Exception {
+    Path path = UTIL.getDataTestDir("wal");
+    ProtobufLogWriter writer = new ProtobufLogWriter();
+    writer.init(FS, path, UTIL.getConfiguration(), false, FS.getDefaultBlockSize(path), null);
+    for (int i = 0; i < Byte.MAX_VALUE; i++) {
+      WALEdit edit = new WALEdit();
+      edit.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
+        .setRow(Bytes.toBytes(i)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-" + i))
+        .setValue(Bytes.toBytes("v-" + i)).build());
+      writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME,
+        EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit));
+    }
+
+    WALEdit edit2 = new WALEdit();
+    edit2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
+      .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier"))
+      .setValue(Bytes.toBytes("vv")).build());
+    edit2.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Cell.Type.Put)
+      .setRow(Bytes.toBytes(-1)).setFamily(FAMILY).setQualifier(Bytes.toBytes("qualifier-1"))
+      .setValue(Bytes.toBytes("vvv")).build());
+    writer.append(new WAL.Entry(new WALKeyImpl(REGION_INFO.getEncodedNameAsBytes(), TABLE_NAME,
+      EnvironmentEdgeManager.currentTime(), MVCC, SCOPE), edit2));
+    writer.sync(false);
+    long offset = writer.getSyncedLength();
+    writer.close();
+    return Pair.newPair(path, offset);
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    FS = UTIL.getTestFileSystem();
+    conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+    conf.setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, false);
+    conf.setInt("replication.source.maxretriesmultiplier", 1);
+    FS.mkdirs(UTIL.getDataTestDir());
+    Pair<Path, Long> pair = generateWAL();
+    TEMPLATE_WAL_FILE = pair.getFirst();
+    END_OFFSET_OF_WAL_ENTRIES = pair.getSecond().intValue();
+    WAL_FILE = UTIL.getDataTestDir("rep_source");
+
+    METRICS_SOURCE = new MetricsSource("reset");
+    SOURCE = mock(ReplicationSource.class);
+    when(SOURCE.isPeerEnabled()).thenReturn(true);
+    when(SOURCE.getWALFileLengthProvider()).thenReturn(p -> OptionalLong.of(WAL_LENGTH));
+    when(SOURCE.getServerWALsBelongTo())
+      .thenReturn(ServerName.valueOf("localhost", 12345, EnvironmentEdgeManager.currentTime()));
+    when(SOURCE.getSourceMetrics()).thenReturn(METRICS_SOURCE);
+    ReplicationSourceManager rsm = mock(ReplicationSourceManager.class);
+    when(rsm.getTotalBufferUsed()).thenReturn(new AtomicLong());
+    when(rsm.getTotalBufferLimit())
+      .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
+    when(rsm.getGlobalMetrics()).thenReturn(mock(MetricsReplicationGlobalSourceSource.class));
+    when(SOURCE.getSourceManager()).thenReturn(rsm);
+
+    LOG_QUEUE = new ReplicationSourceLogQueue(conf, METRICS_SOURCE, SOURCE);
+    LOG_QUEUE.enqueueLog(WAL_FILE, GROUP_ID);
+    READER = new ReplicationSourceWALReader(FS, conf, LOG_QUEUE, 0, e -> e, SOURCE, GROUP_ID);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    READER.setReaderRunning(false);
+    READER.join();
+    UTIL.cleanupTestDir();
+  }
+
+  private void test(byte[] content, FSDataOutputStream out) throws Exception {
+    // minus 15 so the second entry is incomplete
+    // 15 is a magic number here, we want the reader to parse the first cell but not the second
+    // cell, especially not the qualifier of the second cell. The value of the second cell is
+    // 'vvv', which is 3 bytes, plus an 8 byte timestamp, and also the qualifier, family and row
+    // (which should have been compressed), so 15 is a proper value; of course 14 or 16 could
+    // also work here.
+    out.write(content, 0, END_OFFSET_OF_WAL_ENTRIES - 15);
+    out.hflush();
+    WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES - 15;
+    READER.start();
+    List<WAL.Entry> entries = new ArrayList<>();
+    for (;;) {
+      WALEntryBatch batch = READER.poll(1000);
+      if (batch == null) {
+        break;
+      }
+      entries.addAll(batch.getWalEntries());
+    }
+    // should return all the entries except the last one
+    assertEquals(Byte.MAX_VALUE, entries.size());
+    for (int i = 0; i < Byte.MAX_VALUE; i++) {
+      WAL.Entry entry = entries.get(i);
+      assertEquals(1, entry.getEdit().size());
+      Cell cell = entry.getEdit().getCells().get(0);
+      assertEquals(i, Bytes.toInt(cell.getRowArray(), cell.getRowOffset()));
+      assertEquals(Bytes.toString(FAMILY),
+        Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
+      assertEquals("qualifier-" + i, Bytes.toString(cell.getQualifierArray(),
+        cell.getQualifierOffset(), cell.getQualifierLength()));
+      assertEquals("v-" + i,
+        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
+    }
+
+    // confirm that we cannot get the last one since it is incomplete
+    assertNull(READER.poll(1000));
+    // write out the remaining 15 bytes
+    out.write(content, END_OFFSET_OF_WAL_ENTRIES - 15, 15);
+    out.hflush();
+    WAL_LENGTH = END_OFFSET_OF_WAL_ENTRIES;
+
+    // should get the last entry
+    WALEntryBatch batch = READER.poll(10000);
+    assertEquals(1, batch.getNbEntries());
+    WAL.Entry entry = batch.getWalEntries().get(0);
+    assertEquals(2, entry.getEdit().size());
+    Cell cell2 = entry.getEdit().getCells().get(0);
+    assertEquals(-1, Bytes.toInt(cell2.getRowArray(), cell2.getRowOffset()));
+    assertEquals(Bytes.toString(FAMILY),
+      Bytes.toString(cell2.getFamilyArray(), cell2.getFamilyOffset(), cell2.getFamilyLength()));
+    assertEquals("qualifier", Bytes.toString(cell2.getQualifierArray(), cell2.getQualifierOffset(),
+      cell2.getQualifierLength()));
+    assertEquals("vv",
+      Bytes.toString(cell2.getValueArray(), cell2.getValueOffset(), cell2.getValueLength()));
+
+    Cell cell3 = entry.getEdit().getCells().get(1);
+    assertEquals(-1, Bytes.toInt(cell3.getRowArray(), cell3.getRowOffset()));
+    assertEquals(Bytes.toString(FAMILY),
+      Bytes.toString(cell3.getFamilyArray(), cell3.getFamilyOffset(), cell3.getFamilyLength()));
+    assertEquals("qualifier-1", Bytes.toString(cell3.getQualifierArray(),
+      cell3.getQualifierOffset(), cell3.getQualifierLength()));
+    assertEquals("vvv",
+      Bytes.toString(cell3.getValueArray(), cell3.getValueOffset(), cell3.getValueLength()));
+  }
+
+  @Test
+  public void testReset() throws Exception {
+    byte[] content;
+    try (FSDataInputStream in = FS.open(TEMPLATE_WAL_FILE)) {
+      content = ByteStreams.toByteArray(in);
+    }
+    try (FSDataOutputStream out = FS.create(WAL_FILE)) {
+      test(content, out);
+    }
+  }
+}
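Editor's note: the new test exercises the fix end to end. It writes a compressed WAL whose final entry is cut 15 bytes short, verifies that the reader delivers the first 127 entries and then stalls on the incomplete one, and finally appends the missing bytes and verifies that the last two-cell entry decodes with the correct row, family, qualifier, and value, which only works if the compression dictionary is rebuilt consistently after the mid-entry EOFException, as the commit title describes.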