HBASE-18192: Replication drops recovered queues on region server shutdown

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
Ashu Pachauri 2017-06-09 14:36:45 -07:00 committed by tedyu
parent 961337aadc
commit 6e3da5a39a
2 changed files with 162 additions and 11 deletions

View File

@ -143,6 +143,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private ConcurrentHashMap<String, ReplicationSourceShipperThread> workerThreads =
new ConcurrentHashMap<String, ReplicationSourceShipperThread>();
// Hold the state of a replication worker thread
public enum WorkerState {
RUNNING,
STOPPED,
FINISHED // The worker is done processing a recovered queue
}
private AtomicLong totalBufferUsed;
/**
@ -399,7 +406,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.sourceRunning = false;
Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
for (ReplicationSourceShipperThread worker : workers) {
worker.setWorkerRunning(false);
worker.setWorkerState(WorkerState.STOPPED);
worker.entryReader.interrupt();
worker.interrupt();
}
@ -513,8 +520,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private long lastLoggedPosition = -1;
// Path of the current log
private volatile Path currentPath;
// Indicates whether this particular worker is running
private boolean workerRunning = true;
// Current state of the worker thread
private WorkerState state;
ReplicationSourceWALReaderThread entryReader;
// Use guava cache to set ttl for each key
private LoadingCache<String, Boolean> canSkipWaitingSet = CacheBuilder.newBuilder()
@ -538,6 +545,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public void run() {
setWorkerState(WorkerState.RUNNING);
// Loop until we close down
while (isWorkerActive()) {
int sleepMultiplier = 1;
@ -570,7 +578,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
+ peerClusterZnode);
metrics.incrCompletedRecoveryQueue();
setWorkerRunning(false);
setWorkerState(WorkerState.FINISHED);
continue;
}
} catch (InterruptedException e) {
@ -579,13 +587,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
if (replicationQueueInfo.isQueueRecovered()) {
if (replicationQueueInfo.isQueueRecovered() && getWorkerState() == WorkerState.FINISHED) {
// use synchronize to make sure one last thread will clean the queue
synchronized (workerThreads) {
Threads.sleep(100);// wait a short while for other worker thread to fully exit
boolean allOtherTaskDone = true;
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
if (!worker.equals(this) && worker.isAlive()) {
if (!worker.equals(this) && worker.getWorkerState() != WorkerState.FINISHED) {
allOtherTaskDone = false;
break;
}
@ -597,6 +605,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
}
// If the worker exits run loop without finishing it's task, mark it as stopped.
if (state != WorkerState.FINISHED) {
setWorkerState(WorkerState.STOPPED);
}
}
private void waitingUntilCanPush(Map.Entry<String, Long> entry) {
@ -927,7 +939,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
private boolean isWorkerActive() {
return !stopper.isStopped() && workerRunning && !isInterrupted();
return !stopper.isStopped() && state == WorkerState.RUNNING && !isInterrupted();
}
private void terminate(String reason, Exception cause) {
@ -940,14 +952,29 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
entryReader.interrupt();
Threads.shutdown(entryReader, sleepForRetries);
setWorkerState(WorkerState.STOPPED);
this.interrupt();
Threads.shutdown(this, sleepForRetries);
LOG.info("ReplicationSourceWorker " + this.getName() + " terminated");
}
public void setWorkerRunning(boolean workerRunning) {
entryReader.setReaderRunning(workerRunning);
this.workerRunning = workerRunning;
/**
* Set the worker state
* @param state
*/
public void setWorkerState(WorkerState state) {
this.state = state;
if (entryReader != null) {
entryReader.setReaderRunning(state == WorkerState.RUNNING);
}
}
/**
* Get the current state of this worker.
* @return WorkerState
*/
public WorkerState getWorkerState() {
return state;
}
private void releaseBufferQuota(int size) {

View File

@ -18,9 +18,11 @@
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -31,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.Waiter.Predicate;
@ -38,6 +42,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WAL;
@ -49,6 +56,7 @@ import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplica
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -63,10 +71,12 @@ public class TestReplicationSource {
LogFactory.getLog(TestReplicationSource.class);
private final static HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
private final static HBaseTestingUtility TEST_UTIL_PEER =
new HBaseTestingUtility();
private static FileSystem FS;
private static Path oldLogDir;
private static Path logDir;
private static Configuration conf = HBaseConfiguration.create();
private static Configuration conf = TEST_UTIL.getConfiguration();
/**
* @throws java.lang.Exception
@ -82,6 +92,13 @@ public class TestReplicationSource {
if (FS.exists(logDir)) FS.delete(logDir, true);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL_PEER.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniHBaseCluster();
TEST_UTIL.shutdownMiniDFSCluster();
}
/**
* Sanity check that we can move logs around while we are reading
* from them. Should this test fail, ReplicationSource would have a hard
@ -172,5 +189,112 @@ public class TestReplicationSource {
}
/**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
* @throws Exception
*/
@Test
public void testServerShutdownRecoveredQueue() throws Exception {
try {
// Ensure single-threaded WAL
conf.set("hbase.wal.provider", "defaultProvider");
conf.setInt("replication.sleep.before.failover", 2000);
// Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
TEST_UTIL_PEER.startMiniCluster(1);
HRegionServer serverA = cluster.getRegionServer(0);
final ReplicationSourceManager managerA =
((Replication) serverA.getReplicationSourceService()).getReplicationManager();
HRegionServer serverB = cluster.getRegionServer(1);
final ReplicationSourceManager managerB =
((Replication) serverB.getReplicationSourceService()).getReplicationManager();
final ReplicationAdmin replicationAdmin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
final String peerId = "TestPeer";
replicationAdmin.addPeer(peerId,
new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()), null);
// Wait for replication sources to come up
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
}
});
// Disabling peer makes sure there is at least one log to claim when the server dies
// The recovered queue will also stay there until the peer is disabled even if the
// WALs it contains have no data.
replicationAdmin.disablePeer(peerId);
// Stopping serverA
// It's queues should be claimed by the only other alive server i.e. serverB
cluster.stopRegionServer(serverA.getServerName());
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return managerB.getOldSources().size() == 1;
}
});
final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
serverC.waitForServerOnline();
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return serverC.getReplicationSourceService() != null;
}
});
final ReplicationSourceManager managerC =
((Replication) serverC.getReplicationSourceService()).getReplicationManager();
// Sanity check
assertEquals(0, managerC.getOldSources().size());
// Stopping serverB
// Now serverC should have two recovered queues:
// 1. The serverB's normal queue
// 2. serverA's recovered queue on serverB
cluster.stopRegionServer(serverB.getServerName());
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return managerC.getOldSources().size() == 2;
}
});
replicationAdmin.enablePeer(peerId);
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return managerC.getOldSources().size() == 0;
}
});
} finally {
conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
}
}
/**
* Regionserver implementation that adds a delay on the graceful shutdown.
*/
public static class ShutdownDelayRegionServer extends HRegionServer {
public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
super(conf);
}
public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
throws IOException, InterruptedException {
super(conf, csm);
}
@Override
protected void stopServiceThreads() {
// Add a delay before service threads are shutdown.
// This will keep the zookeeper connection alive for the duration of the delay.
LOG.info("Adding a delay to the regionserver shutdown");
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
LOG.error("Interrupted while sleeping");
}
super.stopServiceThreads();
}
}
}