HBASE-25117 ReplicationSourceShipper thread can not be finished (#2521)

Signed-off-by: Wellington Chevreuil <wellington.chevreuil@gmail.com>
Signed-off-by: stack <stack@apache.org>
Signed-off-by: Guanghao Zhang <zghao@apache.org>
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
XinSun 2020-10-15 01:08:54 +08:00 committed by GitHub
parent fd0ecadbb9
commit 78b7244091
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 6 additions and 5 deletions

View File

@ -193,7 +193,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
* @param sleepMultiplier by how many times the default sleeping time is augmented * @param sleepMultiplier by how many times the default sleeping time is augmented
* @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code> * @return True if <code>sleepMultiplier</code> is &lt; <code>maxRetriesMultiplier</code>
*/ */
protected boolean sleepForRetries(String msg, int sleepMultiplier) { private boolean sleepForRetries(String msg, int sleepMultiplier) {
try { try {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("{} {}, sleeping {} times {}", LOG.trace("{} {}, sleeping {} times {}",
@ -201,8 +201,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
} }
Thread.sleep(this.sleepForRetries * sleepMultiplier); Thread.sleep(this.sleepForRetries * sleepMultiplier);
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("{} Interrupted while sleeping between retries", logPeerId()); LOG.debug("{} {} Interrupted while sleeping between retries", msg, logPeerId());
} }
} }
return sleepMultiplier < maxRetriesMultiplier; return sleepMultiplier < maxRetriesMultiplier;

View File

@ -691,6 +691,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
} }
} }
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
for (ReplicationSourceShipper worker : workers) { for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) { if (worker.isAlive() || worker.entryReader.isAlive()) {
try { try {
@ -711,9 +714,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
} }
} }
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
if (join) { if (join) {
for (ReplicationSourceShipper worker : workers) { for (ReplicationSourceShipper worker : workers) {
Threads.shutdown(worker, this.sleepForRetries); Threads.shutdown(worker, this.sleepForRetries);