HBASE-25992 Polish the ReplicationSourceWALReader code for 2.x after HBASE-25596 (#3376)

Signed-off-by: Yulin Niu <niuyulin@apache.org>
This commit is contained in:
Duo Zhang 2021-06-20 16:32:42 +08:00 committed by GitHub
parent f0a39590d0
commit d2923755ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 94 additions and 132 deletions

View File

@ -122,65 +122,51 @@ class ReplicationSourceWALReader extends Thread {
@Override @Override
public void run() { public void run() {
int sleepMultiplier = 1; int sleepMultiplier = 1;
WALEntryBatch batch = null; while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream
WALEntryStream entryStream = null; WALEntryBatch batch = null;
try { try (WALEntryStream entryStream =
// we only loop back here if something fatal happened to our stream new WALEntryStream(logQueue, conf, currentPosition,
while (isReaderRunning()) { source.getWALFileLengthProvider(), source.getServerWALsBelongTo(),
try { source.getSourceMetrics(), walGroupId)) {
entryStream = while (isReaderRunning()) { // loop here to keep reusing stream while we can
new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(), batch = null;
source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId); if (!source.isPeerEnabled()) {
while (isReaderRunning()) { // loop here to keep reusing stream while we can Threads.sleep(sleepForRetries);
if (!source.isPeerEnabled()) { continue;
Threads.sleep(sleepForRetries); }
continue; if (!checkQuota()) {
} continue;
if (!checkQuota()) { }
continue; batch = tryAdvanceStreamAndCreateWALBatch(entryStream);
} if (batch == null) {
// got no entries and didn't advance position in WAL
batch = createBatch(entryStream); handleEmptyWALEntryBatch();
batch = readWALEntries(entryStream, batch); entryStream.reset(); // reuse stream
continue;
}
// if we have already switched a file, skip reading and put it directly to the ship queue
if (!batch.isEndOfFile()) {
readWALEntries(entryStream, batch);
currentPosition = entryStream.getPosition(); currentPosition = entryStream.getPosition();
if (batch == null) {
// either the queue have no WAL to read
// or got no new entries (didn't advance position in WAL)
handleEmptyWALEntryBatch();
entryStream.reset(); // reuse stream
} else {
addBatchToShippingQueue(batch);
}
} }
} catch (WALEntryFilterRetryableException | IOException e) { // stream related // need to propagate the batch even it has no entries since it may carry the last
if (handleEofException(e, batch)) { // sequence id information for serial replication.
sleepMultiplier = 1; LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
} else { entryBatchQueue.put(batch);
LOG.warn("Failed to read stream of replication entries " sleepMultiplier = 1;
+ "or replication filter is recovering", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
} finally {
entryStream.close();
} }
} catch (IOException e) { // stream related
if (!handleEofException(e, batch)) {
LOG.warn("Failed to read stream of replication entries", e);
if (sleepMultiplier < maxRetriesMultiplier) {
sleepMultiplier ++;
}
Threads.sleep(sleepForRetries * sleepMultiplier);
}
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue");
Thread.currentThread().interrupt();
} }
} catch (IOException e) {
if (sleepMultiplier < maxRetriesMultiplier) {
LOG.debug("Failed to read stream of replication entries: ", e);
sleepMultiplier++;
} else {
LOG.error("Failed to read stream of replication entries", e);
}
Threads.sleep(sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) {
LOG.trace("Interrupted while sleeping between WAL reads");
Thread.currentThread().interrupt();
} }
} }
@ -211,29 +197,10 @@ class ReplicationSourceWALReader extends Thread {
// We need to get the WALEntryBatch from the caller so we can add entries in there // We need to get the WALEntryBatch from the caller so we can add entries in there
// This is required in case there is any exception in while reading entries // This is required in case there is any exception in while reading entries
// we do want to loss the existing entries in the batch // we do not want to loss the existing entries in the batch
protected WALEntryBatch readWALEntries(WALEntryStream entryStream, protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
WALEntryBatch batch) throws IOException, InterruptedException { throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath(); Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
// This would mean either no more files in the queue
// or there is no new data yet on the current wal
return null;
}
}
if (currentPath != null) {
if (switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
}
} else {
// when reading from the entry stream first time we will enter here
currentPath = entryStream.getCurrentPath();
}
batch.setLastWalPath(currentPath);
for (;;) { for (;;) {
Entry entry = entryStream.next(); Entry entry = entryStream.next();
batch.setLastWalPosition(entryStream.getPosition()); batch.setLastWalPosition(entryStream.getPosition());
@ -253,7 +220,6 @@ class ReplicationSourceWALReader extends Thread {
break; break;
} }
} }
return batch;
} }
private void handleEmptyWALEntryBatch() throws InterruptedException { private void handleEmptyWALEntryBatch() throws InterruptedException {
@ -270,6 +236,25 @@ class ReplicationSourceWALReader extends Thread {
} }
} }
private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream)
throws IOException {
Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
return null;
}
}
if (currentPath != null) {
if (switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
}
}
return createBatch(entryStream);
}
/** /**
* This is to handle the EOFException from the WAL entry stream. EOFException should * This is to handle the EOFException from the WAL entry stream. EOFException should
* be handled carefully because there are chances of data loss because of never replicating * be handled carefully because there are chances of data loss because of never replicating
@ -277,19 +262,18 @@ class ReplicationSourceWALReader extends Thread {
* If there was only one log in the queue before EOF, we ship the empty batch here * If there was only one log in the queue before EOF, we ship the empty batch here
* and since reader is still active, in the next iteration of reader we will * and since reader is still active, in the next iteration of reader we will
* stop the reader. * stop the reader.
* <p/>
* If there was more than one log in the queue before EOF, we ship the existing batch * If there was more than one log in the queue before EOF, we ship the existing batch
* and reset the wal patch and position to the log with EOF, so shipper can remove * and reset the wal patch and position to the log with EOF, so shipper can remove
* logs from replication queue * logs from replication queue
* @return true only the IOE can be handled * @return true only the IOE can be handled
*/ */
private boolean handleEofException(Exception e, WALEntryBatch batch) private boolean handleEofException(Exception e, WALEntryBatch batch) {
throws InterruptedException {
PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId); PriorityBlockingQueue<Path> queue = logQueue.getQueue(walGroupId);
// Dump the log even if logQueue size is 1 if the source is from recovered Source // Dump the log even if logQueue size is 1 if the source is from recovered Source
// since we don't add current log to recovered source queue so it is safe to remove. // since we don't add current log to recovered source queue so it is safe to remove.
if ((e instanceof EOFException || e.getCause() instanceof EOFException) if ((e instanceof EOFException || e.getCause() instanceof EOFException) &&
&& (source.isRecovered() || queue.size() > 1) (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) {
&& this.eofAutoRecovery) {
Path head = queue.peek(); Path head = queue.peek();
try { try {
if (fs.getFileStatus(head).getLen() == 0) { if (fs.getFileStatus(head).getLen() == 0) {
@ -297,16 +281,18 @@ class ReplicationSourceWALReader extends Thread {
LOG.warn("Forcing removal of 0 length log in queue: {}", head); LOG.warn("Forcing removal of 0 length log in queue: {}", head);
logQueue.remove(walGroupId); logQueue.remove(walGroupId);
currentPosition = 0; currentPosition = 0;
// After we removed the WAL from the queue, we should if (batch != null) {
// try shipping the existing batch of entries and set the wal position // After we removed the WAL from the queue, we should try shipping the existing batch of
// and path to the wal just dequeued to correctly remove logs from the zk // entries
batch.setLastWalPath(head); addBatchToShippingQueue(batch);
batch.setLastWalPosition(currentPosition); }
addBatchToShippingQueue(batch);
return true; return true;
} }
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe); LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe);
} catch (InterruptedException ie) {
LOG.trace("Interrupted while adding WAL batch to ship queue");
Thread.currentThread().interrupt();
} }
} }
return false; return false;
@ -316,10 +302,8 @@ class ReplicationSourceWALReader extends Thread {
* Update the batch try to ship and return true if shipped * Update the batch try to ship and return true if shipped
* @param batch Batch of entries to ship * @param batch Batch of entries to ship
* @throws InterruptedException throws interrupted exception * @throws InterruptedException throws interrupted exception
* @throws IOException throws io exception from stream
*/ */
private void addBatchToShippingQueue(WALEntryBatch batch) private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException {
throws InterruptedException, IOException {
// need to propagate the batch even it has no entries since it may carry the last // need to propagate the batch even it has no entries since it may carry the last
// sequence id information for serial replication. // sequence id information for serial replication.
LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
@ -348,7 +332,7 @@ class ReplicationSourceWALReader extends Thread {
return true; return true;
} }
protected final WALEntryBatch createBatch(WALEntryStream entryStream) { private WALEntryBatch createBatch(WALEntryStream entryStream) {
return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
} }

View File

@ -50,27 +50,10 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
} }
@Override @Override
protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch)
throws IOException, InterruptedException { throws IOException, InterruptedException {
Path currentPath = entryStream.getCurrentPath(); Path currentPath = entryStream.getCurrentPath();
if (!entryStream.hasNext()) {
// check whether we have switched a file
if (currentPath != null && switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
} else {
return null;
}
}
if (currentPath != null) {
if (switched(entryStream, currentPath)) {
return WALEntryBatch.endOfFile(currentPath);
}
} else {
// when reading from the entry stream first time we will enter here
currentPath = entryStream.getCurrentPath();
}
long positionBefore = entryStream.getPosition(); long positionBefore = entryStream.getPosition();
batch = createBatch(entryStream);
for (;;) { for (;;) {
Entry entry = entryStream.peek(); Entry entry = entryStream.peek();
boolean doFiltering = true; boolean doFiltering = true;
@ -122,7 +105,6 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader
break; break;
} }
} }
return batch;
} }
private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch)

