diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/TableNameTestRule.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/TableNameTestRule.java index ca7d446f31d..56473c993ef 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/TableNameTestRule.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/TableNameTestRule.java @@ -39,7 +39,7 @@ public class TableNameTestRule extends TestWatcher { * This helper strips out the parameter suffixes. * @return current test method name with out parameterized suffixes. */ - private static String cleanUpTestName(String methodName) { + public static String cleanUpTestName(String methodName) { int index = methodName.indexOf('['); if (index == -1) { return methodName; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java similarity index 67% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java index 3442f98e08e..ad77c9d90e6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java @@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -45,186 +46,36 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RegionInfoBuilder; -import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; -import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.replication.WALEntryFilter; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; import org.mockito.Mockito; + import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; -@Category({ ReplicationTests.class, LargeTests.class }) -public class TestWALEntryStream { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestWALEntryStream.class); - - private static final long TEST_TIMEOUT_MS = 5000; - protected static HBaseTestingUtility TEST_UTIL; - protected static Configuration CONF; - protected static FileSystem fs; - protected static MiniDFSCluster cluster; - private static final TableName tableName = TableName.valueOf("tablename"); - private static final byte[] family = Bytes.toBytes("column"); - private static final byte[] qualifier = Bytes.toBytes("qualifier"); - private static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) - .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); - private static final NavigableMap scopes = getScopes(); - private final String fakeWalGroupId = "fake-wal-group-id"; - - /** - * Test helper that waits until a non-null entry is available in the stream next or times out. - * A {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream - * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()} - * may need to retry multiple times before an entry appended to the WAL is visible to the stream - * consumers. One such cause of delay is the close() of writer writing these log files. While the - * closure is in progress, the stream does not switch to the next log in the queue and next() may - * return null entries. This utility wraps these retries into a single next call and that makes - * the test code simpler. - */ - private static class WALEntryStreamWithRetries extends WALEntryStream { - // Class member to be able to set a non-final from within a lambda. - private Entry result; - - public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf, - long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, - MetricsSource metrics, String walGroupId) throws IOException { - super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId); - } - - @Override - public Entry next() { - Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> ( - result = WALEntryStreamWithRetries.super.next()) != null); - return result; - } - } - - private static NavigableMap getScopes() { - NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - scopes.put(family, 1); - return scopes; - } - - private WAL log; - ReplicationSourceLogQueue logQueue; - private PathWatcher pathWatcher; - - @Rule - public TestName tn = new TestName(); - private final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - TEST_UTIL = new HBaseTestingUtility(); - CONF = TEST_UTIL.getConfiguration(); - CONF.setLong("replication.source.sleepforretries", 10); - TEST_UTIL.startMiniDFSCluster(3); - - cluster = TEST_UTIL.getDFSCluster(); - fs = cluster.getFileSystem(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } +public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase { @Before public void setUp() throws Exception { - ReplicationSource source = mock(ReplicationSource.class); - MetricsSource metricsSource = new MetricsSource("2"); - // Source with the same id is shared and carries values from the last run - metricsSource.clear(); - logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source); - pathWatcher = new PathWatcher(); - final WALFactory wals = new WALFactory(CONF, tn.getMethodName()); - wals.getWALProvider().addWALActionsListener(pathWatcher); - log = wals.getWAL(info); - } - - @After - public void tearDown() throws Exception { - if (log != null) { - log.close(); - } - } - - // Try out different combinations of row count and KeyValue count - @Test - public void testDifferentCounts() throws Exception { - int[] NB_ROWS = { 1500, 60000 }; - int[] NB_KVS = { 1, 100 }; - // whether compression is used - Boolean[] BOOL_VALS = { false, true }; - // long lastPosition = 0; - for (int nbRows : NB_ROWS) { - for (int walEditKVs : NB_KVS) { - for (boolean isCompressionEnabled : BOOL_VALS) { - TEST_UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, - isCompressionEnabled); - mvcc.advanceTo(1); - - for (int i = 0; i < nbRows; i++) { - appendToLogAndSync(walEditKVs); - } - - log.rollWriter(); - - try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, - new MetricsSource("1"), fakeWalGroupId)) { - int i = 0; - while (entryStream.hasNext()) { - assertNotNull(entryStream.next()); - i++; - } - assertEquals(nbRows, i); - - // should've read all entries - assertFalse(entryStream.hasNext()); - } - // reset everything for next loop - log.close(); - setUp(); - } - } - } + initWAL(); } /** @@ -235,7 +86,7 @@ public class TestWALEntryStream { appendToLogAndSync(); long oldPos; try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { // There's one edit in the log, read it. Reading past it needs to throw exception assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.peek(); @@ -249,8 +100,8 @@ public class TestWALEntryStream { appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, - log, null, new MetricsSource("1"), fakeWalGroupId)) { + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log, + null, new MetricsSource("1"), fakeWalGroupId)) { // Read the newly added entry, make sure we made progress WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); @@ -263,8 +114,8 @@ public class TestWALEntryStream { log.rollWriter(); appendToLogAndSync(); - try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, - log, null, new MetricsSource("1"), fakeWalGroupId)) { + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, oldPos, log, + null, new MetricsSource("1"), fakeWalGroupId)) { WAL.Entry entry = entryStream.next(); assertNotEquals(oldPos, entryStream.getPosition()); assertNotNull(entry); @@ -288,9 +139,8 @@ public class TestWALEntryStream { public void testLogrollWhileStreaming() throws Exception { appendToLog("1"); appendToLog("2");// 2 - try (WALEntryStream entryStream = - new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null, + new MetricsSource("1"), fakeWalGroupId)) { assertEquals("1", getRow(entryStream.next())); appendToLog("3"); // 3 - comes in after reader opened @@ -299,7 +149,7 @@ public class TestWALEntryStream { assertEquals("2", getRow(entryStream.next())); assertEquals(2, getQueue().size()); // we should not have dequeued yet since there's still an - // entry in first log + // entry in first log assertEquals("3", getRow(entryStream.next())); // if implemented improperly, this would be 4 // and 3 would be skipped assertEquals("4", getRow(entryStream.next())); // 4 @@ -316,8 +166,7 @@ public class TestWALEntryStream { public void testNewEntriesWhileStreaming() throws Exception { appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); // we've hit the end of the stream at this point // some new entries come in while we're streaming @@ -340,17 +189,15 @@ public class TestWALEntryStream { long lastPosition = 0; appendToLog("1"); try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); // we've hit the end of the stream at this point appendToLog("2"); appendToLog("3"); lastPosition = entryStream.getPosition(); } // next stream should picks up where we left off - try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, lastPosition, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null, + new MetricsSource("1"), fakeWalGroupId)) { assertEquals("2", getRow(entryStream.next())); assertEquals("3", getRow(entryStream.next())); assertFalse(entryStream.hasNext()); // done @@ -368,27 +215,24 @@ public class TestWALEntryStream { long lastPosition = 0; appendEntriesToLogAndSync(3); // read only one element - try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, - log, null, new MetricsSource("1"), fakeWalGroupId)) { + try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null, + new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); lastPosition = entryStream.getPosition(); } // there should still be two more entries from where we left off - try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, lastPosition, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, lastPosition, log, null, + new MetricsSource("1"), fakeWalGroupId)) { assertNotNull(entryStream.next()); assertNotNull(entryStream.next()); assertFalse(entryStream.hasNext()); } } - @Test public void testEmptyStream() throws Exception { try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { assertFalse(entryStream.hasNext()); } } @@ -398,9 +242,9 @@ public class TestWALEntryStream { Map attributes = new HashMap(); attributes.put("foo", Bytes.toBytes("foo-value")); attributes.put("bar", Bytes.toBytes("bar-value")); - WALKeyImpl key = new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), new ArrayList(), 0L, 0L, - mvcc, scopes, attributes); + WALKeyImpl key = + new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, EnvironmentEdgeManager.currentTime(), + new ArrayList(), 0L, 0L, mvcc, scopes, attributes); Assert.assertEquals(attributes, key.getExtendedAttributes()); WALProtos.WALKey.Builder builder = key.getBuilder(WALCellCodec.getNoneCompressor()); @@ -409,12 +253,12 @@ public class TestWALEntryStream { WALKeyImpl deserializedKey = new WALKeyImpl(); deserializedKey.readFieldsFromPb(serializedKey, WALCellCodec.getNoneUncompressor()); - //equals() only checks region name, sequence id and write time + // equals() only checks region name, sequence id and write time Assert.assertEquals(key, deserializedKey); - //can't use Map.equals() because byte arrays use reference equality + // can't use Map.equals() because byte arrays use reference equality Assert.assertEquals(key.getExtendedAttributes().keySet(), - deserializedKey.getExtendedAttributes().keySet()); - for (Map.Entry entry : deserializedKey.getExtendedAttributes().entrySet()){ + deserializedKey.getExtendedAttributes().keySet()); + for (Map.Entry entry : deserializedKey.getExtendedAttributes().entrySet()) { Assert.assertArrayEquals(key.getExtendedAttribute(entry.getKey()), entry.getValue()); } Assert.assertEquals(key.getReplicationScopes(), deserializedKey.getReplicationScopes()); @@ -423,8 +267,8 @@ public class TestWALEntryStream { private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) { ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); - when(mockSourceManager.getTotalBufferLimit()).thenReturn( - (long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); + when(mockSourceManager.getTotalBufferLimit()) + .thenReturn((long) HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT); Server mockServer = Mockito.mock(Server.class); ReplicationSource source = Mockito.mock(ReplicationSource.class); when(source.getSourceManager()).thenReturn(mockSourceManager); @@ -432,8 +276,8 @@ public class TestWALEntryStream { when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getServer()).thenReturn(mockServer); when(source.isRecovered()).thenReturn(recovered); - MetricsReplicationGlobalSourceSource globalMetrics = Mockito.mock( - MetricsReplicationGlobalSourceSource.class); + MetricsReplicationGlobalSourceSource globalMetrics = + Mockito.mock(MetricsReplicationGlobalSourceSource.class); when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics); return source; } @@ -441,20 +285,18 @@ public class TestWALEntryStream { private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { ReplicationSource source = mockReplicationSource(recovered, conf); when(source.isPeerEnabled()).thenReturn(true); - ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, conf, logQueue, 0, getDummyFilter(), source, - fakeWalGroupId); + ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0, + getDummyFilter(), source, fakeWalGroupId); reader.start(); return reader; } private ReplicationSourceWALReader createReaderWithBadReplicationFilter(int numFailures, - Configuration conf) { + Configuration conf) { ReplicationSource source = mockReplicationSource(false, conf); when(source.isPeerEnabled()).thenReturn(true); - ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, conf, logQueue, 0, - getIntermittentFailingFilter(numFailures), source, fakeWalGroupId); + ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, logQueue, 0, + getIntermittentFailingFilter(numFailures), source, fakeWalGroupId); reader.start(); return reader; } @@ -465,8 +307,7 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -497,8 +338,7 @@ public class TestWALEntryStream { // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -508,8 +348,8 @@ public class TestWALEntryStream { // start up a reader Path walPath = getQueue().peek(); int numFailuresInFilter = 5; - ReplicationSourceWALReader reader = createReaderWithBadReplicationFilter( - numFailuresInFilter, CONF); + ReplicationSourceWALReader reader = + createReaderWithBadReplicationFilter(numFailuresInFilter, CONF); WALEntryBatch entryBatch = reader.take(); assertEquals(numFailuresInFilter, FailingWALEntryFilter.numFailures()); @@ -565,7 +405,7 @@ public class TestWALEntryStream { @Override public boolean evaluate() throws Exception { return fs.getFileStatus(walPath).getLen() > 0 && - ((AbstractFSWAL) log).getInflightWALCloseCount() == 0; + ((AbstractFSWAL) log).getInflightWALCloseCount() == 0; } @Override @@ -608,13 +448,12 @@ public class TestWALEntryStream { @Test public void testReplicationSourceWALReaderDisabled() - throws IOException, InterruptedException, ExecutionException { + throws IOException, InterruptedException, ExecutionException { appendEntriesToLogAndSync(3); // get ending position long position; try (WALEntryStream entryStream = - new WALEntryStream(logQueue, CONF, 0, log, null, - new MetricsSource("1"), fakeWalGroupId)) { + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { entryStream.next(); entryStream.next(); entryStream.next(); @@ -631,9 +470,8 @@ public class TestWALEntryStream { return enabled.get(); }); - ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, CONF, logQueue, 0, getDummyFilter(), - source, fakeWalGroupId); + ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, CONF, logQueue, 0, + getDummyFilter(), source, fakeWalGroupId); reader.start(); Future future = ForkJoinPool.commonPool().submit(() -> { return reader.take(); @@ -660,9 +498,8 @@ public class TestWALEntryStream { } private void appendToLog(String key) throws IOException { - final long txid = log.appendData(info, - new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, System.currentTimeMillis(), - mvcc, scopes), getWALEdit(key)); + final long txid = log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdit(key)); log.sync(txid); } @@ -674,33 +511,10 @@ public class TestWALEntryStream { log.sync(txid); } - private void appendToLogAndSync() throws IOException { - appendToLogAndSync(1); - } - - private void appendToLogAndSync(int count) throws IOException { - long txid = appendToLog(count); - log.sync(txid); - } - - private long appendToLog(int count) throws IOException { - return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, - System.currentTimeMillis(), mvcc, scopes), getWALEdits(count)); - } - - private WALEdit getWALEdits(int count) { - WALEdit edit = new WALEdit(); - for (int i = 0; i < count; i++) { - edit.add(new KeyValue(Bytes.toBytes(System.currentTimeMillis()), family, qualifier, - System.currentTimeMillis(), qualifier)); - } - return edit; - } - private WALEdit getWALEdit(String row) { WALEdit edit = new WALEdit(); - edit.add( - new KeyValue(Bytes.toBytes(row), family, qualifier, System.currentTimeMillis(), qualifier)); + edit.add(new KeyValue(Bytes.toBytes(row), family, qualifier, + EnvironmentEdgeManager.currentTime(), qualifier)); return edit; } @@ -735,29 +549,18 @@ public class TestWALEntryStream { throw new WALEntryFilterRetryableException("failing filter"); } - public static int numFailures(){ + public static int numFailures() { return countFailures; } } - class PathWatcher implements WALActionsListener { - - Path currentPath; - - @Override - public void preLogRoll(Path oldPath, Path newPath) { - logQueue.enqueueLog(newPath, fakeWalGroupId); - currentPath = newPath; - } - } - @Test public void testReadBeyondCommittedLength() throws IOException, InterruptedException { appendToLog("1"); appendToLog("2"); long size = log.getLogFileSizeIfBeingWritten(getQueue().peek()).getAsLong(); AtomicLong fileLength = new AtomicLong(size - 1); - try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0, + try (WALEntryStream entryStream = new WALEntryStream(logQueue, CONF, 0, p -> OptionalLong.of(fileLength.get()), null, new MetricsSource("1"), fakeWalGroupId)) { assertTrue(entryStream.hasNext()); assertNotNull(entryStream.next()); @@ -779,8 +582,8 @@ public class TestWALEntryStream { } /* - Test removal of 0 length log from logQueue if the source is a recovered source and - size of logQueue is only 1. + * Test removal of 0 length log from logQueue if the source is a recovered source and size of + * logQueue is only 1. */ @Test public void testEOFExceptionForRecoveredQueue() throws Exception { @@ -804,9 +607,8 @@ public class TestWALEntryStream { doNothing().when(metrics).decrSizeOfLogQueue(); ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); localLogQueue.enqueueLog(emptyLog, fakeWalGroupId); - ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, - getDummyFilter(), source, fakeWalGroupId); + ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, + getDummyFilter(), source, fakeWalGroupId); reader.run(); // ReplicationSourceWALReaderThread#handleEofException method will // remove empty log from logQueue. @@ -820,7 +622,7 @@ public class TestWALEntryStream { ReplicationSource source = mockReplicationSource(true, conf); ReplicationSourceLogQueue localLogQueue = new ReplicationSourceLogQueue(conf, metrics, source); // Create a 0 length log. - Path emptyLog = new Path(fs.getHomeDirectory(),"log.2"); + Path emptyLog = new Path(fs.getHomeDirectory(), "log.2"); FSDataOutputStream fsdos = fs.create(emptyLog); fsdos.close(); assertEquals(0, fs.getFileStatus(emptyLog).getLen()); @@ -834,7 +636,7 @@ public class TestWALEntryStream { ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class); // Make it look like the source is from recovered source. when(mockSourceManager.getOldSources()) - .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface)source))); + .thenReturn(new ArrayList<>(Arrays.asList((ReplicationSourceInterface) source))); when(source.isPeerEnabled()).thenReturn(true); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); // Override the max retries multiplier to fail fast. @@ -842,11 +644,10 @@ public class TestWALEntryStream { conf.setBoolean("replication.source.eof.autorecovery", true); conf.setInt("replication.source.nb.batches", 10); // Create a reader thread. - ReplicationSourceWALReader reader = - new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, - getDummyFilter(), source, fakeWalGroupId); - assertEquals("Initial log queue size is not correct", - 2, localLogQueue.getQueueSize(fakeWalGroupId)); + ReplicationSourceWALReader reader = new ReplicationSourceWALReader(fs, conf, localLogQueue, 0, + getDummyFilter(), source, fakeWalGroupId); + assertEquals("Initial log queue size is not correct", 2, + localLogQueue.getQueueSize(fakeWalGroupId)); reader.run(); // remove empty log from logQueue. @@ -861,11 +662,10 @@ public class TestWALEntryStream { private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException { for (int i = 0; i < numEntries; i++) { byte[] b = Bytes.toBytes(Integer.toString(i)); - KeyValue kv = new KeyValue(b,b,b); + KeyValue kv = new KeyValue(b, b, b); WALEdit edit = new WALEdit(); edit.add(kv); - WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, - HConstants.DEFAULT_CLUSTER_ID); + WALKeyImpl key = new WALKeyImpl(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID); NavigableMap scopes = new TreeMap(Bytes.BYTES_COMPARATOR); scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL); writer.append(new WAL.Entry(key, edit)); @@ -887,8 +687,8 @@ public class TestWALEntryStream { // After rolling there will be 2 wals in the queue assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue()); - try (WALEntryStream entryStream = new WALEntryStream( - logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { + try (WALEntryStream entryStream = + new WALEntryStream(logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { // There's one edit in the log, read it. assertTrue(entryStream.hasNext()); WAL.Entry entry = entryStream.next(); @@ -900,13 +700,13 @@ public class TestWALEntryStream { } /** - * Tests that wals are closed cleanly and we read the trailer when we remove wal - * from WALEntryStream. + * Tests that wals are closed cleanly and we read the trailer when we remove wal from + * WALEntryStream. */ @Test public void testCleanClosedWALs() throws Exception { - try (WALEntryStream entryStream = new WALEntryStreamWithRetries( - logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) { + try (WALEntryStream entryStream = new WALEntryStreamWithRetries(logQueue, CONF, 0, log, null, + logQueue.getMetrics(), fakeWalGroupId)) { assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs()); appendToLogAndSync(); assertNotNull(entryStream.next()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java new file mode 100644 index 00000000000..6ad0d152820 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamAsyncFSWAL.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +/** + * TestBasicWALEntryStream with {@link AsyncFSWALProvider} as the WAL provider. + */ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestBasicWALEntryStreamAsyncFSWAL extends TestBasicWALEntryStream { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBasicWALEntryStreamAsyncFSWAL.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, AsyncFSWALProvider.class, + AbstractFSWALProvider.class); + startCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java new file mode 100644 index 00000000000..75e85b550de --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStreamFSHLog.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.FSHLogProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +/** + * TestBasicWALEntryStream with {@link FSHLogProvider} as the WAL provider. + */ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestBasicWALEntryStreamFSHLog extends TestBasicWALEntryStream { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBasicWALEntryStreamFSHLog.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, + AbstractFSWALProvider.class); + startCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java new file mode 100644 index 00000000000..bf4562014a0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCounts.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.hbase.HConstants; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +/** + * Try out different combinations of row count and KeyValue count + */ +public abstract class TestWALEntryStreamDifferentCounts extends WALEntryStreamTestBase { + + @Parameter(0) + public int nbRows; + + @Parameter(1) + public int walEditKVs; + + @Parameter(2) + public boolean isCompressionEnabled; + + @Parameters(name = "{index}: nbRows={0}, walEditKVs={1}, isCompressionEnabled={2}") + public static Iterable data() { + List params = new ArrayList<>(); + for (int nbRows : new int[] { 1500, 60000 }) { + for (int walEditKVs : new int[] { 1, 100 }) { + for (boolean isCompressionEnabled : new boolean[] { false, true }) { + params.add(new Object[] { nbRows, walEditKVs, isCompressionEnabled }); + } + } + } + return params; + } + + @Before + public void setUp() throws IOException { + CONF.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, isCompressionEnabled); + initWAL(); + } + + @Test + public void testDifferentCounts() throws Exception { + mvcc.advanceTo(1); + + for (int i = 0; i < nbRows; i++) { + appendToLogAndSync(walEditKVs); + } + + log.rollWriter(); + + try (WALEntryStream entryStream = + new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) { + int i = 0; + while (entryStream.hasNext()) { + assertNotNull(entryStream.next()); + i++; + } + assertEquals(nbRows, i); + + // should've read all entries + assertFalse(entryStream.hasNext()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsAsyncFSWAL.java new file mode 100644 index 00000000000..c734f7985ea --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsAsyncFSWAL.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestWALEntryStreamDifferentCountsAsyncFSWAL + extends TestWALEntryStreamDifferentCounts { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALEntryStreamDifferentCountsAsyncFSWAL.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, AsyncFSWALProvider.class, + AbstractFSWALProvider.class); + startCluster(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsFSHLog.java similarity index 68% rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsFSHLog.java index 32d6ec4b806..66dc00eaa41 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestFSHLogWALEntryStream.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntryStreamDifferentCountsFSHLog.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.replication.regionserver; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; @@ -27,25 +26,21 @@ import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; -/** - * TestWALEntryStream with {@link org.apache.hadoop.hbase.wal.FSHLogProvider} as the WAL provider. - */ +@RunWith(Parameterized.class) @Category({ ReplicationTests.class, LargeTests.class }) -public class TestFSHLogWALEntryStream extends TestWALEntryStream { +public class TestWALEntryStreamDifferentCountsFSHLog extends TestWALEntryStreamDifferentCounts { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestFSHLogWALEntryStream.class); + HBaseClassTestRule.forClass(TestWALEntryStreamDifferentCountsFSHLog.class); @BeforeClass public static void setUpBeforeClass() throws Exception { - TEST_UTIL = new HBaseTestingUtility(); - CONF = TEST_UTIL.getConfiguration(); - CONF.setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, AbstractFSWALProvider.class); - CONF.setLong("replication.source.sleepforretries", 10); - TEST_UTIL.startMiniDFSCluster(3); - cluster = TEST_UTIL.getDFSCluster(); - fs = cluster.getFileSystem(); + TEST_UTIL.getConfiguration().setClass(WALFactory.WAL_PROVIDER, FSHLogProvider.class, + AbstractFSWALProvider.class); + startCluster(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java new file mode 100644 index 00000000000..d232a6bd734 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryStreamTestBase.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.mockito.Mockito.mock; + +import java.io.IOException; +import java.util.NavigableMap; +import java.util.TreeMap; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Rule; +import org.junit.rules.TestName; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +/** + * Base class for WALEntryStream tests. + */ +public abstract class WALEntryStreamTestBase { + + protected static final long TEST_TIMEOUT_MS = 5000; + protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();; + protected static Configuration CONF; + protected static FileSystem fs; + protected static MiniDFSCluster cluster; + protected static final TableName tableName = TableName.valueOf("tablename"); + protected static final byte[] family = Bytes.toBytes("column"); + protected static final byte[] qualifier = Bytes.toBytes("qualifier"); + protected static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName) + .setStartKey(HConstants.EMPTY_START_ROW).setEndKey(HConstants.LAST_ROW).build(); + protected static final NavigableMap scopes = getScopes(); + protected final String fakeWalGroupId = "fake-wal-group-id"; + + /** + * Test helper that waits until a non-null entry is available in the stream next or times out. A + * {@link WALEntryStream} provides a streaming access to a queue of log files. Since the stream + * can be consumed as the file is being written, callers relying on {@link WALEntryStream#next()} + * may need to retry multiple times before an entry appended to the WAL is visible to the stream + * consumers. One such cause of delay is the close() of writer writing these log files. While the + * closure is in progress, the stream does not switch to the next log in the queue and next() may + * return null entries. This utility wraps these retries into a single next call and that makes + * the test code simpler. + */ + protected static class WALEntryStreamWithRetries extends WALEntryStream { + // Class member to be able to set a non-final from within a lambda. + private Entry result; + + public WALEntryStreamWithRetries(ReplicationSourceLogQueue logQueue, Configuration conf, + long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, + MetricsSource metrics, String walGroupId) throws IOException { + super(logQueue, conf, startPosition, walFileLengthProvider, serverName, metrics, walGroupId); + } + + @Override + public Entry next() { + Waiter.waitFor(CONF, TEST_TIMEOUT_MS, () -> { + result = WALEntryStreamWithRetries.super.next(); + return result != null; + }); + return result; + } + } + + private static NavigableMap getScopes() { + NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); + scopes.put(family, 1); + return scopes; + } + + class PathWatcher implements WALActionsListener { + + Path currentPath; + + @Override + public void preLogRoll(Path oldPath, Path newPath) { + logQueue.enqueueLog(newPath, fakeWalGroupId); + currentPath = newPath; + } + } + + protected WAL log; + protected ReplicationSourceLogQueue logQueue; + protected PathWatcher pathWatcher; + + @Rule + public TestName tn = new TestName(); + protected final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); + + protected static void startCluster() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + CONF = TEST_UTIL.getConfiguration(); + CONF.setLong("replication.source.sleepforretries", 10); + TEST_UTIL.startMiniDFSCluster(3); + + cluster = TEST_UTIL.getDFSCluster(); + fs = cluster.getFileSystem(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + protected void initWAL() throws IOException { + ReplicationSource source = mock(ReplicationSource.class); + MetricsSource metricsSource = new MetricsSource("2"); + // Source with the same id is shared and carries values from the last run + metricsSource.clear(); + logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source); + pathWatcher = new PathWatcher(); + final WALFactory wals = + new WALFactory(CONF, TableNameTestRule.cleanUpTestName(tn.getMethodName())); + wals.getWALProvider().addWALActionsListener(pathWatcher); + log = wals.getWAL(info); + } + + @After + public void tearDown() throws Exception { + Closeables.close(log, true); + } + + protected void appendToLogAndSync() throws IOException { + appendToLogAndSync(1); + } + + protected void appendToLogAndSync(int count) throws IOException { + long txid = appendToLog(count); + log.sync(txid); + } + + protected long appendToLog(int count) throws IOException { + return log.appendData(info, new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, + EnvironmentEdgeManager.currentTime(), mvcc, scopes), getWALEdits(count)); + } + + protected WALEdit getWALEdits(int count) { + WALEdit edit = new WALEdit(); + for (int i = 0; i < count; i++) { + edit.add(new KeyValue(Bytes.toBytes(EnvironmentEdgeManager.currentTime()), family, qualifier, + EnvironmentEdgeManager.currentTime(), qualifier)); + } + return edit; + } +}