diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6954ea2a4eb..8378b9bfe7e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -931,7 +931,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
   }
 
   public Path getCurrentPath() {
-    return this.currentPath;
+    return this.entryReader.getCurrentPath();
   }
 
   public long getCurrentPosition() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
index 6f1c64130b7..40828b725e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -188,6 +189,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
           sleepMultiplier++;
         } else {
           LOG.error("Failed to read stream of replication entries", e);
+          handleEofException(e);
         }
         Threads.sleep(sleepForRetries * sleepMultiplier);
       } catch (InterruptedException e) {
@@ -197,6 +199,34 @@ public class ReplicationSourceWALReaderThread extends Thread {
     }
   }
 
+  // if we get an EOF due to a zero-length log, and there are other logs in queue
+  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
+  // enabled, then dump the log
+  private void handleEofException(Exception e) {
+    if (e.getCause() instanceof EOFException && logQueue.size() > 1
+        && conf.getBoolean("replication.source.eof.autorecovery", false)) {
+      try {
+        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+          logQueue.remove();
+          currentPosition = 0;
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+      }
+    }
+  }
+
+  public Path getCurrentPath() {
+    // if we've read some WAL entries, get the Path we read from
+    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+    if (batchQueueHead != null) {
+      return batchQueueHead.lastWalPath;
+    }
+    // otherwise, we must be currently reading from the head of the log queue
+    return logQueue.peek();
+  }
+
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 498d26a92d5..d0f40a6bd5a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -102,6 +102,7 @@ public class TestReplicationBase {
     conf1.setLong("replication.sleep.before.failover", 2000);
     conf1.setInt("replication.source.maxretriesmultiplier", 10);
conf1.setFloat("replication.source.ratio", 1.0f); + conf1.setBoolean("replication.source.eof.autorecovery", true); utility1 = new HBaseTestingUtility(conf1); utility1.startMiniZKCluster(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java index d56834c94f8..f94ad5a1b46 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java @@ -31,6 +31,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -56,10 +58,14 @@ import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; import org.apache.hadoop.hbase.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.replication.regionserver.Replication; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.DefaultWALProvider; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.mapreduce.Job; import org.junit.Before; @@ -809,4 +815,80 @@ public class TestReplicationSmallTests extends TestReplicationBase { tableName.getNameAsString()}; runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0); } + + @Test + public void testEmptyWALRecovery() throws Exception { + final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size(); + + // for each RS, create an empty wal with same walGroupId + final List emptyWalPaths = new ArrayList<>(); + long ts = System.currentTimeMillis(); + for (int i = 0; i < numRs; i++) { + HRegionInfo regionInfo = + utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); + WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); + Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal); + String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); + Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." 
+      utility1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    // inject our empty wal into the replication queue
+    for (int i = 0; i < numRs; i++) {
+      Replication replicationService =
+          (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+      replicationService.preLogRoll(null, emptyWalPaths.get(i));
+      replicationService.postLogRoll(null, emptyWalPaths.get(i));
+    }
+
+    // wait for ReplicationSource to start reading from our empty wal
+    waitForLogAdvance(numRs, emptyWalPaths, false);
+
+    // roll the original wal, which enqueues a new wal behind our empty wal
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      wal.rollWriter(true);
+    }
+
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs, emptyWalPaths, true);
+
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    testSimplePutDelete();
+  }
+
+  /**
+   * Waits for the ReplicationSource to start reading from the given paths
+   * @param numRs number of regionservers
+   * @param emptyWalPaths path for each regionserver
+   * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+   */
+  private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+      final boolean invert) throws Exception {
+    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService = (Replication) utility1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+              .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+            if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
 }
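
For reference, a minimal sketch (not part of the patch) of how the new opt-in knob is turned on and of the condition handleEofException() guards before dropping a queued log. The property key, its false default, and the checks mirror the diff above; the class and method names below are hypothetical, for illustration only.

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EmptyWalAutorecoveryExample {

  // The feature is off by default; a cluster (or a test, as in TestReplicationBase) opts in.
  static Configuration enableAutorecovery() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("replication.source.eof.autorecovery", true);
    return conf;
  }

  // Same condition the reader thread evaluates once read retries are exhausted:
  // the failure must wrap an EOF, another WAL must be queued behind the current one,
  // the operator must have opted in, and the WAL at the head of the queue must be 0 bytes.
  static boolean shouldDropQueuedLog(Exception failure, int logQueueSize, Configuration conf,
      FileSystem fs, Path queuedLog) throws IOException {
    return failure.getCause() instanceof EOFException
        && logQueueSize > 1
        && conf.getBoolean("replication.source.eof.autorecovery", false)
        && fs.getFileStatus(queuedLog).getLen() == 0;
  }
}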