View File

@ -47,14 +47,14 @@ import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@Category @Category({ ReplicationTests.class, LargeTests.class })
({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
extends TestReplicationBase {
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@ClassRule public static final HBaseClassTestRule CLASS_RULE = @ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class); HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
@Before @Before
@ -151,10 +151,9 @@ import org.junit.experimental.categories.Category;
} }
/** /**
* Test empty WAL along with non empty WALs in the same batch. This test is to make sure * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
* when we see the empty and handle the EOF exception, we are able to existing the previous * see the empty and handle the EOF exception, we are able to ship the previous batch of entries
* batch of entries without loosing it. This test also tests the number of batches shipped * without loosing it. This test also tests the number of batches shipped
*
* @throws Exception throws any exception * @throws Exception throws any exception
*/ */
@Test @Test
@ -174,7 +173,6 @@ import org.junit.experimental.categories.Category;
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
appendEntriesToWal(numOfEntriesToReplicate, wal); appendEntriesToWal(numOfEntriesToReplicate, wal);
wal.rollWriter();
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts); Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close(); UTIL1.getTestFileSystem().create(emptyWalPath).close();
@ -183,10 +181,10 @@ import org.junit.experimental.categories.Category;
injectEmptyWAL(numRs, emptyWalPaths); injectEmptyWAL(numRs, emptyWalPaths);
// There should be three WALs in queue // There should be three WALs in queue
// 1. empty WAL // 1. non empty WAL
// 2. non empty WAL // 2. empty WAL
// 3. live WAL // 3. live WAL
//verifyNumberOfLogsInQueue(3, numRs); verifyNumberOfLogsInQueue(3, numRs);
hbaseAdmin.enableReplicationPeer(PEER_ID2); hbaseAdmin.enableReplicationPeer(PEER_ID2);
// ReplicationSource should advance past the empty wal, or else the test will fail // ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs); waitForLogAdvance(numRs);
@ -209,10 +207,9 @@ import org.junit.experimental.categories.Category;
} }
/** /**
* Test empty WAL along with non empty WALs in the same batch. This test is to make sure * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we
* when we see the empty WAL and handle the EOF exception, we are able to proceed * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and
* with next batch and replicate it properly without missing data. * replicate it properly without missing data.
*
* @throws Exception throws any exception * @throws Exception throws any exception
*/ */
@Test @Test
@ -265,9 +262,8 @@ import org.junit.experimental.categories.Category;
} }
/** /**
* This test make sure we replicate all the enties from the non empty WALs which * This test make sure we replicate all the enties from the non empty WALs which are surrounding
* are surrounding the empty WALs * the empty WALs
*
* @throws Exception throws exception * @throws Exception throws exception
*/ */
@Test @Test