HBASE-18192: Replication drops recovered queues on region server shutdown
Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
parent
e5ea457054
commit
eb2dc5d2a5
|
@ -160,14 +160,14 @@ public class RecoveredReplicationSource extends ReplicationSource {
|
||||||
// use synchronize to make sure one last thread will clean the queue
|
// use synchronize to make sure one last thread will clean the queue
|
||||||
synchronized (workerThreads) {
|
synchronized (workerThreads) {
|
||||||
Threads.sleep(100);// wait a short while for other worker thread to fully exit
|
Threads.sleep(100);// wait a short while for other worker thread to fully exit
|
||||||
boolean allOtherTaskDone = true;
|
boolean allTasksDone = true;
|
||||||
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
|
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
|
||||||
if (worker.isActive()) {
|
if (!worker.isFinished()) {
|
||||||
allOtherTaskDone = false;
|
allTasksDone = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (allOtherTaskDone) {
|
if (allTasksDone) {
|
||||||
manager.closeRecoveredQueue(this);
|
manager.closeRecoveredQueue(this);
|
||||||
LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
|
LOG.info("Finished recovering queue " + peerClusterZnode + " with the following stats: "
|
||||||
+ getStats());
|
+ getStats());
|
||||||
|
|
|
@ -51,6 +51,7 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
setWorkerState(WorkerState.RUNNING);
|
||||||
// Loop until we close down
|
// Loop until we close down
|
||||||
while (isActive()) {
|
while (isActive()) {
|
||||||
int sleepMultiplier = 1;
|
int sleepMultiplier = 1;
|
||||||
|
@ -77,7 +78,7 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
|
||||||
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
|
LOG.debug("Finished recovering queue for group " + walGroupId + " of peer "
|
||||||
+ source.getPeerClusterZnode());
|
+ source.getPeerClusterZnode());
|
||||||
source.getSourceMetrics().incrCompletedRecoveryQueue();
|
source.getSourceMetrics().incrCompletedRecoveryQueue();
|
||||||
setWorkerRunning(false);
|
setWorkerState(WorkerState.FINISHED);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -85,8 +86,11 @@ public class RecoveredReplicationSourceShipperThread extends ReplicationSourceSh
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
source.tryFinish();
|
source.tryFinish();
|
||||||
|
// If the worker exits run loop without finishing its task, mark it as stopped.
|
||||||
|
if (!isFinished()) {
|
||||||
|
setWorkerState(WorkerState.STOPPED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -57,13 +57,13 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||||
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
|
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
|
||||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class that handles the source of a replication stream.
|
* Class that handles the source of a replication stream.
|
||||||
* Currently does not handle more than 1 slave
|
* Currently does not handle more than 1 slave
|
||||||
|
@ -448,7 +448,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
||||||
this.sourceRunning = false;
|
this.sourceRunning = false;
|
||||||
Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
|
Collection<ReplicationSourceShipperThread> workers = workerThreads.values();
|
||||||
for (ReplicationSourceShipperThread worker : workers) {
|
for (ReplicationSourceShipperThread worker : workers) {
|
||||||
worker.setWorkerRunning(false);
|
worker.stopWorker();
|
||||||
worker.entryReader.interrupt();
|
worker.entryReader.interrupt();
|
||||||
worker.interrupt();
|
worker.interrupt();
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,6 +54,13 @@ import com.google.common.cache.LoadingCache;
|
||||||
public class ReplicationSourceShipperThread extends Thread {
|
public class ReplicationSourceShipperThread extends Thread {
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
|
private static final Log LOG = LogFactory.getLog(ReplicationSourceShipperThread.class);
|
||||||
|
|
||||||
|
// Hold the state of a replication worker thread
|
||||||
|
public enum WorkerState {
|
||||||
|
RUNNING,
|
||||||
|
STOPPED,
|
||||||
|
FINISHED, // The worker is done processing a recovered queue
|
||||||
|
}
|
||||||
|
|
||||||
protected final Configuration conf;
|
protected final Configuration conf;
|
||||||
protected final String walGroupId;
|
protected final String walGroupId;
|
||||||
protected final PriorityBlockingQueue<Path> queue;
|
protected final PriorityBlockingQueue<Path> queue;
|
||||||
|
@ -63,8 +70,8 @@ public class ReplicationSourceShipperThread extends Thread {
|
||||||
protected long lastLoggedPosition = -1;
|
protected long lastLoggedPosition = -1;
|
||||||
// Path of the current log
|
// Path of the current log
|
||||||
protected volatile Path currentPath;
|
protected volatile Path currentPath;
|
||||||
// Indicates whether this particular worker is running
|
// Current state of the worker thread
|
||||||
private boolean workerRunning = true;
|
private WorkerState state;
|
||||||
protected ReplicationSourceWALReaderThread entryReader;
|
protected ReplicationSourceWALReaderThread entryReader;
|
||||||
|
|
||||||
// How long should we sleep for each retry
|
// How long should we sleep for each retry
|
||||||
|
@ -97,6 +104,7 @@ public class ReplicationSourceShipperThread extends Thread {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
setWorkerState(WorkerState.RUNNING);
|
||||||
// Loop until we close down
|
// Loop until we close down
|
||||||
while (isActive()) {
|
while (isActive()) {
|
||||||
int sleepMultiplier = 1;
|
int sleepMultiplier = 1;
|
||||||
|
@ -126,6 +134,10 @@ public class ReplicationSourceShipperThread extends Thread {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// If the worker exits run loop without finishing its task, mark it as stopped.
|
||||||
|
if (state != WorkerState.FINISHED) {
|
||||||
|
setWorkerState(WorkerState.STOPPED);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -307,12 +319,23 @@ public class ReplicationSourceShipperThread extends Thread {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean isActive() {
|
protected boolean isActive() {
|
||||||
return source.isSourceActive() && workerRunning && !isInterrupted();
|
return source.isSourceActive() && state == WorkerState.RUNNING && !isInterrupted();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setWorkerRunning(boolean workerRunning) {
|
public void setWorkerState(WorkerState state) {
|
||||||
entryReader.setReaderRunning(workerRunning);
|
this.state = state;
|
||||||
this.workerRunning = workerRunning;
|
}
|
||||||
|
|
||||||
|
public WorkerState getWorkerState() {
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void stopWorker() {
|
||||||
|
setWorkerState(WorkerState.STOPPED);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isFinished() {
|
||||||
|
return state == WorkerState.FINISHED;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -18,9 +18,11 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication;
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
@ -31,6 +33,8 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.CoordinatedStateManager;
|
||||||
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.Waiter.Predicate;
|
import org.apache.hadoop.hbase.Waiter.Predicate;
|
||||||
|
@ -38,6 +42,9 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.replication.regionserver.Replication;
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
|
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||||
|
@ -49,12 +56,12 @@ import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
@Category({ReplicationTests.class, MediumTests.class})
|
@Category({ReplicationTests.class, MediumTests.class})
|
||||||
public class TestReplicationSource {
|
public class TestReplicationSource {
|
||||||
|
@ -63,10 +70,12 @@ public class TestReplicationSource {
|
||||||
LogFactory.getLog(TestReplicationSource.class);
|
LogFactory.getLog(TestReplicationSource.class);
|
||||||
private final static HBaseTestingUtility TEST_UTIL =
|
private final static HBaseTestingUtility TEST_UTIL =
|
||||||
new HBaseTestingUtility();
|
new HBaseTestingUtility();
|
||||||
|
private final static HBaseTestingUtility TEST_UTIL_PEER =
|
||||||
|
new HBaseTestingUtility();
|
||||||
private static FileSystem FS;
|
private static FileSystem FS;
|
||||||
private static Path oldLogDir;
|
private static Path oldLogDir;
|
||||||
private static Path logDir;
|
private static Path logDir;
|
||||||
private static Configuration conf = HBaseConfiguration.create();
|
private static Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @throws java.lang.Exception
|
* @throws java.lang.Exception
|
||||||
|
@ -82,6 +91,13 @@ public class TestReplicationSource {
|
||||||
if (FS.exists(logDir)) FS.delete(logDir, true);
|
if (FS.exists(logDir)) FS.delete(logDir, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL_PEER.shutdownMiniHBaseCluster();
|
||||||
|
TEST_UTIL.shutdownMiniHBaseCluster();
|
||||||
|
TEST_UTIL.shutdownMiniDFSCluster();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sanity check that we can move logs around while we are reading
|
* Sanity check that we can move logs around while we are reading
|
||||||
* from them. Should this test fail, ReplicationSource would have a hard
|
* from them. Should this test fail, ReplicationSource would have a hard
|
||||||
|
@ -172,5 +188,112 @@ public class TestReplicationSource {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that recovered queues are preserved on a regionserver shutdown.
|
||||||
|
* See HBASE-18192
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testServerShutdownRecoveredQueue() throws Exception {
|
||||||
|
try {
|
||||||
|
// Ensure single-threaded WAL
|
||||||
|
conf.set("hbase.wal.provider", "defaultProvider");
|
||||||
|
conf.setInt("replication.sleep.before.failover", 2000);
|
||||||
|
// Introduces a delay in regionserver shutdown to give the race condition a chance to kick in.
|
||||||
|
conf.set(HConstants.REGION_SERVER_IMPL, ShutdownDelayRegionServer.class.getName());
|
||||||
|
MiniHBaseCluster cluster = TEST_UTIL.startMiniCluster(2);
|
||||||
|
TEST_UTIL_PEER.startMiniCluster(1);
|
||||||
|
|
||||||
|
HRegionServer serverA = cluster.getRegionServer(0);
|
||||||
|
final ReplicationSourceManager managerA =
|
||||||
|
((Replication) serverA.getReplicationSourceService()).getReplicationManager();
|
||||||
|
HRegionServer serverB = cluster.getRegionServer(1);
|
||||||
|
final ReplicationSourceManager managerB =
|
||||||
|
((Replication) serverB.getReplicationSourceService()).getReplicationManager();
|
||||||
|
final Admin admin = TEST_UTIL.getAdmin();
|
||||||
|
|
||||||
|
final String peerId = "TestPeer";
|
||||||
|
admin.addReplicationPeer(peerId,
|
||||||
|
new ReplicationPeerConfig().setClusterKey(TEST_UTIL_PEER.getClusterKey()));
|
||||||
|
// Wait for replication sources to come up
|
||||||
|
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override public boolean evaluate() throws Exception {
|
||||||
|
return !(managerA.getSources().isEmpty() || managerB.getSources().isEmpty());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Disabling peer makes sure there is at least one log to claim when the server dies
|
||||||
|
// The recovered queue will also stay there for as long as the peer is disabled, even if the
|
||||||
|
// WALs it contains have no data.
|
||||||
|
admin.disableReplicationPeer(peerId);
|
||||||
|
|
||||||
|
// Stopping serverA
|
||||||
|
// Its queues should be claimed by the only other alive server, i.e. serverB
|
||||||
|
cluster.stopRegionServer(serverA.getServerName());
|
||||||
|
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override public boolean evaluate() throws Exception {
|
||||||
|
return managerB.getOldSources().size() == 1;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
final HRegionServer serverC = cluster.startRegionServer().getRegionServer();
|
||||||
|
serverC.waitForServerOnline();
|
||||||
|
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override public boolean evaluate() throws Exception {
|
||||||
|
return serverC.getReplicationSourceService() != null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
final ReplicationSourceManager managerC =
|
||||||
|
((Replication) serverC.getReplicationSourceService()).getReplicationManager();
|
||||||
|
// Sanity check
|
||||||
|
assertEquals(0, managerC.getOldSources().size());
|
||||||
|
|
||||||
|
// Stopping serverB
|
||||||
|
// Now serverC should have two recovered queues:
|
||||||
|
// 1. The serverB's normal queue
|
||||||
|
// 2. serverA's recovered queue on serverB
|
||||||
|
cluster.stopRegionServer(serverB.getServerName());
|
||||||
|
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override public boolean evaluate() throws Exception {
|
||||||
|
return managerC.getOldSources().size() == 2;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
admin.enableReplicationPeer(peerId);
|
||||||
|
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override public boolean evaluate() throws Exception {
|
||||||
|
return managerC.getOldSources().size() == 0;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} finally {
|
||||||
|
conf.set(HConstants.REGION_SERVER_IMPL, HRegionServer.class.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Regionserver implementation that adds a delay on the graceful shutdown.
|
||||||
|
*/
|
||||||
|
public static class ShutdownDelayRegionServer extends HRegionServer {
|
||||||
|
public ShutdownDelayRegionServer(Configuration conf) throws IOException, InterruptedException {
|
||||||
|
super(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
public ShutdownDelayRegionServer(Configuration conf, CoordinatedStateManager csm)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
super(conf, csm);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void stopServiceThreads() {
|
||||||
|
// Add a delay before service threads are shutdown.
|
||||||
|
// This will keep the zookeeper connection alive for the duration of the delay.
|
||||||
|
LOG.info("Adding a delay to the regionserver shutdown");
|
||||||
|
try {
|
||||||
|
Thread.sleep(2000);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
LOG.error("Interrupted while sleeping");
|
||||||
|
}
|
||||||
|
super.stopServiceThreads();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue