HBASE-19707 Race in start and terminate of a replication source after we async start replicatione endpoint

This commit is contained in:
zhangduo 2018-01-05 18:28:44 +08:00
parent 01f1fc8284
commit ec364d0d6c
3 changed files with 116 additions and 103 deletions

View File

@ -68,7 +68,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler());
worker.startup(this::uncaughtException);
worker.setWALReader(
startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
@ -76,13 +76,13 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
@Override
protected ReplicationSourceWALReader startNewWALReader(String threadName,
String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) {
ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
conf, queue, startPosition, walEntryFilter, this);
Threads.setDaemonThreadRunning(walReader, threadName
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
getUncaughtExceptionHandler());
protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
ReplicationSourceWALReader walReader =
new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
Threads.setDaemonThreadRunning(walReader,
threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
this::uncaughtException);
return walReader;
}

View File

@ -76,7 +76,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
* </p>
*/
@InterfaceAudience.Private
public class ReplicationSource extends Thread implements ReplicationSourceInterface {
public class ReplicationSource implements ReplicationSourceInterface {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
@ -115,10 +115,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private MetricsSource metrics;
// WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
// whether the replication endpoint has been initialized
private volatile boolean endpointInitialized = false;
// ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint;
private volatile ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
protected WALEntryFilter walEntryFilter;
// throttler
@ -136,6 +134,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
private int waitOnEndpointSeconds = -1;
private Thread initThread;
/**
* Instantiation method used by region servers
* @param conf configuration to use
@ -197,7 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue);
if (this.isSourceActive() && this.endpointInitialized) {
if (this.isSourceActive() && this.replicationEndpoint != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
@ -236,28 +236,36 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
private void initAndStartReplicationEndpoint() throws Exception {
private ReplicationEndpoint createReplicationEndpoint()
throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();
if (replicationEndpointImpl == null) {
// Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
}
replicationEndpoint =
Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
ReplicationEndpoint replicationEndpoint =
Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
if (rsServerHost != null) {
ReplicationEndpoint newReplicationEndPoint =
rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
if (newReplicationEndPoint != null) {
// Override the newly created endpoint from the hook with configured end point
replicationEndpoint = newReplicationEndPoint;
}
}
return replicationEndpoint;
}
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
throws IOException, TimeoutException {
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
replicationEndpoint
.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
clusterId, replicationPeer, metrics, tableDescriptors, server));
@ -265,60 +273,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
@Override
public void run() {
// mark we are running now
this.sourceRunning = true;
int sleepMultiplier = 1;
while (this.isSourceActive()) {
try {
initAndStartReplicationEndpoint();
break;
} catch (Exception e) {
LOG.warn("Error starting ReplicationEndpoint, retrying", e);
if (replicationEndpoint != null) {
replicationEndpoint.stop();
replicationEndpoint = null;
}
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
this.endpointInitialized = true;
sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) {
this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isSourceActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.removeSource(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
initializeWALEntryFilter();
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
}
private void initializeWALEntryFilter() {
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters =
@ -332,37 +286,31 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
walGroupId, queue, this);
ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler());
worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
worker.getStartPosition()));
worker.startup(this::uncaughtException);
worker.setWALReader(
startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
}
}
protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
PriorityBlockingQueue<Path> queue, long startPosition) {
ReplicationSourceWALReader walReader =
new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
getUncaughtExceptionHandler());
this::uncaughtException);
}
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
return new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
server.stop("Unexpected exception in " + t.getName());
}
};
protected final void uncaughtException(Thread t, Throwable e) {
RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
server.abort("Unexpected exception in " + t.getName(), e);
}
@Override
@ -435,17 +383,76 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return replicationPeer.isPeerEnabled();
}
private void initialize() {
int sleepMultiplier = 1;
while (this.isSourceActive()) {
ReplicationEndpoint replicationEndpoint;
try {
replicationEndpoint = createReplicationEndpoint();
} catch (Exception e) {
LOG.warn("error creating ReplicationEndpoint, retry", e);
if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
try {
initAndStartReplicationEndpoint(replicationEndpoint);
this.replicationEndpoint = replicationEndpoint;
break;
} catch (Exception e) {
LOG.warn("Error starting ReplicationEndpoint, retry", e);
replicationEndpoint.stop();
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
if (!this.isSourceActive()) {
return;
}
sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) {
this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isSourceActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.removeSource(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
initializeWALEntryFilter();
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
}
@Override
public void startup() {
String n = Thread.currentThread().getName();
Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
LOG.error("Unexpected exception in ReplicationSource", e);
}
};
Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId,
handler);
// mark we are running now
this.sourceRunning = true;
initThread = new Thread(this::initialize);
Threads.setDaemonThreadRunning(initThread,
Thread.currentThread().getName() + ".replicationSource," + this.queueId,
this::uncaughtException);
}
@Override
@ -466,6 +473,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
cause);
}
this.sourceRunning = false;
if (initThread != null && Thread.currentThread() != initThread) {
// This usually won't happen but anyway, let's wait until the initialization thread exits.
// And notice that we may call terminate directly from the initThread so here we need to
// avoid join on ourselves.
initThread.interrupt();
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
@ -482,11 +496,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
if (this.replicationEndpoint != null) {
try {
this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
+ this.queueId,
te);
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +
this.queueId, te);
}
}
}

View File

@ -33,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;