diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index 3631cad2962..6f2856663a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -122,65 +122,51 @@ class ReplicationSourceWALReader extends Thread { @Override public void run() { int sleepMultiplier = 1; - WALEntryBatch batch = null; - WALEntryStream entryStream = null; - try { - // we only loop back here if something fatal happened to our stream - while (isReaderRunning()) { - try { - entryStream = - new WALEntryStream(logQueue, conf, currentPosition, source.getWALFileLengthProvider(), - source.getServerWALsBelongTo(), source.getSourceMetrics(), walGroupId); - while (isReaderRunning()) { // loop here to keep reusing stream while we can - if (!source.isPeerEnabled()) { - Threads.sleep(sleepForRetries); - continue; - } - if (!checkQuota()) { - continue; - } - - batch = createBatch(entryStream); - batch = readWALEntries(entryStream, batch); + while (isReaderRunning()) { // we only loop back here if something fatal happened to our stream + WALEntryBatch batch = null; + try (WALEntryStream entryStream = + new WALEntryStream(logQueue, conf, currentPosition, + source.getWALFileLengthProvider(), source.getServerWALsBelongTo(), + source.getSourceMetrics(), walGroupId)) { + while (isReaderRunning()) { // loop here to keep reusing stream while we can + batch = null; + if (!source.isPeerEnabled()) { + Threads.sleep(sleepForRetries); + continue; + } + if (!checkQuota()) { + continue; + } + batch = tryAdvanceStreamAndCreateWALBatch(entryStream); + if (batch == null) { + // got no entries and didn't advance position in WAL + handleEmptyWALEntryBatch(); + entryStream.reset(); // reuse stream + continue; + } + // if we have already switched a file, skip reading and put it directly to the ship queue + if (!batch.isEndOfFile()) { + readWALEntries(entryStream, batch); currentPosition = entryStream.getPosition(); - if (batch == null) { - // either the queue have no WAL to read - // or got no new entries (didn't advance position in WAL) - handleEmptyWALEntryBatch(); - entryStream.reset(); // reuse stream - } else { - addBatchToShippingQueue(batch); - } } - } catch (WALEntryFilterRetryableException | IOException e) { // stream related - if (handleEofException(e, batch)) { - sleepMultiplier = 1; - } else { - LOG.warn("Failed to read stream of replication entries " - + "or replication filter is recovering", e); - if (sleepMultiplier < maxRetriesMultiplier) { - sleepMultiplier++; - } - Threads.sleep(sleepForRetries * sleepMultiplier); - } - } catch (InterruptedException e) { - LOG.trace("Interrupted while sleeping between WAL reads"); - Thread.currentThread().interrupt(); - } finally { - entryStream.close(); + // need to propagate the batch even it has no entries since it may carry the last + // sequence id information for serial replication. + LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); + entryBatchQueue.put(batch); + sleepMultiplier = 1; } + } catch (IOException e) { // stream related + if (!handleEofException(e, batch)) { + LOG.warn("Failed to read stream of replication entries", e); + if (sleepMultiplier < maxRetriesMultiplier) { + sleepMultiplier ++; + } + Threads.sleep(sleepForRetries * sleepMultiplier); + } + } catch (InterruptedException e) { + LOG.trace("Interrupted while sleeping between WAL reads or adding WAL batch to ship queue"); + Thread.currentThread().interrupt(); } - } catch (IOException e) { - if (sleepMultiplier < maxRetriesMultiplier) { - LOG.debug("Failed to read stream of replication entries: ", e); - sleepMultiplier++; - } else { - LOG.error("Failed to read stream of replication entries", e); - } - Threads.sleep(sleepForRetries * sleepMultiplier); - } catch (InterruptedException e) { - LOG.trace("Interrupted while sleeping between WAL reads"); - Thread.currentThread().interrupt(); } } @@ -211,29 +197,10 @@ class ReplicationSourceWALReader extends Thread { // We need to get the WALEntryBatch from the caller so we can add entries in there // This is required in case there is any exception in while reading entries - // we do want to loss the existing entries in the batch - protected WALEntryBatch readWALEntries(WALEntryStream entryStream, - WALEntryBatch batch) throws IOException, InterruptedException { + // we do not want to loss the existing entries in the batch + protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) + throws IOException, InterruptedException { Path currentPath = entryStream.getCurrentPath(); - if (!entryStream.hasNext()) { - // check whether we have switched a file - if (currentPath != null && switched(entryStream, currentPath)) { - return WALEntryBatch.endOfFile(currentPath); - } else { - // This would mean either no more files in the queue - // or there is no new data yet on the current wal - return null; - } - } - if (currentPath != null) { - if (switched(entryStream, currentPath)) { - return WALEntryBatch.endOfFile(currentPath); - } - } else { - // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); - } - batch.setLastWalPath(currentPath); for (;;) { Entry entry = entryStream.next(); batch.setLastWalPosition(entryStream.getPosition()); @@ -253,7 +220,6 @@ class ReplicationSourceWALReader extends Thread { break; } } - return batch; } private void handleEmptyWALEntryBatch() throws InterruptedException { @@ -270,6 +236,25 @@ class ReplicationSourceWALReader extends Thread { } } + private WALEntryBatch tryAdvanceStreamAndCreateWALBatch(WALEntryStream entryStream) + throws IOException { + Path currentPath = entryStream.getCurrentPath(); + if (!entryStream.hasNext()) { + // check whether we have switched a file + if (currentPath != null && switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } else { + return null; + } + } + if (currentPath != null) { + if (switched(entryStream, currentPath)) { + return WALEntryBatch.endOfFile(currentPath); + } + } + return createBatch(entryStream); + } + /** * This is to handle the EOFException from the WAL entry stream. EOFException should * be handled carefully because there are chances of data loss because of never replicating @@ -277,19 +262,18 @@ class ReplicationSourceWALReader extends Thread { * If there was only one log in the queue before EOF, we ship the empty batch here * and since reader is still active, in the next iteration of reader we will * stop the reader. + *

* If there was more than one log in the queue before EOF, we ship the existing batch * and reset the wal patch and position to the log with EOF, so shipper can remove * logs from replication queue * @return true only the IOE can be handled */ - private boolean handleEofException(Exception e, WALEntryBatch batch) - throws InterruptedException { + private boolean handleEofException(Exception e, WALEntryBatch batch) { PriorityBlockingQueue queue = logQueue.getQueue(walGroupId); // Dump the log even if logQueue size is 1 if the source is from recovered Source // since we don't add current log to recovered source queue so it is safe to remove. - if ((e instanceof EOFException || e.getCause() instanceof EOFException) - && (source.isRecovered() || queue.size() > 1) - && this.eofAutoRecovery) { + if ((e instanceof EOFException || e.getCause() instanceof EOFException) && + (source.isRecovered() || queue.size() > 1) && this.eofAutoRecovery) { Path head = queue.peek(); try { if (fs.getFileStatus(head).getLen() == 0) { @@ -297,16 +281,18 @@ class ReplicationSourceWALReader extends Thread { LOG.warn("Forcing removal of 0 length log in queue: {}", head); logQueue.remove(walGroupId); currentPosition = 0; - // After we removed the WAL from the queue, we should - // try shipping the existing batch of entries and set the wal position - // and path to the wal just dequeued to correctly remove logs from the zk - batch.setLastWalPath(head); - batch.setLastWalPosition(currentPosition); - addBatchToShippingQueue(batch); + if (batch != null) { + // After we removed the WAL from the queue, we should try shipping the existing batch of + // entries + addBatchToShippingQueue(batch); + } return true; } } catch (IOException ioe) { LOG.warn("Couldn't get file length information about log " + queue.peek(), ioe); + } catch (InterruptedException ie) { + LOG.trace("Interrupted while adding WAL batch to ship queue"); + Thread.currentThread().interrupt(); } } return false; @@ -316,10 +302,8 @@ class ReplicationSourceWALReader extends Thread { * Update the batch try to ship and return true if shipped * @param batch Batch of entries to ship * @throws InterruptedException throws interrupted exception - * @throws IOException throws io exception from stream */ - private void addBatchToShippingQueue(WALEntryBatch batch) - throws InterruptedException, IOException { + private void addBatchToShippingQueue(WALEntryBatch batch) throws InterruptedException { // need to propagate the batch even it has no entries since it may carry the last // sequence id information for serial replication. LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries()); @@ -348,7 +332,7 @@ class ReplicationSourceWALReader extends Thread { return true; } - protected final WALEntryBatch createBatch(WALEntryStream entryStream) { + private WALEntryBatch createBatch(WALEntryStream entryStream) { return new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java index 254dc4afe22..1de4c998546 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationSourceWALReader.java @@ -50,27 +50,10 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader } @Override - protected WALEntryBatch readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) - throws IOException, InterruptedException { + protected void readWALEntries(WALEntryStream entryStream, WALEntryBatch batch) + throws IOException, InterruptedException { Path currentPath = entryStream.getCurrentPath(); - if (!entryStream.hasNext()) { - // check whether we have switched a file - if (currentPath != null && switched(entryStream, currentPath)) { - return WALEntryBatch.endOfFile(currentPath); - } else { - return null; - } - } - if (currentPath != null) { - if (switched(entryStream, currentPath)) { - return WALEntryBatch.endOfFile(currentPath); - } - } else { - // when reading from the entry stream first time we will enter here - currentPath = entryStream.getCurrentPath(); - } long positionBefore = entryStream.getPosition(); - batch = createBatch(entryStream); for (;;) { Entry entry = entryStream.peek(); boolean doFiltering = true; @@ -122,7 +105,6 @@ public class SerialReplicationSourceWALReader extends ReplicationSourceWALReader break; } } - return batch; } private void removeEntryFromStream(WALEntryStream entryStream, WALEntryBatch batch) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java index 511161ce538..da7f98824cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java @@ -47,14 +47,14 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; -@Category - ({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery - extends TestReplicationBase { +@Category({ ReplicationTests.class, LargeTests.class }) +public class TestReplicationEmptyWALRecovery extends TestReplicationBase { MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); NavigableMap scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); - @ClassRule public static final HBaseClassTestRule CLASS_RULE = + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class); @Before @@ -151,10 +151,9 @@ import org.junit.experimental.categories.Category; } /** - * Test empty WAL along with non empty WALs in the same batch. This test is to make sure - * when we see the empty and handle the EOF exception, we are able to existing the previous - * batch of entries without loosing it. This test also tests the number of batches shipped - * + * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we + * see the empty and handle the EOF exception, we are able to ship the previous batch of entries + * without loosing it. This test also tests the number of batches shipped * @throws Exception throws any exception */ @Test @@ -174,7 +173,6 @@ import org.junit.experimental.categories.Category; Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); appendEntriesToWal(numOfEntriesToReplicate, wal); - wal.rollWriter(); String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts); UTIL1.getTestFileSystem().create(emptyWalPath).close(); @@ -183,10 +181,10 @@ import org.junit.experimental.categories.Category; injectEmptyWAL(numRs, emptyWalPaths); // There should be three WALs in queue - // 1. empty WAL - // 2. non empty WAL + // 1. non empty WAL + // 2. empty WAL // 3. live WAL - //verifyNumberOfLogsInQueue(3, numRs); + verifyNumberOfLogsInQueue(3, numRs); hbaseAdmin.enableReplicationPeer(PEER_ID2); // ReplicationSource should advance past the empty wal, or else the test will fail waitForLogAdvance(numRs); @@ -209,10 +207,9 @@ import org.junit.experimental.categories.Category; } /** - * Test empty WAL along with non empty WALs in the same batch. This test is to make sure - * when we see the empty WAL and handle the EOF exception, we are able to proceed - * with next batch and replicate it properly without missing data. - * + * Test empty WAL along with non empty WALs in the same batch. This test is to make sure when we + * see the empty WAL and handle the EOF exception, we are able to proceed with next batch and + * replicate it properly without missing data. * @throws Exception throws any exception */ @Test @@ -265,9 +262,8 @@ import org.junit.experimental.categories.Category; } /** - * This test make sure we replicate all the enties from the non empty WALs which - * are surrounding the empty WALs - * + * This test make sure we replicate all the enties from the non empty WALs which are surrounding + * the empty WALs * @throws Exception throws exception */ @Test