HBASE-20417 Do not read wal entries when peer is disabled

This commit is contained in:
zhangduo 2018-04-14 22:00:15 +08:00
parent 1339ff9666
commit 773aff90fd
3 changed files with 68 additions and 10 deletions

View File

@ -87,12 +87,11 @@ public class ReplicationSourceShipper extends Thread {
setWorkerState(WorkerState.RUNNING); setWorkerState(WorkerState.RUNNING);
// Loop until we close down // Loop until we close down
while (isActive()) { while (isActive()) {
int sleepMultiplier = 1;
// Sleep until replication is enabled again // Sleep until replication is enabled again
if (!source.isPeerEnabled()) { if (!source.isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) { // The peer enabled check is in memory, not expensive, so do not need to increase the
sleepMultiplier++; // sleep interval as it may cause a long lag when we enable the peer.
} sleepForRetries("Replication is disabled", 1);
continue; continue;
} }
try { try {
@ -188,8 +187,8 @@ public class ReplicationSourceShipper extends Thread {
} }
break; break;
} catch (Exception ex) { } catch (Exception ex) {
LOG.warn(source.getReplicationEndpoint().getClass().getName() + " threw unknown exception:" LOG.warn("{} threw unknown exception:",
+ org.apache.hadoop.util.StringUtils.stringifyException(ex)); source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) { if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
sleepMultiplier++; sleepMultiplier++;
} }
@ -292,9 +291,7 @@ public class ReplicationSourceShipper extends Thread {
*/ */
public boolean sleepForRetries(String msg, int sleepMultiplier) { public boolean sleepForRetries(String msg, int sleepMultiplier) {
try { try {
if (LOG.isTraceEnabled()) { LOG.trace("{}, sleeping {} times {}", msg, sleepForRetries, sleepMultiplier);
LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
}
Thread.sleep(this.sleepForRetries * sleepMultiplier); Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Interrupted while sleeping between retries"); LOG.debug("Interrupted while sleeping between retries");

View File

@ -126,6 +126,10 @@ class ReplicationSourceWALReader extends Thread {
source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
source.getSourceMetrics())) { source.getSourceMetrics())) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can while (isReaderRunning()) { // loop here to keep reusing stream while we can
if (!source.isPeerEnabled()) {
Threads.sleep(sleepForRetries);
continue;
}
if (!checkQuota()) { if (!checkQuota()) {
continue; continue;
} }

View File

@ -30,7 +30,12 @@ import java.io.IOException;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.OptionalLong; import java.util.OptionalLong;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -328,7 +333,7 @@ public class TestWALEntryStream {
} }
} }
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) { private ReplicationSource mockReplicationSource(boolean recovered, Configuration conf) {
ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class); ReplicationSourceManager mockSourceManager = Mockito.mock(ReplicationSourceManager.class);
when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0)); when(mockSourceManager.getTotalBufferUsed()).thenReturn(new AtomicLong(0));
Server mockServer = Mockito.mock(Server.class); Server mockServer = Mockito.mock(Server.class);
@ -338,6 +343,12 @@ public class TestWALEntryStream {
when(source.getWALFileLengthProvider()).thenReturn(log); when(source.getWALFileLengthProvider()).thenReturn(log);
when(source.getServer()).thenReturn(mockServer); when(source.getServer()).thenReturn(mockServer);
when(source.isRecovered()).thenReturn(recovered); when(source.isRecovered()).thenReturn(recovered);
return source;
}
private ReplicationSourceWALReader createReader(boolean recovered, Configuration conf) {
ReplicationSource source = mockReplicationSource(recovered, conf);
when(source.isPeerEnabled()).thenReturn(true);
ReplicationSourceWALReader reader = ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source); new ReplicationSourceWALReader(fs, conf, walQueue, 0, getDummyFilter(), source);
reader.start(); reader.start();
@ -460,6 +471,52 @@ public class TestWALEntryStream {
assertFalse(entryBatch.isEndOfFile()); assertFalse(entryBatch.isEndOfFile());
} }
@Test
public void testReplicationSourceWALReaderDisabled()
throws IOException, InterruptedException, ExecutionException {
appendEntriesToLogAndSync(3);
// get ending position
long position;
try (WALEntryStream entryStream =
new WALEntryStream(walQueue, fs, CONF, 0, log, null, new MetricsSource("1"))) {
entryStream.next();
entryStream.next();
entryStream.next();
position = entryStream.getPosition();
}
// start up a reader
Path walPath = walQueue.peek();
ReplicationSource source = mockReplicationSource(false, CONF);
AtomicInteger invokeCount = new AtomicInteger(0);
AtomicBoolean enabled = new AtomicBoolean(false);
when(source.isPeerEnabled()).then(i -> {
invokeCount.incrementAndGet();
return enabled.get();
});
ReplicationSourceWALReader reader =
new ReplicationSourceWALReader(fs, CONF, walQueue, 0, getDummyFilter(), source);
reader.start();
Future<WALEntryBatch> future = ForkJoinPool.commonPool().submit(() -> {
return reader.take();
});
// make sure that the isPeerEnabled has been called several times
TEST_UTIL.waitFor(30000, () -> invokeCount.get() >= 5);
// confirm that we can read nothing if the peer is disabled
assertFalse(future.isDone());
// then enable the peer, we should get the batch
enabled.set(true);
WALEntryBatch entryBatch = future.get();
// should've batched up our entries
assertNotNull(entryBatch);
assertEquals(3, entryBatch.getWalEntries().size());
assertEquals(position, entryBatch.getLastWalPosition());
assertEquals(walPath, entryBatch.getLastWalPath());
assertEquals(3, entryBatch.getNbRowKeys());
}
private String getRow(WAL.Entry entry) { private String getRow(WAL.Entry entry) {
Cell cell = entry.getEdit().getCells().get(0); Cell cell = entry.getEdit().getCells().get(0);
return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); return Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());