HBASE-19707 Race in start and terminate of a replication source after we async start replicatione endpoint

This commit is contained in:
zhangduo 2018-01-05 18:28:44 +08:00
parent d36aacdf9e
commit 0165455d34
2 changed files with 116 additions and 103 deletions

View File

@ -68,7 +68,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else { } else {
LOG.debug("Starting up worker for wal group " + walGroupId); LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler()); worker.startup(this::uncaughtException);
worker.setWALReader( worker.setWALReader(
startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition())); startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
workerThreads.put(walGroupId, worker); workerThreads.put(walGroupId, worker);
@ -76,13 +76,13 @@ public class RecoveredReplicationSource extends ReplicationSource {
} }
@Override @Override
protected ReplicationSourceWALReader startNewWALReader(String threadName, protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
String walGroupId, PriorityBlockingQueue<Path> queue, long startPosition) { PriorityBlockingQueue<Path> queue, long startPosition) {
ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs, ReplicationSourceWALReader walReader =
conf, queue, startPosition, walEntryFilter, this); new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
Threads.setDaemonThreadRunning(walReader, threadName Threads.setDaemonThreadRunning(walReader,
+ ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId, threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
getUncaughtExceptionHandler()); this::uncaughtException);
return walReader; return walReader;
} }

View File

@ -75,7 +75,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
* </p> * </p>
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationSource extends Thread implements ReplicationSourceInterface { public class ReplicationSource implements ReplicationSourceInterface {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class); private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue, // Queues of logs to process, entry in format of walGroupId->queue,
@ -114,10 +114,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private MetricsSource metrics; private MetricsSource metrics;
// WARN threshold for the number of queued logs, defaults to 2 // WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold; private int logQueueWarnThreshold;
// whether the replication endpoint has been initialized
private volatile boolean endpointInitialized = false;
// ReplicationEndpoint which will handle the actual replication // ReplicationEndpoint which will handle the actual replication
private ReplicationEndpoint replicationEndpoint; private volatile ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries. // A filter (or a chain of filters) for the WAL entries.
protected WALEntryFilter walEntryFilter; protected WALEntryFilter walEntryFilter;
// throttler // throttler
@ -135,6 +133,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30; public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
private int waitOnEndpointSeconds = -1; private int waitOnEndpointSeconds = -1;
private Thread initThread;
/** /**
* Instantiation method used by region servers * Instantiation method used by region servers
* @param conf configuration to use * @param conf configuration to use
@ -196,7 +196,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
if (queue == null) { if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator()); queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue); queues.put(logPrefix, queue);
if (this.isSourceActive() && this.endpointInitialized) { if (this.isSourceActive() && this.replicationEndpoint != null) {
// new wal group observed after source startup, start a new worker thread to track it // new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread // notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker // still not launched, so it's necessary to check workerThreads before start the worker
@ -235,19 +235,18 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} }
} }
private void initAndStartReplicationEndpoint() throws Exception { private ReplicationEndpoint createReplicationEndpoint()
throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
RegionServerCoprocessorHost rsServerHost = null; RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) { if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost(); rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
} }
String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl(); String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();
if (replicationEndpointImpl == null) { if (replicationEndpointImpl == null) {
// Default to HBase inter-cluster replication endpoint // Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName(); replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
} }
replicationEndpoint = ReplicationEndpoint replicationEndpoint =
Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance(); Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
if (rsServerHost != null) { if (rsServerHost != null) {
ReplicationEndpoint newReplicationEndPoint = ReplicationEndpoint newReplicationEndPoint =
@ -257,6 +256,15 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
replicationEndpoint = newReplicationEndPoint; replicationEndpoint = newReplicationEndPoint;
} }
} }
return replicationEndpoint;
}
private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
throws IOException, TimeoutException {
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
replicationEndpoint replicationEndpoint
.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId, .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
clusterId, replicationPeer, metrics, tableDescriptors, server)); clusterId, replicationPeer, metrics, tableDescriptors, server));
@ -264,60 +272,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS); replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
} }
@Override
public void run() {
// mark we are running now
this.sourceRunning = true;
int sleepMultiplier = 1;
while (this.isSourceActive()) {
try {
initAndStartReplicationEndpoint();
break;
} catch (Exception e) {
LOG.warn("Error starting ReplicationEndpoint, retrying", e);
if (replicationEndpoint != null) {
replicationEndpoint.stop();
replicationEndpoint = null;
}
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
this.endpointInitialized = true;
sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) {
this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isSourceActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.removeSource(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
initializeWALEntryFilter();
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
}
private void initializeWALEntryFilter() { private void initializeWALEntryFilter() {
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters // get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList<WALEntryFilter> filters = ArrayList<WALEntryFilter> filters =
@ -331,16 +285,15 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} }
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) { protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this);
walGroupId, queue, this);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker); ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) { if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId); LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else { } else {
LOG.debug("Starting up worker for wal group " + walGroupId); LOG.debug("Starting up worker for wal group " + walGroupId);
worker.startup(getUncaughtExceptionHandler()); worker.startup(this::uncaughtException);
worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue, worker.setWALReader(
worker.getStartPosition())); startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
} }
} }
@ -350,18 +303,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this); new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader, return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId, threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
getUncaughtExceptionHandler()); this::uncaughtException);
} }
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() { protected final void uncaughtException(Thread t, Throwable e) {
return new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(final Thread t, final Throwable e) {
RSRpcServices.exitIfOOME(e); RSRpcServices.exitIfOOME(e);
LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e); LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
server.stop("Unexpected exception in " + t.getName()); server.abort("Unexpected exception in " + t.getName(), e);
}
};
} }
@Override @Override
@ -434,17 +382,76 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return replicationPeer.isPeerEnabled(); return replicationPeer.isPeerEnabled();
} }
private void initialize() {
int sleepMultiplier = 1;
while (this.isSourceActive()) {
ReplicationEndpoint replicationEndpoint;
try {
replicationEndpoint = createReplicationEndpoint();
} catch (Exception e) {
LOG.warn("error creating ReplicationEndpoint, retry", e);
if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
try {
initAndStartReplicationEndpoint(replicationEndpoint);
this.replicationEndpoint = replicationEndpoint;
break;
} catch (Exception e) {
LOG.warn("Error starting ReplicationEndpoint, retry", e);
replicationEndpoint.stop();
if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
if (!this.isSourceActive()) {
return;
}
sleepMultiplier = 1;
// delay this until we are in an asynchronous thread
while (this.isSourceActive() && this.peerClusterId == null) {
this.peerClusterId = replicationEndpoint.getPeerUUID();
if (this.isSourceActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++;
}
}
}
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.removeSource(this);
return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
initializeWALEntryFilter();
// start workers
for (Map.Entry<String, PriorityBlockingQueue<Path>> entry : queues.entrySet()) {
String walGroupId = entry.getKey();
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
}
@Override @Override
public void startup() { public void startup() {
String n = Thread.currentThread().getName(); // mark we are running now
Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() { this.sourceRunning = true;
@Override initThread = new Thread(this::initialize);
public void uncaughtException(final Thread t, final Throwable e) { Threads.setDaemonThreadRunning(initThread,
LOG.error("Unexpected exception in ReplicationSource", e); Thread.currentThread().getName() + ".replicationSource," + this.queueId,
} this::uncaughtException);
};
Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId,
handler);
} }
@Override @Override
@ -465,6 +472,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
cause); cause);
} }
this.sourceRunning = false; this.sourceRunning = false;
if (initThread != null && Thread.currentThread() != initThread) {
// This usually won't happen but anyway, let's wait until the initialization thread exits.
// And notice that we may call terminate directly from the initThread so here we need to
// avoid join on ourselves.
initThread.interrupt();
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values(); Collection<ReplicationSourceShipper> workers = workerThreads.values();
for (ReplicationSourceShipper worker : workers) { for (ReplicationSourceShipper worker : workers) {
worker.stopWorker(); worker.stopWorker();
@ -481,12 +495,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} }
if (this.replicationEndpoint != null) { if (this.replicationEndpoint != null) {
try { try {
this.replicationEndpoint this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
} catch (TimeoutException te) { } catch (TimeoutException te) {
LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +
+ this.queueId, this.queueId, te);
te);
} }
} }
} }