HBASE-18137 Replication gets stuck for empty WALs

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Vincent authored on 2017-06-09 18:47:14 -07:00; committed by Andrew Purtell
parent ea64dbef7f
commit 384e308e9f
4 changed files with 112 additions and 1 deletion
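
In brief: a zero-length WAL at the head of a replication source's queue (such as one left behind when a region server dies before the WAL header is written) causes the WAL reader to hit an EOFException on every read attempt, so the source never advances and replication gets stuck. This commit adds an opt-in escape hatch: when replication.source.eof.autorecovery is enabled and a newer log is already queued behind the empty one, the reader drops the zero-length log and moves on. The shipper thread now also reports the reader thread's notion of the current path, and a new test injects an empty WAL and verifies the source advances past it.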

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipperThread.java

@@ -303,7 +303,7 @@ public class ReplicationSourceShipperThread extends Thread {
   }
 
   public Path getCurrentPath() {
-    return this.currentPath;
+    return this.entryReader.getCurrentPath();
   }
 
   public long getCurrentPosition() {
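
Note: the shipper thread's own currentPath field is only updated as it ships batches, so it can lag behind the reader; delegating getCurrentPath() to the entry reader means the shipper reports the log the reader is actually positioned on, including an empty one it has just skipped. The new test below relies on this when it polls source.getCurrentPath().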

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReaderThread.java

@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -189,6 +190,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
           sleepMultiplier++;
         } else {
           LOG.error("Failed to read stream of replication entries", e);
+          handleEofException(e);
         }
         Threads.sleep(sleepForRetries * sleepMultiplier);
       } catch (InterruptedException e) {
@@ -198,6 +200,34 @@ public class ReplicationSourceWALReaderThread extends Thread {
     }
   }
 
+  // if we get an EOF due to a zero-length log, and there are other logs in queue
+  // (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
+  // enabled, then dump the log
+  private void handleEofException(Exception e) {
+    if (e.getCause() instanceof EOFException && logQueue.size() > 1
+        && conf.getBoolean("replication.source.eof.autorecovery", false)) {
+      try {
+        if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
+          LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
+          logQueue.remove();
+          currentPosition = 0;
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Couldn't get file length information about log " + logQueue.peek());
+      }
+    }
+  }
+
+  public Path getCurrentPath() {
+    // if we've read some WAL entries, get the Path we read from
+    WALEntryBatch batchQueueHead = entryBatchQueue.peek();
+    if (batchQueueHead != null) {
+      return batchQueueHead.lastWalPath;
+    }
+    // otherwise, we must be currently reading from the head of the log queue
+    return logQueue.peek();
+  }
+
   //returns false if we've already exceeded the global quota
   private boolean checkQuota() {
     // try not to go over total quota
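
The recovery path is gated behind replication.source.eof.autorecovery, which the code above defaults to false. A minimal sketch of opting in programmatically (in practice the property would normally be set in hbase-site.xml; the class and method names here are only for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EofAutorecoveryExample {
  public static void main(String[] args) {
    // Load hbase-default.xml / hbase-site.xml from the classpath.
    Configuration conf = HBaseConfiguration.create();
    // Opt in to dropping zero-length WALs from the head of the replication
    // queue. The flag is off by default, so existing deployments keep the
    // old fail-and-retry behavior unless they ask for this.
    conf.setBoolean("replication.source.eof.autorecovery", true);
  }
}

Note that the guard also requires logQueue.size() > 1, so the newest log, which may still be open for writing, is never dropped.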

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java

@@ -104,6 +104,7 @@ public class TestReplicationBase {
     conf1.setLong("replication.sleep.before.failover", 2000);
     conf1.setInt("replication.source.maxretriesmultiplier", 10);
     conf1.setFloat("replication.source.ratio", 1.0f);
+    conf1.setBoolean("replication.source.eof.autorecovery", true);
     utility1 = new HBaseTestingUtility(conf1);
     utility1.startMiniZKCluster();
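
Since the flag is off by default, the shared test configuration opts in here so that the recovery test added below can exercise the new code path.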

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.mapreduce.Job;
@@ -977,4 +981,80 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     assertEquals(NB_ROWS_IN_BATCH,
       job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
   }
+
+  @Test
+  public void testEmptyWALRecovery() throws Exception {
+    final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+
+    // for each RS, create an empty wal with same walGroupId
+    final List<Path> emptyWalPaths = new ArrayList<>();
+    long ts = System.currentTimeMillis();
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
+      String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
+      Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
+      utility1.getTestFileSystem().create(emptyWalPath).close();
+      emptyWalPaths.add(emptyWalPath);
+    }
+
+    // inject our empty wal into the replication queue
+    for (int i = 0; i < numRs; i++) {
+      Replication replicationService =
+          (Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
+      replicationService.preLogRoll(null, emptyWalPaths.get(i));
+      replicationService.postLogRoll(null, emptyWalPaths.get(i));
+    }
+
+    // wait for ReplicationSource to start reading from our empty wal
+    waitForLogAdvance(numRs, emptyWalPaths, false);
+
+    // roll the original wal, which enqueues a new wal behind our empty wal
+    for (int i = 0; i < numRs; i++) {
+      HRegionInfo regionInfo =
+          utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+      WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+      wal.rollWriter(true);
+    }
+
+    // ReplicationSource should advance past the empty wal, or else the test will fail
+    waitForLogAdvance(numRs, emptyWalPaths, true);
+
+    // we're now writing to the new wal
+    // if everything works, the source should've stopped reading from the empty wal, and start
+    // replicating from the new wal
+    testSimplePutDelete();
+  }
+
+  /**
+   * Waits for the ReplicationSource to start reading from the given paths
+   * @param numRs number of regionservers
+   * @param emptyWalPaths path for each regionserver
+   * @param invert if true, waits until ReplicationSource is NOT reading from the given paths
+   */
+  private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
+      final boolean invert) throws Exception {
+    Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        for (int i = 0; i < numRs; i++) {
+          Replication replicationService = (Replication) utility1.getHBaseCluster()
+              .getRegionServer(i).getReplicationSourceService();
+          for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
+              .getSources()) {
+            ReplicationSource source = (ReplicationSource) rsi;
+            if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+            if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
+              return false;
+            }
+          }
+        }
+        return true;
+      }
+    });
+  }
 }
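
The Waiter.waitFor predicate above only returns true once every source on every region server is (or, with invert, is no longer) positioned on its empty WAL, and it gives up after 10 seconds. To run just this test from the HBase source tree (assuming the usual Maven/Surefire setup):

  mvn test -pl hbase-server -Dtest=TestReplicationSmallTests#testEmptyWALRecovery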