HBASE-20417 Do not read wal entries when peer is disabled

Author: zhangduo
Date: 2018-04-14 22:00:15 +08:00
Parent: 1cb05a18bc
Commit: f73986d110
3 changed files with 68 additions and 10 deletions
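The change gates both the shipper loop and the WAL reader loop on source.isPeerEnabled(), so WAL entries are neither read nor queued for a peer that cannot ship them, and the wait is a fixed short sleep because the enabled check is just an in-memory flag. A minimal, self-contained sketch of that pattern follows; the names in it (PeerGatedLoop, readAndShipOneBatch) are made up for illustration and are not HBase APIs.

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch only: mirrors the shape of the shipper/reader loops after this commit.
public class PeerGatedLoop implements Runnable {
  private final AtomicBoolean peerEnabled = new AtomicBoolean(false);
  private final long sleepForRetries = 1000L; // fixed interval, no multiplier

  @Override
  public void run() {
    while (!Thread.currentThread().isInterrupted()) {
      // The enabled check is a cheap in-memory read, so a fixed short sleep is enough;
      // an increasing backoff would only delay pickup once the peer is re-enabled.
      if (!peerEnabled.get()) {
        try {
          Thread.sleep(sleepForRetries);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        continue;
      }
      readAndShipOneBatch();
    }
  }

  // Placeholder for reading a batch of WAL entries and shipping it to the peer.
  private void readAndShipOneBatch() {
  }

  public void setPeerEnabled(boolean enabled) {
    peerEnabled.set(enabled);
  }
}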

ReplicationSourceShipper.java

@@ -87,12 +87,11 @@ public class ReplicationSourceShipper extends Thread {
     setWorkerState(WorkerState.RUNNING);
     // Loop until we close down
     while (isActive()) {
-      int sleepMultiplier = 1;
       // Sleep until replication is enabled again
       if (!source.isPeerEnabled()) {
-        if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
-          sleepMultiplier++;
-        }
+        // The peer enabled check is in memory, not expensive, so do not need to increase the
+        // sleep interval as it may cause a long lag when we enable the peer.
+        sleepForRetries("Replication is disabled", 1);
         continue;
       }
       try {
@@ -188,8 +187,8 @@ public class ReplicationSourceShipper extends Thread {
         }
         break;
       } catch (Exception ex) {
-        LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:"
-          + org.apache.hadoop.util.StringUtils.stringifyException(ex));
+        LOG.warn("{} threw unknown exception:",
+          source.getReplicationEndpoint().getClass().getName(), ex);
         if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -292,9 +291,7 @@ public class ReplicationSourceShipper extends Thread {
    */
   public boolean sleepForRetries(String msg, int sleepMultiplier) {
     try {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
-      }
+      LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
       Thread.sleep(this.sleepForRetries * sleepMultiplier);
     } catch (InterruptedException e) {
       LOG.debug("Interrupted while sleeping between retries");
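The two logging hunks above also switch from string concatenation (plus an explicit isTraceEnabled() guard) to parameterized messages. With an SLF4J-style logger, which the {} placeholders suggest, the message is only formatted when the level is enabled, and a Throwable passed as the final argument is logged with its stack trace, so StringUtils.stringifyException is unnecessary. A small illustration, assuming org.slf4j is on the classpath:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

  void logExamples(String msg, long sleepForRetries, int sleepMultiplier, Exception ex) {
    // No isTraceEnabled() guard needed: the message string is only built if TRACE is enabled.
    LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);

    // A Throwable given as the last argument (beyond the placeholders) is printed
    // with its stack trace, replacing StringUtils.stringifyException(ex).
    LOG.warn("{} threw unknown exception:", ex.getClass().getName(), ex);
  }
}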

ReplicationSourceWALReader.java

@@ -126,6 +126,10 @@ class ReplicationSourceWALReader extends Thread {
           source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
           source.getSourceMetrics())) {
         while (isReaderRunning()) { // loop here to keep reusing stream while we can
+          if (!source.isPeerEnabled()) {
+            Threads.sleep(sleepForRetries);
+            continue;
+          }
           if (!checkQuota()) {
             continue;
           }

TestWALEntryStream.java

@@ -30,7 +30,12 @@ import java.io.IOException;
 import java.util.NavigableMap;
 import java.util.OptionalLong;
 import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -328,7 +333,7 @@ public class TestWALEntryStream {
     }
   }
 
-  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+  private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
     ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
     when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
     Server mockServer = Mockito.mock(Server.class);
@@ -338,6 +343,12 @@
     when(source.getWALFileLengthProvider()).thenReturn(log);
     when(source.getServer()).thenReturn(mockServer);
     when(source.isRecovered()).thenReturn(recovered);
+    return source;
+  }
+
+  private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
+    ReplicationSource source = mockReplicationSource(recovered, conf);
+    when(source.isPeerEnabled()).thenReturn(true);
     ReplicationSourceWALReader reader =
         new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
     reader.start();
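The refactor above splits mock construction out of createReader(): mockReplicationSource() builds the mock without stubbing isPeerEnabled(), createReader() keeps the old behavior by stubbing it to true, and the new test in the next hunk stubs it with an answer backed by an AtomicBoolean instead. A generic Mockito sketch of that pattern, with made-up Source/buildSource names standing in for the test helpers:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.concurrent.atomic.AtomicBoolean;

public class MockFactorySketch {
  // Hypothetical collaborator with the single method we care about.
  interface Source {
    boolean isPeerEnabled();
  }

  // Shared factory: most tests want a source whose peer is always enabled.
  static Source buildSource() {
    Source source = mock(Source.class);
    when(source.isPeerEnabled()).thenReturn(true);
    return source;
  }

  // A test that needs different behavior builds its own mock and stubs the call
  // with an answer that reads a mutable flag at invocation time.
  static Source buildToggleableSource(AtomicBoolean enabled) {
    Source source = mock(Source.class);
    when(source.isPeerEnabled()).then(invocation -> enabled.get());
    return source;
  }
}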
@@ -460,6 +471,52 @@
     assertFalse(entryBatch.isEndOfFile());
   }
 
+  @Test
+  public void testReplicationSourceWALReaderDisabled()
+      throws IOException, InterruptedException, ExecutionException {
+    appendEntriesToLogAndSync(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+        new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    // start up a reader
+    Path walPath = walQueue.peek();
+    ReplicationSource source = mockReplicationSource(false, CONF);
+    AtomicInteger invokeCount = new AtomicInteger(0);
+    AtomicBoolean enabled = new AtomicBoolean(false);
+    when(source.isPeerEnabled()).then(i -> {
+      invokeCount.incrementAndGet();
+      return enabled.get();
+    });
+
+    ReplicationSourceWALReader reader =
+        new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
+    reader.start();
+    Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
+      return reader.take();
+    });
+    // make sure that the isPeerEnabled has been called several times
+    TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
+    // confirm that we can read nothing if the peer is disabled
+    assertFalse(future.isDone());
+    // then enable the peer, we should get the batch
+    enabled.set(true);
+    WALEntryBatch entryBatch = future.get();
+
+    // should've batched up our entries
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+  }
+
   private String getRow(WAL.Entry entry) {
     Cell cell = entry.getEdit().getCells().get(0);
     return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
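The new test's coordination is worth spelling out: the blocking reader.take() runs in a ForkJoinPool task, the test waits until isPeerEnabled() has been polled several times while returning false, asserts the future is still pending, then flips the flag and expects the batch. The same shape in plain Java, with no HBase or Mockito involved (all names here are made up for illustration):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DisabledFlagCoordinationSketch {
  public static void main(String[] args) throws Exception {
    AtomicBoolean enabled = new AtomicBoolean(false);
    AtomicInteger checks = new AtomicInteger();

    // The "reader": keeps checking the flag and only produces a result once enabled.
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
      while (!enabled.get()) {
        checks.incrementAndGet();
        try {
          Thread.sleep(10);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return null;
        }
      }
      return "batch";
    });

    // The "test": wait until the flag has been checked a few times while disabled...
    while (checks.get() < 5) {
      Thread.sleep(10);
    }
    if (future.isDone()) {
      throw new AssertionError("should not complete while the flag is disabled");
    }

    // ...then enable it and expect the result to arrive.
    enabled.set(true);
    System.out.println(future.get()); // prints "batch"
  }
}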