HBASE-26020 Split TestWALEntryStream.testDifferentCounts out (#3409)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
8d1473d321
commit
b453cbd819
|
@ -39,7 +39,7 @@ public class TableNameTestRule extends TestWatcher {
|
||||||
* This helper strips out the parameter suffixes.
|
* This helper strips out the parameter suffixes.
|
||||||
* @return current test method name with out parameterized suffixes.
|
* @return current test method name with out parameterized suffixes.
|
||||||
*/
|
*/
|
||||||
private static String cleanUpTestName(String methodName) {
|
public static String cleanUpTestName(String methodName) {
|
||||||
int index = methodName.indexOf('[');
|
int index = methodName.indexOf('[');
|
||||||
if (index == -1) {
|
if (index == -1) {
|
||||||
return methodName;
|
return methodName;
|
||||||
|
|
|
@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -45,186 +46,36 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
|
||||||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.junit.rules.TestName;
|
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||||
|
|
||||||
@Category({ ReplicationTests.class, LargeTests.class })
|
public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
|
||||||
public class TestWALEntryStream {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestWALEntryStream.class);
|
|
||||||
|
|
||||||
private static final long TEST_TIMEOUT_MS = 5000;
|
|
||||||
protected static HBaseTestingUtility TEST_UTIL;
|
|
||||||
protected static Configuration CONF;
|
|
||||||
protected static FileSystem fs;
|
|
||||||
protected static MiniDFSCluster cluster;
|
|
||||||
private static final TableName tableName = TableName.valueOf("tablename");
|
|
||||||
private static final byte[] family = Bytes.toBytes("column");
|
|
||||||
private static final byte[] qualifier = Bytes.toBytes("qualifier");
|
|
||||||
private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
|
|
||||||
.setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
|
|
||||||
private static final NavigableMap<byte[], Integer> scopes = getScopes();
|
|
||||||
private final String fakeWalGroupId = "fake-wal-group-id";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Test helper that waits until a non-null entry is available in the stream next or times out.
|
|
||||||
* A {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
|
|
||||||
* can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
|
|
||||||
* may need to retry multiple times before an entry appended to the WAL is visible to the stream
|
|
||||||
* consumers. One such cause of delay is the close() of writer writing these log files. While the
|
|
||||||
* closure is in progress, the stream does not switch to the next log in the queue and next() may
|
|
||||||
* return null entries. This utility wraps these retries into a single next call and that makes
|
|
||||||
* the test code simpler.
|
|
||||||
*/
|
|
||||||
private static class WALEntryStreamWithRetries extends WALEntryStream {
|
|
||||||
// Class member to be able to set a non-final from within a lambda.
|
|
||||||
private Entry result;
|
|
||||||
|
|
||||||
public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
|
|
||||||
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
|
|
||||||
MetricsSource metrics, String walGroupId) throws IOException {
|
|
||||||
super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Entry next() {
|
|
||||||
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> (
|
|
||||||
result = WALEntryStreamWithRetries.super.next()) != null);
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static NavigableMap<byte[], Integer> getScopes() {
|
|
||||||
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
|
||||||
scopes.put(family, 1);
|
|
||||||
return scopes;
|
|
||||||
}
|
|
||||||
|
|
||||||
private WAL log;
|
|
||||||
ReplicationSourceLogQueue logQueue;
|
|
||||||
private PathWatcher pathWatcher;
|
|
||||||
|
|
||||||
@Rule
|
|
||||||
public TestName tn = new TestName();
|
|
||||||
private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUpBeforeClass() throws Exception {
|
|
||||||
TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
CONF = TEST_UTIL.getConfiguration();
|
|
||||||
CONF.setLong("replication.source.sleepforretries", 10);
|
|
||||||
TEST_UTIL.startMiniDFSCluster(3);
|
|
||||||
|
|
||||||
cluster = TEST_UTIL.getDFSCluster();
|
|
||||||
fs = cluster.getFileSystem();
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDownAfterClass() throws Exception {
|
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
ReplicationSource source = mock(ReplicationSource.class);
|
initWAL();
|
||||||
MetricsSource metricsSource = new MetricsSource("2");
|
|
||||||
// Source with the same id is shared and carries values from the last run
|
|
||||||
metricsSource.clear();
|
|
||||||
logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
|
|
||||||
pathWatcher = new PathWatcher();
|
|
||||||
final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
|
|
||||||
wals.getWALProvider().addWALActionsListener(pathWatcher);
|
|
||||||
log = wals.getWAL(info);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void tearDown() throws Exception {
|
|
||||||
if (log != null) {
|
|
||||||
log.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try out different combinations of row count and KeyValue count
|
|
||||||
@Test
|
|
||||||
public void testDifferentCounts() throws Exception {
|
|
||||||
int[] NB_ROWS = { 1500, 60000 };
|
|
||||||
int[] NB_KVS = { 1, 100 };
|
|
||||||
// whether compression is used
|
|
||||||
Boolean[] BOOL_VALS = { false, true };
|
|
||||||
// long lastPosition = 0;
|
|
||||||
for (int nbRows : NB_ROWS) {
|
|
||||||
for (int walEditKVs : NB_KVS) {
|
|
||||||
for (boolean isCompressionEnabled : BOOL_VALS) {
|
|
||||||
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
|
|
||||||
isCompressionEnabled);
|
|
||||||
mvcc.advanceTo(1);
|
|
||||||
|
|
||||||
for (int i = 0; i < nbRows; i++) {
|
|
||||||
appendToLogAndSync(walEditKVs);
|
|
||||||
}
|
|
||||||
|
|
||||||
log.rollWriter();
|
|
||||||
|
|
||||||
try (WALEntryStream entryStream =
|
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
int i = 0;
|
|
||||||
while (entryStream.hasNext()) {
|
|
||||||
assertNotNull(entryStream.next());
|
|
||||||
i++;
|
|
||||||
}
|
|
||||||
assertEquals(nbRows, i);
|
|
||||||
|
|
||||||
// should've read all entries
|
|
||||||
assertFalse(entryStream.hasNext());
|
|
||||||
}
|
|
||||||
// reset everything for next loop
|
|
||||||
log.close();
|
|
||||||
setUp();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -235,7 +86,7 @@ public class TestWALEntryStream {
|
||||||
appendToLogAndSync();
|
appendToLogAndSync();
|
||||||
long oldPos;
|
long oldPos;
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream =
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
// There's one edit in the log, read it. Reading past it needs to throw exception
|
// There's one edit in the log, read it. Reading past it needs to throw exception
|
||||||
assertTrue(entryStream.hasNext());
|
assertTrue(entryStream.hasNext());
|
||||||
WAL.Entry entry = entryStream.peek();
|
WAL.Entry entry = entryStream.peek();
|
||||||
|
@ -249,8 +100,8 @@ public class TestWALEntryStream {
|
||||||
|
|
||||||
appendToLogAndSync();
|
appendToLogAndSync();
|
||||||
|
|
||||||
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
|
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
|
||||||
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
// Read the newly added entry, make sure we made progress
|
// Read the newly added entry, make sure we made progress
|
||||||
WAL.Entry entry = entryStream.next();
|
WAL.Entry entry = entryStream.next();
|
||||||
assertNotEquals(oldPos, entryStream.getPosition());
|
assertNotEquals(oldPos, entryStream.getPosition());
|
||||||
|
@ -263,8 +114,8 @@ public class TestWALEntryStream {
|
||||||
log.rollWriter();
|
log.rollWriter();
|
||||||
appendToLogAndSync();
|
appendToLogAndSync();
|
||||||
|
|
||||||
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos,
|
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
|
||||||
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
WAL.Entry entry = entryStream.next();
|
WAL.Entry entry = entryStream.next();
|
||||||
assertNotEquals(oldPos, entryStream.getPosition());
|
assertNotEquals(oldPos, entryStream.getPosition());
|
||||||
assertNotNull(entry);
|
assertNotNull(entry);
|
||||||
|
@ -288,9 +139,8 @@ public class TestWALEntryStream {
|
||||||
public void testLogrollWhileStreaming() throws Exception {
|
public void testLogrollWhileStreaming() throws Exception {
|
||||||
appendToLog("1");
|
appendToLog("1");
|
||||||
appendToLog("2");// 2
|
appendToLog("2");// 2
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
|
||||||
new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
|
new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
assertEquals("1", getRow(entryStream.next()));
|
assertEquals("1", getRow(entryStream.next()));
|
||||||
|
|
||||||
appendToLog("3"); // 3 - comes in after reader opened
|
appendToLog("3"); // 3 - comes in after reader opened
|
||||||
|
@ -299,7 +149,7 @@ public class TestWALEntryStream {
|
||||||
|
|
||||||
assertEquals("2", getRow(entryStream.next()));
|
assertEquals("2", getRow(entryStream.next()));
|
||||||
assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
|
assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an
|
||||||
// entry in first log
|
// entry in first log
|
||||||
assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
|
assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4
|
||||||
// and 3 would be skipped
|
// and 3 would be skipped
|
||||||
assertEquals("4", getRow(entryStream.next())); // 4
|
assertEquals("4", getRow(entryStream.next())); // 4
|
||||||
|
@ -316,8 +166,7 @@ public class TestWALEntryStream {
|
||||||
public void testNewEntriesWhileStreaming() throws Exception {
|
public void testNewEntriesWhileStreaming() throws Exception {
|
||||||
appendToLog("1");
|
appendToLog("1");
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream =
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
entryStream.next(); // we've hit the end of the stream at this point
|
entryStream.next(); // we've hit the end of the stream at this point
|
||||||
|
|
||||||
// some new entries come in while we're streaming
|
// some new entries come in while we're streaming
|
||||||
|
@ -340,17 +189,15 @@ public class TestWALEntryStream {
|
||||||
long lastPosition = 0;
|
long lastPosition = 0;
|
||||||
appendToLog("1");
|
appendToLog("1");
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream =
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
entryStream.next(); // we've hit the end of the stream at this point
|
entryStream.next(); // we've hit the end of the stream at this point
|
||||||
appendToLog("2");
|
appendToLog("2");
|
||||||
appendToLog("3");
|
appendToLog("3");
|
||||||
lastPosition = entryStream.getPosition();
|
lastPosition = entryStream.getPosition();
|
||||||
}
|
}
|
||||||
// next stream should picks up where we left off
|
// next stream should picks up where we left off
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
|
||||||
new WALEntryStream(logQueue, CONF, lastPosition, log, null,
|
new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
assertEquals("2", getRow(entryStream.next()));
|
assertEquals("2", getRow(entryStream.next()));
|
||||||
assertEquals("3", getRow(entryStream.next()));
|
assertEquals("3", getRow(entryStream.next()));
|
||||||
assertFalse(entryStream.hasNext()); // done
|
assertFalse(entryStream.hasNext()); // done
|
||||||
|
@ -368,27 +215,24 @@ public class TestWALEntryStream {
|
||||||
long lastPosition = 0;
|
long lastPosition = 0;
|
||||||
appendEntriesToLogAndSync(3);
|
appendEntriesToLogAndSync(3);
|
||||||
// read only one element
|
// read only one element
|
||||||
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition,
|
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
|
||||||
log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
lastPosition = entryStream.getPosition();
|
lastPosition = entryStream.getPosition();
|
||||||
}
|
}
|
||||||
// there should still be two more entries from where we left off
|
// there should still be two more entries from where we left off
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
|
||||||
new WALEntryStream(logQueue, CONF, lastPosition, log, null,
|
new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
assertNotNull(entryStream.next());
|
assertNotNull(entryStream.next());
|
||||||
assertNotNull(entryStream.next());
|
assertNotNull(entryStream.next());
|
||||||
assertFalse(entryStream.hasNext());
|
assertFalse(entryStream.hasNext());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyStream() throws Exception {
|
public void testEmptyStream() throws Exception {
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream =
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
assertFalse(entryStream.hasNext());
|
assertFalse(entryStream.hasNext());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -398,9 +242,9 @@ public class TestWALEntryStream {
|
||||||
Map<String, byte[]> attributes = new HashMap<String, byte[]>();
|
Map<String, byte[]> attributes = new HashMap<String, byte[]>();
|
||||||
attributes.put("foo", Bytes.toBytes("foo-value"));
|
attributes.put("foo", Bytes.toBytes("foo-value"));
|
||||||
attributes.put("bar", Bytes.toBytes("bar-value"));
|
attributes.put("bar", Bytes.toBytes("bar-value"));
|
||||||
WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
WALKeyImpl key =
|
||||||
System.currentTimeMillis(), new ArrayList<UUID>(), 0L, 0L,
|
new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(),
|
||||||
mvcc, scopes, attributes);
|
new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes);
|
||||||
Assert.assertEquals(attributes, key.getExtendedAttributes());
|
Assert.assertEquals(attributes, key.getExtendedAttributes());
|
||||||
|
|
||||||
WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
|
WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
|
||||||
|
@ -409,12 +253,12 @@ public class TestWALEntryStream {
|
||||||
WALKeyImpl deserializedKey = new WALKeyImpl();
|
WALKeyImpl deserializedKey = new WALKeyImpl();
|
||||||
deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());
|
deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());
|
||||||
|
|
||||||
//equals() only checks region name, sequence id and write time
|
// equals() only checks region name, sequence id and write time
|
||||||
Assert.assertEquals(key, deserializedKey);
|
Assert.assertEquals(key, deserializedKey);
|
||||||
//can't use Map.equals() because byte arrays use reference equality
|
// can't use Map.equals() because byte arrays use reference equality
|
||||||
Assert.assertEquals(key.getExtendedAttributes().keySet(),
|
Assert.assertEquals(key.getExtendedAttributes().keySet(),
|
||||||
deserializedKey.getExtendedAttributes().keySet());
|
deserializedKey.getExtendedAttributes().keySet());
|
||||||
for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()){
|
for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
|
||||||
Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
|
Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
|
||||||
}
|
}
|
||||||
Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
|
Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
|
||||||
|
@ -423,8 +267,8 @@ public class TestWALEntryStream {
|
||||||
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
|
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
|
||||||
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
|
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
|
||||||
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||||
when(mockSourceManager.getTotalBufferLimit()).thenReturn(
|
when(mockSourceManager.getTotalBufferLimit())
|
||||||
(long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
|
.thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
|
||||||
Server mockServer = Mockito.mock(Server.class);
|
Server mockServer = Mockito.mock(Server.class);
|
||||||
ReplicationSource source = Mockito.mock(ReplicationSource.class);
|
ReplicationSource source = Mockito.mock(ReplicationSource.class);
|
||||||
when(source.getSourceManager()).thenReturn(mockSourceManager);
|
when(source.getSourceManager()).thenReturn(mockSourceManager);
|
||||||
|
@ -432,8 +276,8 @@ public class TestWALEntryStream {
|
||||||
when(source.getWALFileLengthProvider()).thenReturn(log);
|
when(source.getWALFileLengthProvider()).thenReturn(log);
|
||||||
when(source.getServer()).thenReturn(mockServer);
|
when(source.getServer()).thenReturn(mockServer);
|
||||||
when(source.isRecovered()).thenReturn(recovered);
|
when(source.isRecovered()).thenReturn(recovered);
|
||||||
MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock(
|
MetricsReplicationGlobalSourceSource globalMetrics =
|
||||||
MetricsReplicationGlobalSourceSource.class);
|
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
|
||||||
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
|
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
|
||||||
return source;
|
return source;
|
||||||
}
|
}
|
||||||
|
@ -441,20 +285,18 @@ public class TestWALEntryStream {
|
||||||
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
|
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
|
||||||
ReplicationSource source = mockReplicationSource(recovered, conf);
|
ReplicationSource source = mockReplicationSource(recovered, conf);
|
||||||
when(source.isPeerEnabled()).thenReturn(true);
|
when(source.isPeerEnabled()).thenReturn(true);
|
||||||
ReplicationSourceWALReader reader =
|
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
|
||||||
new ReplicationSourceWALReader(fs, conf, logQueue, 0, getDummyFilter(), source,
|
getDummyFilter(), source, fakeWalGroupId);
|
||||||
fakeWalGroupId);
|
|
||||||
reader.start();
|
reader.start();
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
|
|
||||||
private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
|
private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
ReplicationSource source = mockReplicationSource(false, conf);
|
ReplicationSource source = mockReplicationSource(false, conf);
|
||||||
when(source.isPeerEnabled()).thenReturn(true);
|
when(source.isPeerEnabled()).thenReturn(true);
|
||||||
ReplicationSourceWALReader reader =
|
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
|
||||||
new ReplicationSourceWALReader(fs, conf, logQueue, 0,
|
getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
|
||||||
getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
|
|
||||||
reader.start();
|
reader.start();
|
||||||
return reader;
|
return reader;
|
||||||
}
|
}
|
||||||
|
@ -465,8 +307,7 @@ public class TestWALEntryStream {
|
||||||
// get ending position
|
// get ending position
|
||||||
long position;
|
long position;
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream =
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
|
@ -497,8 +338,7 @@ public class TestWALEntryStream {
|
||||||
// get ending position
|
// get ending position
|
||||||
long position;
|
long position;
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream =
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
|
@ -508,8 +348,8 @@ public class TestWALEntryStream {
|
||||||
// start up a reader
|
// start up a reader
|
||||||
Path walPath = getQueue().peek();
|
Path walPath = getQueue().peek();
|
||||||
int numFailuresInFilter = 5;
|
int numFailuresInFilter = 5;
|
||||||
ReplicationSourceWALReader reader = createReaderWithBadReplicationFilter(
|
ReplicationSourceWALReader reader =
|
||||||
numFailuresInFilter, CONF);
|
createReaderWithBadReplicationFilter(numFailuresInFilter, CONF);
|
||||||
WALEntryBatch entryBatch = reader.take();
|
WALEntryBatch entryBatch = reader.take();
|
||||||
assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
|
assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
|
||||||
|
|
||||||
|
@ -565,7 +405,7 @@ public class TestWALEntryStream {
|
||||||
@Override
|
@Override
|
||||||
public boolean evaluate() throws Exception {
|
public boolean evaluate() throws Exception {
|
||||||
return fs.getFileStatus(walPath).getLen() > 0 &&
|
return fs.getFileStatus(walPath).getLen() > 0 &&
|
||||||
((AbstractFSWAL) log).getInflightWALCloseCount() == 0;
|
((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -608,13 +448,12 @@ public class TestWALEntryStream {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReplicationSourceWALReaderDisabled()
|
public void testReplicationSourceWALReaderDisabled()
|
||||||
throws IOException, InterruptedException, ExecutionException {
|
throws IOException, InterruptedException, ExecutionException {
|
||||||
appendEntriesToLogAndSync(3);
|
appendEntriesToLogAndSync(3);
|
||||||
// get ending position
|
// get ending position
|
||||||
long position;
|
long position;
|
||||||
try (WALEntryStream entryStream =
|
try (WALEntryStream entryStream =
|
||||||
new WALEntryStream(logQueue, CONF, 0, log, null,
|
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
new MetricsSource("1"), fakeWalGroupId)) {
|
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
entryStream.next();
|
entryStream.next();
|
||||||
|
@ -631,9 +470,8 @@ public class TestWALEntryStream {
|
||||||
return enabled.get();
|
return enabled.get();
|
||||||
});
|
});
|
||||||
|
|
||||||
ReplicationSourceWALReader reader =
|
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0,
|
||||||
new ReplicationSourceWALReader(fs, CONF, logQueue, 0, getDummyFilter(),
|
getDummyFilter(), source, fakeWalGroupId);
|
||||||
source, fakeWalGroupId);
|
|
||||||
reader.start();
|
reader.start();
|
||||||
Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
|
Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
|
||||||
return reader.take();
|
return reader.take();
|
||||||
|
@ -660,9 +498,8 @@ public class TestWALEntryStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void appendToLog(String key) throws IOException {
|
private void appendToLog(String key) throws IOException {
|
||||||
final long txid = log.appendData(info,
|
final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||||
new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(),
|
EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key));
|
||||||
mvcc, scopes), getWALEdit(key));
|
|
||||||
log.sync(txid);
|
log.sync(txid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -674,33 +511,10 @@ public class TestWALEntryStream {
|
||||||
log.sync(txid);
|
log.sync(txid);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void appendToLogAndSync() throws IOException {
|
|
||||||
appendToLogAndSync(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void appendToLogAndSync(int count) throws IOException {
|
|
||||||
long txid = appendToLog(count);
|
|
||||||
log.sync(txid);
|
|
||||||
}
|
|
||||||
|
|
||||||
private long appendToLog(int count) throws IOException {
|
|
||||||
return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
|
||||||
System.currentTimeMillis(), mvcc, scopes), getWALEdits(count));
|
|
||||||
}
|
|
||||||
|
|
||||||
private WALEdit getWALEdits(int count) {
|
|
||||||
WALEdit edit = new WALEdit();
|
|
||||||
for (int i = 0; i < count; i++) {
|
|
||||||
edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier,
|
|
||||||
System.currentTimeMillis(), qualifier));
|
|
||||||
}
|
|
||||||
return edit;
|
|
||||||
}
|
|
||||||
|
|
||||||
private WALEdit getWALEdit(String row) {
|
private WALEdit getWALEdit(String row) {
|
||||||
WALEdit edit = new WALEdit();
|
WALEdit edit = new WALEdit();
|
||||||
edit.add(
|
edit.add(new KeyValue(Bytes.toBytes(row), family, qualifier,
|
||||||
new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier));
|
EnvironmentEdgeManager.currentTime(), qualifier));
|
||||||
return edit;
|
return edit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -735,29 +549,18 @@ public class TestWALEntryStream {
|
||||||
throw new WALEntryFilterRetryableException("failing filter");
|
throw new WALEntryFilterRetryableException("failing filter");
|
||||||
}
|
}
|
||||||
|
|
||||||
public static int numFailures(){
|
public static int numFailures() {
|
||||||
return countFailures;
|
return countFailures;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class PathWatcher implements WALActionsListener {
|
|
||||||
|
|
||||||
Path currentPath;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void preLogRoll(Path oldPath, Path newPath) {
|
|
||||||
logQueue.enqueueLog(newPath, fakeWalGroupId);
|
|
||||||
currentPath = newPath;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
|
public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
|
||||||
appendToLog("1");
|
appendToLog("1");
|
||||||
appendToLog("2");
|
appendToLog("2");
|
||||||
long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
|
long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong();
|
||||||
AtomicLong fileLength = new AtomicLong(size - 1);
|
AtomicLong fileLength = new AtomicLong(size - 1);
|
||||||
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0,
|
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0,
|
||||||
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) {
|
p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
assertTrue(entryStream.hasNext());
|
assertTrue(entryStream.hasNext());
|
||||||
assertNotNull(entryStream.next());
|
assertNotNull(entryStream.next());
|
||||||
|
@ -779,8 +582,8 @@ public class TestWALEntryStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Test removal of 0 length log from logQueue if the source is a recovered source and
|
* Test removal of 0 length log from logQueue if the source is a recovered source and size of
|
||||||
size of logQueue is only 1.
|
* logQueue is only 1.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testEOFExceptionForRecoveredQueue() throws Exception {
|
public void testEOFExceptionForRecoveredQueue() throws Exception {
|
||||||
|
@ -804,9 +607,8 @@ public class TestWALEntryStream {
|
||||||
doNothing().when(metrics).decrSizeOfLogQueue();
|
doNothing().when(metrics).decrSizeOfLogQueue();
|
||||||
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
|
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
|
||||||
localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
|
localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
|
||||||
ReplicationSourceWALReader reader =
|
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
|
||||||
new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
|
getDummyFilter(), source, fakeWalGroupId);
|
||||||
getDummyFilter(), source, fakeWalGroupId);
|
|
||||||
reader.run();
|
reader.run();
|
||||||
// ReplicationSourceWALReaderThread#handleEofException method will
|
// ReplicationSourceWALReaderThread#handleEofException method will
|
||||||
// remove empty log from logQueue.
|
// remove empty log from logQueue.
|
||||||
|
@ -820,7 +622,7 @@ public class TestWALEntryStream {
|
||||||
ReplicationSource source = mockReplicationSource(true, conf);
|
ReplicationSource source = mockReplicationSource(true, conf);
|
||||||
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
|
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
|
||||||
// Create a 0 length log.
|
// Create a 0 length log.
|
||||||
Path emptyLog = new Path(fs.getHomeDirectory(),"log.2");
|
Path emptyLog = new Path(fs.getHomeDirectory(), "log.2");
|
||||||
FSDataOutputStream fsdos = fs.create(emptyLog);
|
FSDataOutputStream fsdos = fs.create(emptyLog);
|
||||||
fsdos.close();
|
fsdos.close();
|
||||||
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
|
assertEquals(0, fs.getFileStatus(emptyLog).getLen());
|
||||||
|
@ -834,7 +636,7 @@ public class TestWALEntryStream {
|
||||||
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
|
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
|
||||||
// Make it look like the source is from recovered source.
|
// Make it look like the source is from recovered source.
|
||||||
when(mockSourceManager.getOldSources())
|
when(mockSourceManager.getOldSources())
|
||||||
.thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source)));
|
.thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source)));
|
||||||
when(source.isPeerEnabled()).thenReturn(true);
|
when(source.isPeerEnabled()).thenReturn(true);
|
||||||
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
|
||||||
// Override the max retries multiplier to fail fast.
|
// Override the max retries multiplier to fail fast.
|
||||||
|
@ -842,11 +644,10 @@ public class TestWALEntryStream {
|
||||||
conf.setBoolean("replication.source.eof.autorecovery", true);
|
conf.setBoolean("replication.source.eof.autorecovery", true);
|
||||||
conf.setInt("replication.source.nb.batches", 10);
|
conf.setInt("replication.source.nb.batches", 10);
|
||||||
// Create a reader thread.
|
// Create a reader thread.
|
||||||
ReplicationSourceWALReader reader =
|
ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
|
||||||
new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
|
getDummyFilter(), source, fakeWalGroupId);
|
||||||
getDummyFilter(), source, fakeWalGroupId);
|
assertEquals("Initial log queue size is not correct", 2,
|
||||||
assertEquals("Initial log queue size is not correct",
|
localLogQueue.getQueueSize(fakeWalGroupId));
|
||||||
2, localLogQueue.getQueueSize(fakeWalGroupId));
|
|
||||||
reader.run();
|
reader.run();
|
||||||
|
|
||||||
// remove empty log from logQueue.
|
// remove empty log from logQueue.
|
||||||
|
@ -861,11 +662,10 @@ public class TestWALEntryStream {
|
||||||
private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
|
private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
|
||||||
for (int i = 0; i < numEntries; i++) {
|
for (int i = 0; i < numEntries; i++) {
|
||||||
byte[] b = Bytes.toBytes(Integer.toString(i));
|
byte[] b = Bytes.toBytes(Integer.toString(i));
|
||||||
KeyValue kv = new KeyValue(b,b,b);
|
KeyValue kv = new KeyValue(b, b, b);
|
||||||
WALEdit edit = new WALEdit();
|
WALEdit edit = new WALEdit();
|
||||||
edit.add(kv);
|
edit.add(kv);
|
||||||
WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0,
|
WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
|
||||||
HConstants.DEFAULT_CLUSTER_ID);
|
|
||||||
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
|
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
|
||||||
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
|
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
|
||||||
writer.append(new WAL.Entry(key, edit));
|
writer.append(new WAL.Entry(key, edit));
|
||||||
|
@ -887,8 +687,8 @@ public class TestWALEntryStream {
|
||||||
// After rolling there will be 2 wals in the queue
|
// After rolling there will be 2 wals in the queue
|
||||||
assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
|
assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
|
||||||
|
|
||||||
try (WALEntryStream entryStream = new WALEntryStream(
|
try (WALEntryStream entryStream =
|
||||||
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
|
new WALEntryStream(logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
|
||||||
// There's one edit in the log, read it.
|
// There's one edit in the log, read it.
|
||||||
assertTrue(entryStream.hasNext());
|
assertTrue(entryStream.hasNext());
|
||||||
WAL.Entry entry = entryStream.next();
|
WAL.Entry entry = entryStream.next();
|
||||||
|
@ -900,13 +700,13 @@ public class TestWALEntryStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that wals are closed cleanly and we read the trailer when we remove wal
|
* Tests that wals are closed cleanly and we read the trailer when we remove wal from
|
||||||
* from WALEntryStream.
|
* WALEntryStream.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCleanClosedWALs() throws Exception {
|
public void testCleanClosedWALs() throws Exception {
|
||||||
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(
|
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
|
||||||
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
|
logQueue.getMetrics(), fakeWalGroupId)) {
|
||||||
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
||||||
appendToLogAndSync();
|
appendToLogAndSync();
|
||||||
assertNotNull(entryStream.next());
|
assertNotNull(entryStream.next());
|
|
@ -0,0 +1,46 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TestBasicWALEntryStream with {@link AsyncFSWALProvider} as the WAL provider.
|
||||||
|
*/
|
||||||
|
@Category({ ReplicationTests.class, MediumTests.class })
|
||||||
|
public class TestBasicWALEntryStreamAsyncFSWAL extends TestBasicWALEntryStream {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestBasicWALEntryStreamAsyncFSWAL.class);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, AsyncFSWALProvider.class,
|
||||||
|
AbstractFSWALProvider.class);
|
||||||
|
startCluster();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,46 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.FSHLogProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TestBasicWALEntryStream with {@link FSHLogProvider} as the WAL provider.
|
||||||
|
*/
|
||||||
|
@Category({ ReplicationTests.class, MediumTests.class })
|
||||||
|
public class TestBasicWALEntryStreamFSHLog extends TestBasicWALEntryStream {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestBasicWALEntryStreamFSHLog.class);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class,
|
||||||
|
AbstractFSWALProvider.class);
|
||||||
|
startCluster();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,89 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runners.Parameterized.Parameter;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try out different combinations of row count and KeyValue count
|
||||||
|
*/
|
||||||
|
public abstract class TestWALEntryStreamDifferentCounts extends WALEntryStreamTestBase {
|
||||||
|
|
||||||
|
@Parameter(0)
|
||||||
|
public int nbRows;
|
||||||
|
|
||||||
|
@Parameter(1)
|
||||||
|
public int walEditKVs;
|
||||||
|
|
||||||
|
@Parameter(2)
|
||||||
|
public boolean isCompressionEnabled;
|
||||||
|
|
||||||
|
@Parameters(name = "{index}: nbRows={0}, walEditKVs={1}, isCompressionEnabled={2}")
|
||||||
|
public static Iterable<Object[]> data() {
|
||||||
|
List<Object[]> params = new ArrayList<>();
|
||||||
|
for (int nbRows : new int[] { 1500, 60000 }) {
|
||||||
|
for (int walEditKVs : new int[] { 1, 100 }) {
|
||||||
|
for (boolean isCompressionEnabled : new boolean[] { false, true }) {
|
||||||
|
params.add(new Object[] { nbRows, walEditKVs, isCompressionEnabled });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return params;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
|
||||||
|
initWAL();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDifferentCounts() throws Exception {
|
||||||
|
mvcc.advanceTo(1);
|
||||||
|
|
||||||
|
for (int i = 0; i < nbRows; i++) {
|
||||||
|
appendToLogAndSync(walEditKVs);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.rollWriter();
|
||||||
|
|
||||||
|
try (WALEntryStream entryStream =
|
||||||
|
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
|
||||||
|
int i = 0;
|
||||||
|
while (entryStream.hasNext()) {
|
||||||
|
assertNotNull(entryStream.next());
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
assertEquals(nbRows, i);
|
||||||
|
|
||||||
|
// should've read all entries
|
||||||
|
assertFalse(entryStream.hasNext());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
@Category({ ReplicationTests.class, LargeTests.class })
|
||||||
|
public class TestWALEntryStreamDifferentCountsAsyncFSWAL
|
||||||
|
extends TestWALEntryStreamDifferentCounts {
|
||||||
|
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestWALEntryStreamDifferentCountsAsyncFSWAL.class);
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, AsyncFSWALProvider.class,
|
||||||
|
AbstractFSWALProvider.class);
|
||||||
|
startCluster();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -7,7 +7,7 @@
|
||||||
* "License"); you may not use this file except in compliance
|
* "License"); you may not use this file except in compliance
|
||||||
* with the License. You may obtain a copy of the License at
|
* with the License. You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
@ -18,7 +18,6 @@
|
||||||
package org.apache.hadoop.hbase.replication.regionserver;
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
|
@ -27,25 +26,21 @@ import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
/**
|
@RunWith(Parameterized.class)
|
||||||
* TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider.
|
|
||||||
*/
|
|
||||||
@Category({ ReplicationTests.class, LargeTests.class })
|
@Category({ ReplicationTests.class, LargeTests.class })
|
||||||
public class TestFSHLogWALEntryStream extends TestWALEntryStream {
|
public class TestWALEntryStreamDifferentCountsFSHLog extends TestWALEntryStreamDifferentCounts {
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class);
|
HBaseClassTestRule.forClass(TestWALEntryStreamDifferentCountsFSHLog.class);
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
TEST_UTIL = new HBaseTestingUtility();
|
TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class,
|
||||||
CONF = TEST_UTIL.getConfiguration();
|
AbstractFSWALProvider.class);
|
||||||
CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class);
|
startCluster();
|
||||||
CONF.setLong("replication.source.sleepforretries", 10);
|
|
||||||
TEST_UTIL.startMiniDFSCluster(3);
|
|
||||||
cluster = TEST_UTIL.getDFSCluster();
|
|
||||||
fs = cluster.getFileSystem();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,182 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNameTestRule;
|
||||||
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Base class for WALEntryStream tests.
|
||||||
|
*/
|
||||||
|
public abstract class WALEntryStreamTestBase {
|
||||||
|
|
||||||
|
protected static final long TEST_TIMEOUT_MS = 5000;
|
||||||
|
protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();;
|
||||||
|
protected static Configuration CONF;
|
||||||
|
protected static FileSystem fs;
|
||||||
|
protected static MiniDFSCluster cluster;
|
||||||
|
protected static final TableName tableName = TableName.valueOf("tablename");
|
||||||
|
protected static final byte[] family = Bytes.toBytes("column");
|
||||||
|
protected static final byte[] qualifier = Bytes.toBytes("qualifier");
|
||||||
|
protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
|
||||||
|
.setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
|
||||||
|
protected static final NavigableMap<byte[], Integer> scopes = getScopes();
|
||||||
|
protected final String fakeWalGroupId = "fake-wal-group-id";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test helper that waits until a non-null entry is available in the stream next or times out. A
|
||||||
|
* {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
|
||||||
|
* can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
|
||||||
|
* may need to retry multiple times before an entry appended to the WAL is visible to the stream
|
||||||
|
* consumers. One such cause of delay is the close() of writer writing these log files. While the
|
||||||
|
* closure is in progress, the stream does not switch to the next log in the queue and next() may
|
||||||
|
* return null entries. This utility wraps these retries into a single next call and that makes
|
||||||
|
* the test code simpler.
|
||||||
|
*/
|
||||||
|
protected static class WALEntryStreamWithRetries extends WALEntryStream {
|
||||||
|
// Class member to be able to set a non-final from within a lambda.
|
||||||
|
private Entry result;
|
||||||
|
|
||||||
|
public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
|
||||||
|
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
|
||||||
|
MetricsSource metrics, String walGroupId) throws IOException {
|
||||||
|
super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Entry next() {
|
||||||
|
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> {
|
||||||
|
result = WALEntryStreamWithRetries.super.next();
|
||||||
|
return result != null;
|
||||||
|
});
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static NavigableMap<byte[], Integer> getScopes() {
|
||||||
|
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
|
scopes.put(family, 1);
|
||||||
|
return scopes;
|
||||||
|
}
|
||||||
|
|
||||||
|
class PathWatcher implements WALActionsListener {
|
||||||
|
|
||||||
|
Path currentPath;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preLogRoll(Path oldPath, Path newPath) {
|
||||||
|
logQueue.enqueueLog(newPath, fakeWalGroupId);
|
||||||
|
currentPath = newPath;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected WAL log;
|
||||||
|
protected ReplicationSourceLogQueue logQueue;
|
||||||
|
protected PathWatcher pathWatcher;
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName tn = new TestName();
|
||||||
|
protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
|
||||||
|
|
||||||
|
protected static void startCluster() throws Exception {
|
||||||
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
CONF = TEST_UTIL.getConfiguration();
|
||||||
|
CONF.setLong("replication.source.sleepforretries", 10);
|
||||||
|
TEST_UTIL.startMiniDFSCluster(3);
|
||||||
|
|
||||||
|
cluster = TEST_UTIL.getDFSCluster();
|
||||||
|
fs = cluster.getFileSystem();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void initWAL() throws IOException {
|
||||||
|
ReplicationSource source = mock(ReplicationSource.class);
|
||||||
|
MetricsSource metricsSource = new MetricsSource("2");
|
||||||
|
// Source with the same id is shared and carries values from the last run
|
||||||
|
metricsSource.clear();
|
||||||
|
logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
|
||||||
|
pathWatcher = new PathWatcher();
|
||||||
|
final WALFactory wals =
|
||||||
|
new WALFactory(CONF, TableNameTestRule.cleanUpTestName(tn.getMethodName()));
|
||||||
|
wals.getWALProvider().addWALActionsListener(pathWatcher);
|
||||||
|
log = wals.getWAL(info);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
Closeables.close(log, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void appendToLogAndSync() throws IOException {
|
||||||
|
appendToLogAndSync(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void appendToLogAndSync(int count) throws IOException {
|
||||||
|
long txid = appendToLog(count);
|
||||||
|
log.sync(txid);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long appendToLog(int count) throws IOException {
|
||||||
|
return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
|
||||||
|
EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected WALEdit getWALEdits(int count) {
|
||||||
|
WALEdit edit = new WALEdit();
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
edit.add(new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier,
|
||||||
|
EnvironmentEdgeManager.currentTime(), qualifier));
|
||||||
|
}
|
||||||
|
return edit;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue