HBASE-26020 Split TestWALEntryStream.testDifferentCounts out (#3409)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
Duo Zhang 2021-06-23 22:46:07 +08:00 committed by GitHub
parent cb247f9464
commit 39d143f290
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 491 additions and 288 deletions

View File

@ -39,7 +39,7 @@ public class TableNameTestRule extends TestWatcher {
* This helper strips out the parameter suffixes. * This helper strips out the parameter suffixes.
* @return current test method name with out parameterized suffixes. * @return current test method name with out parameterized suffixes.
*/ */
private static String cleanUpTestName(String methodName) { public static String cleanUpTestName(String methodName) {
int index = methodName.indexOf('['); int index = methodName.indexOf('[');
if (index == -1) { if (index == -1) {
return methodName; return methodName;

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -45,27 +46,16 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
@ -74,158 +64,18 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@Category({ ReplicationTests.class, LargeTests.class }) public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
public class TestWALEntryStream {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALEntryStream.class);
private static final long TEST_TIMEOUT_MS = 5000;
protected static HBaseTestingUtility TEST_UTIL;
protected static Configuration CONF;
protected static FileSystem fs;
protected static MiniDFSCluster cluster;
private static final TableName tableName = TableName.valueOf("tablename");
private static final byte[] family = Bytes.toBytes("column");
private static final byte[] qualifier = Bytes.toBytes("qualifier");
private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
private static final NavigableMap<byte[], Integer> scopes = getScopes();
private final String fakeWalGroupId = "fake-wal-group-id";
/**
* Test helper that waits until a non-null entry is available in the stream next or times out.
* A {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
* can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
* may need to retry multiple times before an entry appended to the WAL is visible to the stream
* consumers. One such cause of delay is the close() of writer writing these log files. While the
* closure is in progress, the stream does not switch to the next log in the queue and next() may
* return null entries. This utility wraps these retries into a single next call and that makes
* the test code simpler.
*/
private static class WALEntryStreamWithRetries extends WALEntryStream {
// Class member to be able to set a non-final from within a lambda.
private Entry result;
public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics, String walGroupId) throws IOException {
super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
}
@Override
public Entry next() {
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> (
result = WALEntryStreamWithRetries.super.next()) != null);
return result;
}
}
private static NavigableMap<byte[], Integer> getScopes() {
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(family, 1);
return scopes;
}
private WAL log;
ReplicationSourceLogQueue logQueue;
private PathWatcher pathWatcher;
@Rule
public TestName tn = new TestName();
private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
CONF = TEST_UTIL.getConfiguration();
CONF.setLong("replication.source.sleepforretries", 10);
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
ReplicationSource source = mock(ReplicationSource.class); initWAL();
MetricsSource metricsSource = new MetricsSource("2");
// Source with the same id is shared and carries values from the last run
metricsSource.clear();
logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
pathWatcher = new PathWatcher();
final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
wals.getWALProvider().addWALActionsListener(pathWatcher);
log = wals.getWAL(info);
}
@After
public void tearDown() throws Exception {
if (log != null) {
log.close();
}
}
// Try out different combinations of row count and KeyValue count
@Test
public void testDifferentCounts() throws Exception {
int[] NB_ROWS = { 1500, 60000 };
int[] NB_KVS = { 1, 100 };
// whether compression is used
Boolean[] BOOL_VALS = { false, true };
// long lastPosition = 0;
for (int nbRows : NB_ROWS) {
for (int walEditKVs : NB_KVS) {
for (boolean isCompressionEnabled : BOOL_VALS) {
TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION,
isCompressionEnabled);
mvcc.advanceTo(1);
for (int i = 0; i < nbRows; i++) {
appendToLogAndSync(walEditKVs);
}
log.rollWriter();
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
i++;
}
assertEquals(nbRows, i);
// should've read all entries
assertFalse(entryStream.hasNext());
}
// reset everything for next loop
log.close();
setUp();
}
}
}
} }
/** /**
@ -250,8 +100,8 @@ public class TestWALEntryStream {
appendToLogAndSync(); appendToLogAndSync();
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
log, null, new MetricsSource("1"), fakeWalGroupId)) { null, new MetricsSource("1"), fakeWalGroupId)) {
// Read the newly added entry, make sure we made progress // Read the newly added entry, make sure we made progress
WAL.Entry entry = entryStream.next(); WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition()); assertNotEquals(oldPos, entryStream.getPosition());
@ -264,8 +114,8 @@ public class TestWALEntryStream {
log.rollWriter(); log.rollWriter();
appendToLogAndSync(); appendToLogAndSync();
try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log,
log, null, new MetricsSource("1"), fakeWalGroupId)) { null, new MetricsSource("1"), fakeWalGroupId)) {
WAL.Entry entry = entryStream.next(); WAL.Entry entry = entryStream.next();
assertNotEquals(oldPos, entryStream.getPosition()); assertNotEquals(oldPos, entryStream.getPosition());
assertNotNull(entry); assertNotNull(entry);
@ -289,8 +139,7 @@ public class TestWALEntryStream {
public void testLogrollWhileStreaming() throws Exception { public void testLogrollWhileStreaming() throws Exception {
appendToLog("1"); appendToLog("1");
appendToLog("2");// 2 appendToLog("2");// 2
try (WALEntryStream entryStream = try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
new MetricsSource("1"), fakeWalGroupId)) { new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("1", getRow(entryStream.next())); assertEquals("1", getRow(entryStream.next()));
@ -317,8 +166,7 @@ public class TestWALEntryStream {
public void testNewEntriesWhileStreaming() throws Exception { public void testNewEntriesWhileStreaming() throws Exception {
appendToLog("1"); appendToLog("1");
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null, new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); // we've hit the end of the stream at this point entryStream.next(); // we've hit the end of the stream at this point
// some new entries come in while we're streaming // some new entries come in while we're streaming
@ -341,16 +189,14 @@ public class TestWALEntryStream {
long lastPosition = 0; long lastPosition = 0;
appendToLog("1"); appendToLog("1");
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null, new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); // we've hit the end of the stream at this point entryStream.next(); // we've hit the end of the stream at this point
appendToLog("2"); appendToLog("2");
appendToLog("3"); appendToLog("3");
lastPosition = entryStream.getPosition(); lastPosition = entryStream.getPosition();
} }
// next stream should picks up where we left off // next stream should picks up where we left off
try (WALEntryStream entryStream = try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
new WALEntryStream(logQueue, CONF, lastPosition, log, null,
new MetricsSource("1"), fakeWalGroupId)) { new MetricsSource("1"), fakeWalGroupId)) {
assertEquals("2", getRow(entryStream.next())); assertEquals("2", getRow(entryStream.next()));
assertEquals("3", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next()));
@ -369,14 +215,13 @@ public class TestWALEntryStream {
long lastPosition = 0; long lastPosition = 0;
appendEntriesToLogAndSync(3); appendEntriesToLogAndSync(3);
// read only one element // read only one element
try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
log, null, new MetricsSource("1"), fakeWalGroupId)) { new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); entryStream.next();
lastPosition = entryStream.getPosition(); lastPosition = entryStream.getPosition();
} }
// there should still be two more entries from where we left off // there should still be two more entries from where we left off
try (WALEntryStream entryStream = try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null,
new WALEntryStream(logQueue, CONF, lastPosition, log, null,
new MetricsSource("1"), fakeWalGroupId)) { new MetricsSource("1"), fakeWalGroupId)) {
assertNotNull(entryStream.next()); assertNotNull(entryStream.next());
assertNotNull(entryStream.next()); assertNotNull(entryStream.next());
@ -384,12 +229,10 @@ public class TestWALEntryStream {
} }
} }
@Test @Test
public void testEmptyStream() throws Exception { public void testEmptyStream() throws Exception {
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null, new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
new MetricsSource("1"), fakeWalGroupId)) {
assertFalse(entryStream.hasNext()); assertFalse(entryStream.hasNext());
} }
} }
@ -399,9 +242,9 @@ public class TestWALEntryStream {
Map<String, byte[]> attributes = new HashMap<String, byte[]>(); Map<String, byte[]> attributes = new HashMap<String, byte[]>();
attributes.put("foo", Bytes.toBytes("foo-value")); attributes.put("foo", Bytes.toBytes("foo-value"));
attributes.put("bar", Bytes.toBytes("bar-value")); attributes.put("bar", Bytes.toBytes("bar-value"));
WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, WALKeyImpl key =
EnvironmentEdgeManager.currentTime(), new ArrayList<UUID>(), 0L, 0L, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(),
mvcc, scopes, attributes); new ArrayList<UUID>(), 0L, 0L, mvcc, scopes, attributes);
Assert.assertEquals(attributes, key.getExtendedAttributes()); Assert.assertEquals(attributes, key.getExtendedAttributes());
WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor()); WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor());
@ -410,12 +253,12 @@ public class TestWALEntryStream {
WALKeyImpl deserializedKey = new WALKeyImpl(); WALKeyImpl deserializedKey = new WALKeyImpl();
deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor()); deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor());
//equals() only checks region name, sequence id and write time // equals() only checks region name, sequence id and write time
Assert.assertEquals(key, deserializedKey); Assert.assertEquals(key, deserializedKey);
//can't use Map.equals() because byte arrays use reference equality // can't use Map.equals() because byte arrays use reference equality
Assert.assertEquals(key.getExtendedAttributes().keySet(), Assert.assertEquals(key.getExtendedAttributes().keySet(),
deserializedKey.getExtendedAttributes().keySet()); deserializedKey.getExtendedAttributes().keySet());
for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()){ for (Map.Entry<String, byte[]> entry : deserializedKey.getExtendedAttributes().entrySet()) {
Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue()); Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue());
} }
Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes()); Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes());
@ -424,8 +267,8 @@ public class TestWALEntryStream {
private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
when(mockSourceManager.getTotalBufferLimit()).thenReturn( when(mockSourceManager.getTotalBufferLimit())
(long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT);
Server mockServer = Mockito.mock(Server.class); Server mockServer = Mockito.mock(Server.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class); ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceManager()).thenReturn(mockSourceManager); when(source.getSourceManager()).thenReturn(mockSourceManager);
@ -433,8 +276,8 @@ public class TestWALEntryStream {
when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer); when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered); when(source.isRecovered()).thenReturn(recovered);
MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock( MetricsReplicationGlobalSourceSource globalMetrics =
MetricsReplicationGlobalSourceSource.class); Mockito.mock(MetricsReplicationGlobalSourceSource.class);
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source; return source;
} }
@ -442,9 +285,8 @@ public class TestWALEntryStream {
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
ReplicationSource source = mockReplicationSource(recovered, conf); ReplicationSource source = mockReplicationSource(recovered, conf);
when(source.isPeerEnabled()).thenReturn(true); when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader = ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
new ReplicationSourceWALReader(fs, conf, logQueue, 0, getDummyFilter(), source, getDummyFilter(), source, fakeWalGroupId);
fakeWalGroupId);
reader.start(); reader.start();
return reader; return reader;
} }
@ -453,8 +295,7 @@ public class TestWALEntryStream {
Configuration conf) { Configuration conf) {
ReplicationSource source = mockReplicationSource(false, conf); ReplicationSource source = mockReplicationSource(false, conf);
when(source.isPeerEnabled()).thenReturn(true); when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader = ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0,
new ReplicationSourceWALReader(fs, conf, logQueue, 0,
getIntermittentFailingFilter(numFailures), source, fakeWalGroupId); getIntermittentFailingFilter(numFailures), source, fakeWalGroupId);
reader.start(); reader.start();
return reader; return reader;
@ -466,8 +307,7 @@ public class TestWALEntryStream {
// get ending position // get ending position
long position; long position;
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null, new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
@ -498,8 +338,7 @@ public class TestWALEntryStream {
// get ending position // get ending position
long position; long position;
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null, new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
@ -509,8 +348,8 @@ public class TestWALEntryStream {
// start up a reader // start up a reader
Path walPath = getQueue().peek(); Path walPath = getQueue().peek();
int numFailuresInFilter = 5; int numFailuresInFilter = 5;
ReplicationSourceWALReader reader = createReaderWithBadReplicationFilter( ReplicationSourceWALReader reader =
numFailuresInFilter, CONF); createReaderWithBadReplicationFilter(numFailuresInFilter, CONF);
WALEntryBatch entryBatch = reader.take(); WALEntryBatch entryBatch = reader.take();
assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures()); assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures());
@ -566,7 +405,7 @@ public class TestWALEntryStream {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
return fs.getFileStatus(walPath).getLen() > 0 && return fs.getFileStatus(walPath).getLen() > 0 &&
((AbstractFSWAL) log).getInflightWALCloseCount() == 0; ((AbstractFSWAL<?>) log).getInflightWALCloseCount() == 0;
} }
@Override @Override
@ -614,8 +453,7 @@ public class TestWALEntryStream {
// get ending position // get ending position
long position; long position;
try (WALEntryStream entryStream = try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null, new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
new MetricsSource("1"), fakeWalGroupId)) {
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
entryStream.next(); entryStream.next();
@ -632,9 +470,8 @@ public class TestWALEntryStream {
return enabled.get(); return enabled.get();
}); });
ReplicationSourceWALReader reader = ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0,
new ReplicationSourceWALReader(fs, CONF, logQueue, 0, getDummyFilter(), getDummyFilter(), source, fakeWalGroupId);
source, fakeWalGroupId);
reader.start(); reader.start();
Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> { Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
return reader.take(); return reader.take();
@ -661,8 +498,7 @@ public class TestWALEntryStream {
} }
private void appendToLog(String key) throws IOException { private void appendToLog(String key) throws IOException {
final long txid = log.appendData(info, final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key)); EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key));
log.sync(txid); log.sync(txid);
} }
@ -675,34 +511,10 @@ public class TestWALEntryStream {
log.sync(txid); log.sync(txid);
} }
private void appendToLogAndSync() throws IOException {
appendToLogAndSync(1);
}
private void appendToLogAndSync(int count) throws IOException {
long txid = appendToLog(count);
log.sync(txid);
}
private long appendToLog(int count) throws IOException {
return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count));
}
private WALEdit getWALEdits(int count) {
WALEdit edit = new WALEdit();
for (int i = 0; i < count; i++) {
edit.add(new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family,
qualifier, EnvironmentEdgeManager.currentTime(), qualifier));
}
return edit;
}
private WALEdit getWALEdit(String row) { private WALEdit getWALEdit(String row) {
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
edit.add( edit.add(new KeyValue(Bytes.toBytes(row), family, qualifier,
new KeyValue(Bytes.toBytes(row), family, qualifier, EnvironmentEdgeManager.currentTime(), EnvironmentEdgeManager.currentTime(), qualifier));
qualifier));
return edit; return edit;
} }
@ -737,22 +549,11 @@ public class TestWALEntryStream {
throw new WALEntryFilterRetryableException("failing filter"); throw new WALEntryFilterRetryableException("failing filter");
} }
public static int numFailures(){ public static int numFailures() {
return countFailures; return countFailures;
} }
} }
class PathWatcher implements WALActionsListener {
Path currentPath;
@Override
public void preLogRoll(Path oldPath, Path newPath) {
logQueue.enqueueLog(newPath, fakeWalGroupId);
currentPath = newPath;
}
}
@Test @Test
public void testReadBeyondCommittedLength() throws IOException, InterruptedException { public void testReadBeyondCommittedLength() throws IOException, InterruptedException {
appendToLog("1"); appendToLog("1");
@ -781,8 +582,8 @@ public class TestWALEntryStream {
} }
/* /*
Test removal of 0 length log from logQueue if the source is a recovered source and * Test removal of 0 length log from logQueue if the source is a recovered source and size of
size of logQueue is only 1. * logQueue is only 1.
*/ */
@Test @Test
public void testEOFExceptionForRecoveredQueue() throws Exception { public void testEOFExceptionForRecoveredQueue() throws Exception {
@ -806,8 +607,7 @@ public class TestWALEntryStream {
doNothing().when(metrics).decrSizeOfLogQueue(); doNothing().when(metrics).decrSizeOfLogQueue();
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); localLogQueue.enqueueLog(emptyLog, fakeWalGroupId);
ReplicationSourceWALReader reader = ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
getDummyFilter(), source, fakeWalGroupId); getDummyFilter(), source, fakeWalGroupId);
reader.run(); reader.run();
// ReplicationSourceWALReaderThread#handleEofException method will // ReplicationSourceWALReaderThread#handleEofException method will
@ -822,7 +622,7 @@ public class TestWALEntryStream {
ReplicationSource source = mockReplicationSource(true, conf); ReplicationSource source = mockReplicationSource(true, conf);
ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source);
// Create a 0 length log. // Create a 0 length log.
Path emptyLog = new Path(fs.getHomeDirectory(),"log.2"); Path emptyLog = new Path(fs.getHomeDirectory(), "log.2");
FSDataOutputStream fsdos = fs.create(emptyLog); FSDataOutputStream fsdos = fs.create(emptyLog);
fsdos.close(); fsdos.close();
assertEquals(0, fs.getFileStatus(emptyLog).getLen()); assertEquals(0, fs.getFileStatus(emptyLog).getLen());
@ -836,7 +636,7 @@ public class TestWALEntryStream {
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
// Make it look like the source is from recovered source. // Make it look like the source is from recovered source.
when(mockSourceManager.getOldSources()) when(mockSourceManager.getOldSources())
.thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source))); .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source)));
when(source.isPeerEnabled()).thenReturn(true); when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
// Override the max retries multiplier to fail fast. // Override the max retries multiplier to fail fast.
@ -844,11 +644,10 @@ public class TestWALEntryStream {
conf.setBoolean("replication.source.eof.autorecovery", true); conf.setBoolean("replication.source.eof.autorecovery", true);
conf.setInt("replication.source.nb.batches", 10); conf.setInt("replication.source.nb.batches", 10);
// Create a reader thread. // Create a reader thread.
ReplicationSourceWALReader reader = ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
new ReplicationSourceWALReader(fs, conf, localLogQueue, 0,
getDummyFilter(), source, fakeWalGroupId); getDummyFilter(), source, fakeWalGroupId);
assertEquals("Initial log queue size is not correct", assertEquals("Initial log queue size is not correct", 2,
2, localLogQueue.getQueueSize(fakeWalGroupId)); localLogQueue.getQueueSize(fakeWalGroupId));
reader.run(); reader.run();
// remove empty log from logQueue. // remove empty log from logQueue.
@ -863,11 +662,10 @@ public class TestWALEntryStream {
private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException { private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
for (int i = 0; i < numEntries; i++) { for (int i = 0; i < numEntries; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i)); byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b,b,b); KeyValue kv = new KeyValue(b, b, b);
WALEdit edit = new WALEdit(); WALEdit edit = new WALEdit();
edit.add(kv); edit.add(kv);
WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
HConstants.DEFAULT_CLUSTER_ID);
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR); NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
writer.append(new WAL.Entry(key, edit)); writer.append(new WAL.Entry(key, edit));
@ -889,8 +687,8 @@ public class TestWALEntryStream {
// After rolling there will be 2 wals in the queue // After rolling there will be 2 wals in the queue
assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue()); assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
try (WALEntryStream entryStream = new WALEntryStream( try (WALEntryStream entryStream =
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { new WALEntryStream(logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
// There's one edit in the log, read it. // There's one edit in the log, read it.
assertTrue(entryStream.hasNext()); assertTrue(entryStream.hasNext());
WAL.Entry entry = entryStream.next(); WAL.Entry entry = entryStream.next();
@ -902,13 +700,13 @@ public class TestWALEntryStream {
} }
/** /**
* Tests that wals are closed cleanly and we read the trailer when we remove wal * Tests that wals are closed cleanly and we read the trailer when we remove wal from
* from WALEntryStream. * WALEntryStream.
*/ */
@Test @Test
public void testCleanClosedWALs() throws Exception { public void testCleanClosedWALs() throws Exception {
try (WALEntryStream entryStream = new WALEntryStreamWithRetries( try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null,
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { logQueue.getMetrics(), fakeWalGroupId)) {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
appendToLogAndSync(); appendToLogAndSync();
assertNotNull(entryStream.next()); assertNotNull(entryStream.next());

View File

@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
/**
* TestBasicWALEntryStream with {@link AsyncFSWALProvider} as the WAL provider.
*/
@Category({ ReplicationTests.class, MediumTests.class })
public class TestBasicWALEntryStreamAsyncFSWAL extends TestBasicWALEntryStream {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBasicWALEntryStreamAsyncFSWAL.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, AsyncFSWALProvider.class,
AbstractFSWALProvider.class);
startCluster();
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
/**
* TestBasicWALEntryStream with {@link FSHLogProvider} as the WAL provider.
*/
@Category({ ReplicationTests.class, MediumTests.class })
public class TestBasicWALEntryStreamFSHLog extends TestBasicWALEntryStream {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestBasicWALEntryStreamFSHLog.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class,
AbstractFSWALProvider.class);
startCluster();
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HConstants;
import org.junit.Before;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
/**
* Try out different combinations of row count and KeyValue count
*/
public abstract class TestWALEntryStreamDifferentCounts extends WALEntryStreamTestBase {
@Parameter(0)
public int nbRows;
@Parameter(1)
public int walEditKVs;
@Parameter(2)
public boolean isCompressionEnabled;
@Parameters(name = "{index}: nbRows={0}, walEditKVs={1}, isCompressionEnabled={2}")
public static Iterable<Object[]> data() {
List<Object[]> params = new ArrayList<>();
for (int nbRows : new int[] { 1500, 60000 }) {
for (int walEditKVs : new int[] { 1, 100 }) {
for (boolean isCompressionEnabled : new boolean[] { false, true }) {
params.add(new Object[] { nbRows, walEditKVs, isCompressionEnabled });
}
}
}
return params;
}
@Before
public void setUp() throws IOException {
CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled);
initWAL();
}
@Test
public void testDifferentCounts() throws Exception {
mvcc.advanceTo(1);
for (int i = 0; i < nbRows; i++) {
appendToLogAndSync(walEditKVs);
}
log.rollWriter();
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
int i = 0;
while (entryStream.hasNext()) {
assertNotNull(entryStream.next());
i++;
}
assertEquals(nbRows, i);
// should've read all entries
assertFalse(entryStream.hasNext());
}
}
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
@Category({ ReplicationTests.class, LargeTests.class })
public class TestWALEntryStreamDifferentCountsAsyncFSWAL
extends TestWALEntryStreamDifferentCounts {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALEntryStreamDifferentCountsAsyncFSWAL.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, AsyncFSWALProvider.class,
AbstractFSWALProvider.class);
startCluster();
}
}

View File

@ -1,4 +1,4 @@
/* /**
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@ -27,25 +26,21 @@ import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/** @RunWith(Parameterized.class)
* TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider.
*/
@Category({ ReplicationTests.class, LargeTests.class }) @Category({ ReplicationTests.class, LargeTests.class })
public class TestFSHLogWALEntryStream extends TestWALEntryStream { public class TestWALEntryStreamDifferentCountsFSHLog extends TestWALEntryStreamDifferentCounts {
@ClassRule @ClassRule
public static final HBaseClassTestRule CLASS_RULE = public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class); HBaseClassTestRule.forClass(TestWALEntryStreamDifferentCountsFSHLog.class);
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
TEST_UTIL = new HBaseTestingUtility(); TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class,
CONF = TEST_UTIL.getConfiguration(); AbstractFSWALProvider.class);
CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class); startCluster();
CONF.setLong("replication.source.sleepforretries", 10);
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
} }
} }

View File

@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
/**
* Base class for WALEntryStream tests.
*/
public abstract class WALEntryStreamTestBase {
protected static final long TEST_TIMEOUT_MS = 5000;
protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();;
protected static Configuration CONF;
protected static FileSystem fs;
protected static MiniDFSCluster cluster;
protected static final TableName tableName = TableName.valueOf("tablename");
protected static final byte[] family = Bytes.toBytes("column");
protected static final byte[] qualifier = Bytes.toBytes("qualifier");
protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName)
.setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build();
protected static final NavigableMap<byte[], Integer> scopes = getScopes();
protected final String fakeWalGroupId = "fake-wal-group-id";
/**
* Test helper that waits until a non-null entry is available in the stream next or times out. A
* {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream
* can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()}
* may need to retry multiple times before an entry appended to the WAL is visible to the stream
* consumers. One such cause of delay is the close() of writer writing these log files. While the
* closure is in progress, the stream does not switch to the next log in the queue and next() may
* return null entries. This utility wraps these retries into a single next call and that makes
* the test code simpler.
*/
protected static class WALEntryStreamWithRetries extends WALEntryStream {
// Class member to be able to set a non-final from within a lambda.
private Entry result;
public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf,
long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName,
MetricsSource metrics, String walGroupId) throws IOException {
super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId);
}
@Override
public Entry next() {
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> {
result = WALEntryStreamWithRetries.super.next();
return result != null;
});
return result;
}
}
private static NavigableMap<byte[], Integer> getScopes() {
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
scopes.put(family, 1);
return scopes;
}
class PathWatcher implements WALActionsListener {
Path currentPath;
@Override
public void preLogRoll(Path oldPath, Path newPath) {
logQueue.enqueueLog(newPath, fakeWalGroupId);
currentPath = newPath;
}
}
protected WAL log;
protected ReplicationSourceLogQueue logQueue;
protected PathWatcher pathWatcher;
@Rule
public TestName tn = new TestName();
protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
protected static void startCluster() throws Exception {
TEST_UTIL = new HBaseTestingUtility();
CONF = TEST_UTIL.getConfiguration();
CONF.setLong("replication.source.sleepforretries", 10);
TEST_UTIL.startMiniDFSCluster(3);
cluster = TEST_UTIL.getDFSCluster();
fs = cluster.getFileSystem();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
protected void initWAL() throws IOException {
ReplicationSource source = mock(ReplicationSource.class);
MetricsSource metricsSource = new MetricsSource("2");
// Source with the same id is shared and carries values from the last run
metricsSource.clear();
logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
pathWatcher = new PathWatcher();
final WALFactory wals =
new WALFactory(CONF, TableNameTestRule.cleanUpTestName(tn.getMethodName()));
wals.getWALProvider().addWALActionsListener(pathWatcher);
log = wals.getWAL(info);
}
@After
public void tearDown() throws Exception {
Closeables.close(log, true);
}
protected void appendToLogAndSync() throws IOException {
appendToLogAndSync(1);
}
protected void appendToLogAndSync(int count) throws IOException {
long txid = appendToLog(count);
log.sync(txid);
}
protected long appendToLog(int count) throws IOException {
return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName,
EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count));
}
protected WALEdit getWALEdits(int count) {
WALEdit edit = new WALEdit();
for (int i = 0; i < count; i++) {
edit.add(new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier,
EnvironmentEdgeManager.currentTime(), qualifier));
}
return edit;
}
}