HBASE-18137 Replication gets stuck for empty WALs
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
6860ddca9f
commit
650ef5cf59
|
@ -931,7 +931,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
}
|
||||
|
||||
public Path getCurrentPath() {
|
||||
return this.currentPath;
|
||||
return this.entryReader.getCurrentPath();
|
||||
}
|
||||
|
||||
public long getCurrentPosition() {
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -188,6 +189,7 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
sleepMultiplier++;
|
||||
} else {
|
||||
LOG.error("Failed to read stream of replication entries", e);
|
||||
handleEofException(e);
|
||||
}
|
||||
Threads.sleep(sleepForRetries * sleepMultiplier);
|
||||
} catch (InterruptedException e) {
|
||||
|
@ -197,6 +199,34 @@ public class ReplicationSourceWALReaderThread extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
// if we get an EOF due to a zero-length log, and there are other logs in queue
|
||||
// (highly likely we've closed the current log), we've hit the max retries, and autorecovery is
|
||||
// enabled, then dump the log
|
||||
private void handleEofException(Exception e) {
|
||||
if (e.getCause() instanceof EOFException && logQueue.size() > 1
|
||||
&& conf.getBoolean("replication.source.eof.autorecovery", false)) {
|
||||
try {
|
||||
if (fs.getFileStatus(logQueue.peek()).getLen() == 0) {
|
||||
LOG.warn("Forcing removal of 0 length log in queue: " + logQueue.peek());
|
||||
logQueue.remove();
|
||||
currentPosition = 0;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Couldn't get file length information about log " + logQueue.peek());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public Path getCurrentPath() {
|
||||
// if we've read some WAL entries, get the Path we read from
|
||||
WALEntryBatch batchQueueHead = entryBatchQueue.peek();
|
||||
if (batchQueueHead != null) {
|
||||
return batchQueueHead.lastWalPath;
|
||||
}
|
||||
// otherwise, we must be currently reading from the head of the log queue
|
||||
return logQueue.peek();
|
||||
}
|
||||
|
||||
//returns false if we've already exceeded the global quota
|
||||
private boolean checkQuota() {
|
||||
// try not to go over total quota
|
||||
|
|
|
@ -102,6 +102,7 @@ public class TestReplicationBase {
|
|||
conf1.setLong("replication.sleep.before.failover", 2000);
|
||||
conf1.setInt("replication.source.maxretriesmultiplier", 10);
|
||||
conf1.setFloat("replication.source.ratio", 1.0f);
|
||||
conf1.setBoolean("replication.source.eof.autorecovery", true);
|
||||
|
||||
utility1 = new HBaseTestingUtility(conf1);
|
||||
utility1.startMiniZKCluster();
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.util.List;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
|
@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
|
@ -56,10 +58,14 @@ import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WAL;
|
||||
import org.apache.hadoop.hbase.wal.WALKey;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.junit.Before;
|
||||
|
@ -809,4 +815,80 @@ public class TestReplicationSmallTests extends TestReplicationBase {
|
|||
tableName.getNameAsString()};
|
||||
runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyWALRecovery() throws Exception {
|
||||
final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
|
||||
|
||||
// for each RS, create an empty wal with same walGroupId
|
||||
final List<Path> emptyWalPaths = new ArrayList<>();
|
||||
long ts = System.currentTimeMillis();
|
||||
for (int i = 0; i < numRs; i++) {
|
||||
HRegionInfo regionInfo =
|
||||
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
|
||||
WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
|
||||
Path currentWalPath = DefaultWALProvider.getCurrentFileName(wal);
|
||||
String walGroupId = DefaultWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
|
||||
Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
|
||||
utility1.getTestFileSystem().create(emptyWalPath).close();
|
||||
emptyWalPaths.add(emptyWalPath);
|
||||
}
|
||||
|
||||
// inject our empty wal into the replication queue
|
||||
for (int i = 0; i < numRs; i++) {
|
||||
Replication replicationService =
|
||||
(Replication) utility1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
|
||||
replicationService.preLogRoll(null, emptyWalPaths.get(i));
|
||||
replicationService.postLogRoll(null, emptyWalPaths.get(i));
|
||||
}
|
||||
|
||||
// wait for ReplicationSource to start reading from our empty wal
|
||||
waitForLogAdvance(numRs, emptyWalPaths, false);
|
||||
|
||||
// roll the original wal, which enqueues a new wal behind our empty wal
|
||||
for (int i = 0; i < numRs; i++) {
|
||||
HRegionInfo regionInfo =
|
||||
utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
|
||||
WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
|
||||
wal.rollWriter(true);
|
||||
}
|
||||
|
||||
// ReplicationSource should advance past the empty wal, or else the test will fail
|
||||
waitForLogAdvance(numRs, emptyWalPaths, true);
|
||||
|
||||
// we're now writing to the new wal
|
||||
// if everything works, the source should've stopped reading from the empty wal, and start
|
||||
// replicating from the new wal
|
||||
testSimplePutDelete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for the ReplicationSource to start reading from the given paths
|
||||
* @param numRs number of regionservers
|
||||
* @param emptyWalPaths path for each regionserver
|
||||
* @param invert if true, waits until ReplicationSource is NOT reading from the given paths
|
||||
*/
|
||||
private void waitForLogAdvance(final int numRs, final List<Path> emptyWalPaths,
|
||||
final boolean invert) throws Exception {
|
||||
Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
for (int i = 0; i < numRs; i++) {
|
||||
Replication replicationService = (Replication) utility1.getHBaseCluster()
|
||||
.getRegionServer(i).getReplicationSourceService();
|
||||
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
|
||||
.getSources()) {
|
||||
ReplicationSource source = (ReplicationSource) rsi;
|
||||
if (!invert && !emptyWalPaths.get(i).equals(source.getCurrentPath())) {
|
||||
return false;
|
||||
}
|
||||
if (invert && emptyWalPaths.get(i).equals(source.getCurrentPath())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue