diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
index 1be9a88f456..3cae0f2d1f9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RecoveredReplicationSource.java
@@ -68,7 +68,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
- worker.startup(getUncaughtExceptionHandler());
+ worker.startup(this::uncaughtException);
worker.setWALReader(
startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
workerThreads.put(walGroupId, worker);
@@ -76,13 +76,13 @@ public class RecoveredReplicationSource extends ReplicationSource {
}
@Override
- protected ReplicationSourceWALReader startNewWALReader(String threadName,
- String walGroupId, PriorityBlockingQueue queue, long startPosition) {
- ReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(fs,
- conf, queue, startPosition, walEntryFilter, this);
- Threads.setDaemonThreadRunning(walReader, threadName
- + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
- getUncaughtExceptionHandler());
+ protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
+ PriorityBlockingQueue queue, long startPosition) {
+ ReplicationSourceWALReader walReader =
+ new RecoveredReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+ Threads.setDaemonThreadRunning(walReader,
+ threadName + ".replicationSource.replicationWALReaderThread." + walGroupId + "," + queueId,
+ this::uncaughtException);
return walReader;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 6b622eeb4cc..923d893dbbb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -76,7 +76,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
*
*/
@InterfaceAudience.Private
-public class ReplicationSource extends Thread implements ReplicationSourceInterface {
+public class ReplicationSource implements ReplicationSourceInterface {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSource.class);
// Queues of logs to process, entry in format of walGroupId->queue,
@@ -115,10 +115,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private MetricsSource metrics;
// WARN threshold for the number of queued logs, defaults to 2
private int logQueueWarnThreshold;
- // whether the replication endpoint has been initialized
- private volatile boolean endpointInitialized = false;
// ReplicationEndpoint which will handle the actual replication
- private ReplicationEndpoint replicationEndpoint;
+ private volatile ReplicationEndpoint replicationEndpoint;
// A filter (or a chain of filters) for the WAL entries.
protected WALEntryFilter walEntryFilter;
// throttler
@@ -136,6 +134,8 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
public static final int DEFAULT_WAIT_ON_ENDPOINT_SECONDS = 30;
private int waitOnEndpointSeconds = -1;
+ private Thread initThread;
+
/**
* Instantiation method used by region servers
* @param conf configuration to use
@@ -197,7 +197,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
if (queue == null) {
queue = new PriorityBlockingQueue<>(queueSizePerGroup, new LogsComparator());
queues.put(logPrefix, queue);
- if (this.isSourceActive() && this.endpointInitialized) {
+ if (this.isSourceActive() && this.replicationEndpoint != null) {
// new wal group observed after source startup, start a new worker thread to track it
// notice: it's possible that log enqueued when this.running is set but worker thread
// still not launched, so it's necessary to check workerThreads before start the worker
@@ -236,28 +236,36 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
}
- private void initAndStartReplicationEndpoint() throws Exception {
+ private ReplicationEndpoint createReplicationEndpoint()
+ throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
RegionServerCoprocessorHost rsServerHost = null;
- TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
rsServerHost = ((HRegionServer) server).getRegionServerCoprocessorHost();
- tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
String replicationEndpointImpl = replicationPeer.getPeerConfig().getReplicationEndpointImpl();
if (replicationEndpointImpl == null) {
// Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
}
- replicationEndpoint =
- Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
+ ReplicationEndpoint replicationEndpoint =
+ Class.forName(replicationEndpointImpl).asSubclass(ReplicationEndpoint.class).newInstance();
if (rsServerHost != null) {
ReplicationEndpoint newReplicationEndPoint =
- rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
+ rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
if (newReplicationEndPoint != null) {
// Override the newly created endpoint from the hook with configured end point
replicationEndpoint = newReplicationEndPoint;
}
}
+ return replicationEndpoint;
+ }
+
+ private void initAndStartReplicationEndpoint(ReplicationEndpoint replicationEndpoint)
+ throws IOException, TimeoutException {
+ TableDescriptors tableDescriptors = null;
+ if (server instanceof HRegionServer) {
+ tableDescriptors = ((HRegionServer) server).getTableDescriptors();
+ }
replicationEndpoint
.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs, peerId,
clusterId, replicationPeer, metrics, tableDescriptors, server));
@@ -265,60 +273,6 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
}
- @Override
- public void run() {
- // mark we are running now
- this.sourceRunning = true;
-
- int sleepMultiplier = 1;
- while (this.isSourceActive()) {
- try {
- initAndStartReplicationEndpoint();
- break;
- } catch (Exception e) {
- LOG.warn("Error starting ReplicationEndpoint, retrying", e);
- if (replicationEndpoint != null) {
- replicationEndpoint.stop();
- replicationEndpoint = null;
- }
- if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
- }
- this.endpointInitialized = true;
-
- sleepMultiplier = 1;
- // delay this until we are in an asynchronous thread
- while (this.isSourceActive() && this.peerClusterId == null) {
- this.peerClusterId = replicationEndpoint.getPeerUUID();
- if (this.isSourceActive() && this.peerClusterId == null) {
- if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
- sleepMultiplier++;
- }
- }
- }
-
- // In rare case, zookeeper setting may be messed up. That leads to the incorrect
- // peerClusterId value, which is the same as the source clusterId
- if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
- this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
- + peerClusterId + " which is not allowed by ReplicationEndpoint:"
- + replicationEndpoint.getClass().getName(), null, false);
- this.manager.removeSource(this);
- return;
- }
- LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
-
- initializeWALEntryFilter();
- // start workers
- for (Map.Entry> entry : queues.entrySet()) {
- String walGroupId = entry.getKey();
- PriorityBlockingQueue queue = entry.getValue();
- tryStartNewShipper(walGroupId, queue);
- }
- }
-
private void initializeWALEntryFilter() {
// get the WALEntryFilter from ReplicationEndpoint and add it to default filters
ArrayList filters =
@@ -332,37 +286,31 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue queue) {
- final ReplicationSourceShipper worker = new ReplicationSourceShipper(conf,
- walGroupId, queue, this);
+ ReplicationSourceShipper worker = new ReplicationSourceShipper(conf, walGroupId, queue, this);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
} else {
LOG.debug("Starting up worker for wal group " + walGroupId);
- worker.startup(getUncaughtExceptionHandler());
- worker.setWALReader(startNewWALReader(worker.getName(), walGroupId, queue,
- worker.getStartPosition()));
+ worker.startup(this::uncaughtException);
+ worker.setWALReader(
+ startNewWALReader(worker.getName(), walGroupId, queue, worker.getStartPosition()));
}
}
protected ReplicationSourceWALReader startNewWALReader(String threadName, String walGroupId,
PriorityBlockingQueue queue, long startPosition) {
ReplicationSourceWALReader walReader =
- new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
+ new ReplicationSourceWALReader(fs, conf, queue, startPosition, walEntryFilter, this);
return (ReplicationSourceWALReader) Threads.setDaemonThreadRunning(walReader,
threadName + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
- getUncaughtExceptionHandler());
+ this::uncaughtException);
}
- public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
- return new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- RSRpcServices.exitIfOOME(e);
- LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
- server.stop("Unexpected exception in " + t.getName());
- }
- };
+ protected final void uncaughtException(Thread t, Throwable e) {
+ RSRpcServices.exitIfOOME(e);
+ LOG.error("Unexpected exception in " + t.getName() + " currentPath=" + getCurrentPath(), e);
+ server.abort("Unexpected exception in " + t.getName(), e);
}
@Override
@@ -435,17 +383,76 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return replicationPeer.isPeerEnabled();
}
+ private void initialize() {
+ int sleepMultiplier = 1;
+ while (this.isSourceActive()) {
+ ReplicationEndpoint replicationEndpoint;
+ try {
+ replicationEndpoint = createReplicationEndpoint();
+ } catch (Exception e) {
+ LOG.warn("error creating ReplicationEndpoint, retry", e);
+ if (sleepForRetries("Error creating ReplicationEndpoint", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
+
+ try {
+ initAndStartReplicationEndpoint(replicationEndpoint);
+ this.replicationEndpoint = replicationEndpoint;
+ break;
+ } catch (Exception e) {
+ LOG.warn("Error starting ReplicationEndpoint, retry", e);
+ replicationEndpoint.stop();
+ if (sleepForRetries("Error starting ReplicationEndpoint", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+
+ if (!this.isSourceActive()) {
+ return;
+ }
+
+ sleepMultiplier = 1;
+ // delay this until we are in an asynchronous thread
+ while (this.isSourceActive() && this.peerClusterId == null) {
+ this.peerClusterId = replicationEndpoint.getPeerUUID();
+ if (this.isSourceActive() && this.peerClusterId == null) {
+ if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ }
+ }
+
+ // In rare case, zookeeper setting may be messed up. That leads to the incorrect
+ // peerClusterId value, which is the same as the source clusterId
+ if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
+ this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ + peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ + replicationEndpoint.getClass().getName(), null, false);
+ this.manager.removeSource(this);
+ return;
+ }
+ LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
+
+ initializeWALEntryFilter();
+ // start workers
+ for (Map.Entry> entry : queues.entrySet()) {
+ String walGroupId = entry.getKey();
+ PriorityBlockingQueue queue = entry.getValue();
+ tryStartNewShipper(walGroupId, queue);
+ }
+ }
+
@Override
public void startup() {
- String n = Thread.currentThread().getName();
- Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(final Thread t, final Throwable e) {
- LOG.error("Unexpected exception in ReplicationSource", e);
- }
- };
- Threads.setDaemonThreadRunning(this, n + ".replicationSource," + this.queueId,
- handler);
+ // mark we are running now
+ this.sourceRunning = true;
+ initThread = new Thread(this::initialize);
+ Threads.setDaemonThreadRunning(initThread,
+ Thread.currentThread().getName() + ".replicationSource," + this.queueId,
+ this::uncaughtException);
}
@Override
@@ -466,6 +473,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
cause);
}
this.sourceRunning = false;
+ if (initThread != null && Thread.currentThread() != initThread) {
+ // This usually won't happen but anyway, let's wait until the initialization thread exits.
+ // And notice that we may call terminate directly from the initThread so here we need to
+ // avoid join on ourselves.
+ initThread.interrupt();
+ Threads.shutdown(initThread, this.sleepForRetries);
+ }
Collection workers = workerThreads.values();
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
@@ -482,11 +496,11 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
if (this.replicationEndpoint != null) {
try {
- this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier, TimeUnit.MILLISECONDS);
+ this.replicationEndpoint.awaitTerminated(sleepForRetries * maxRetriesMultiplier,
+ TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
- LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :"
- + this.queueId,
- te);
+ LOG.warn("Got exception while waiting for endpoint to shutdown for replication source :" +
+ this.queueId, te);
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index 89cf3934a9a..a6091e16639 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -33,7 +33,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;