HBASE-24807 Backport HBASE-20417 to branch-1 (#2197)

Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Wellington Ramos Chevreuil 2020-08-05 12:00:43 +01:00 committed by GitHub
parent 2c047eafc0
commit 2fd587384a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 7 deletions

View File

@ -858,7 +858,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
new ClusterMarkingEntryFilter(clusterId, peerClusterId, replicationEndpoint));
ChainWALEntryFilter readerFilter = new ChainWALEntryFilter(filters);
entryReader = new ReplicationSourceWALReaderThread(manager, replicationQueueInfo, queue,
startPosition, fs, conf, readerFilter, metrics);
startPosition, fs, conf, readerFilter, metrics, ReplicationSource.this);
Threads.setDaemonThreadRunning(entryReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + peerClusterZnode,
handler);

View File

@ -80,6 +80,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
private AtomicLong totalBufferUsed;
private long totalBufferQuota;
private ReplicationSource source;
/**
* Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
* entries, and puts them on a batch queue.
@ -94,8 +96,8 @@ public class ReplicationSourceWALReaderThread extends Thread {
*/
public ReplicationSourceWALReaderThread(ReplicationSourceManager manager,
ReplicationQueueInfo replicationQueueInfo, PriorityBlockingQueue<Path> logQueue,
long startPosition,
FileSystem fs, Configuration conf, WALEntryFilter filter, MetricsSource metrics) {
long startPosition, FileSystem fs, Configuration conf, WALEntryFilter filter,
MetricsSource metrics, ReplicationSource source) {
this.replicationQueueInfo = replicationQueueInfo;
this.logQueue = logQueue;
this.lastReadPath = logQueue.peek();
@ -118,6 +120,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.metrics = metrics;
this.entryBatchQueue = new LinkedBlockingQueue<>(batchCount);
this.source = source;
LOG.info("peerClusterZnode=" + replicationQueueInfo.getPeerClusterZnode()
+ ", ReplicationSourceWALReaderThread : " + replicationQueueInfo.getPeerId()
+ " inited, replicationBatchSizeCapacity=" + replicationBatchSizeCapacity
@ -132,6 +135,10 @@ public class ReplicationSourceWALReaderThread extends Thread {
try (WALEntryStream entryStream =
new WALEntryStream(logQueue, fs, conf, lastReadPosition, metrics)) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) {
continue;
}

View File

@ -27,6 +27,8 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
@ -38,7 +40,12 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
@ -81,7 +88,9 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
@RunWith(MockitoJUnitRunner.class)
@Category({ ReplicationTests.class, LargeTests.class })
@ -359,10 +368,12 @@ public class TestWALEntryStream {
// start up a batcher
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread batcher =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(),walQueue, 0,
fs, conf, getDummyFilter(), new MetricsSource("1"));
fs, conf, getDummyFilter(), new MetricsSource("1"), source);
Path walPath = walQueue.peek();
batcher.start();
WALEntryBatch entryBatch = batcher.take();
@ -398,10 +409,13 @@ public class TestWALEntryStream {
}
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getRecoveredQueueInfo(),
walQueue, 0, fs, conf, getDummyFilter(), new MetricsSource("1"));
walQueue, 0, fs, conf, getDummyFilter(),
new MetricsSource("1"), source);
Path walPath = walQueue.toArray(new Path[2])[1];
reader.start();
WALEntryBatch entryBatch = reader.take();
@ -456,10 +470,12 @@ public class TestWALEntryStream {
appendEntriesToLog(2);
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, filter, new MetricsSource("1"));
0, fs, conf, filter, new MetricsSource("1"), source);
reader.start();
WALEntryBatch entryBatch = reader.take();
@ -490,10 +506,12 @@ public class TestWALEntryStream {
final long eof = getPosition(firstWAL);
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.isPeerEnabled()).thenReturn(true);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, filter, new MetricsSource("1"));
0, fs, conf, filter, new MetricsSource("1"), source);
reader.start();
// reader won't put any batch, even if EOF reached.
@ -613,4 +631,65 @@ public class TestWALEntryStream {
currentPath = newPath;
}
}
@Test
public void testReplicationSourceWALReaderDisabled()
throws IOException, InterruptedException, ExecutionException {
for(int i=0; i<3; i++) {
//append and sync
appendToLog("key" + i);
}
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, conf, 0, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
position = entryStream.getPosition();
}
// start up a reader
Path walPath = walQueue.peek();
ReplicationSource source = Mockito.mock(ReplicationSource.class);
when(source.getSourceMetrics()).thenReturn(new MetricsSource("1"));
final AtomicBoolean enabled = new AtomicBoolean(false);
when(source.isPeerEnabled()).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
return enabled.get();
}
});
ReplicationSourceManager mockSourceManager = mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
final ReplicationSourceWALReaderThread reader =
new ReplicationSourceWALReaderThread(mockSourceManager, getQueueInfo(), walQueue,
0, fs, conf, getDummyFilter(), new MetricsSource("1"), source);
reader.start();
Future<WALEntryBatch> future =
Executors.newSingleThreadExecutor().submit(new Callable<WALEntryBatch>() {
@Override
public WALEntryBatch call() throws Exception {
return reader.take();
}
});
// make sure that the isPeerEnabled has been called several times
verify(source, timeout(30000).atLeast(5)).isPeerEnabled();
// confirm that we can read nothing if the peer is disabled
assertFalse(future.isDone());
// then enable the peer, we should get the batch
enabled.set(true);
WALEntryBatch entryBatch = future.get();
// should've batched up our entries
assertNotNull(entryBatch);
assertEquals(3, entryBatch.getWalEntries().size());
assertEquals(position, entryBatch.getLastWalPosition());
assertEquals(walPath, entryBatch.getLastWalPath());
assertEquals(3, entryBatch.getNbRowKeys());
}
}