diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index c92b53db98a..69db31c15ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.List; import java.util.UUID; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -57,6 +58,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe private final String peerId; private final UUID clusterId; private final MetricsSource metrics; + private final Abortable abortable; @InterfaceAudience.Private public Context( @@ -66,7 +68,8 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe final UUID clusterId, final ReplicationPeer replicationPeer, final MetricsSource metrics, - final TableDescriptors tableDescriptors) { + final TableDescriptors tableDescriptors, + final Abortable abortable) { this.conf = conf; this.fs = fs; this.clusterId = clusterId; @@ -74,6 +77,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe this.replicationPeer = replicationPeer; this.metrics = metrics; this.tableDescriptors = tableDescriptors; + this.abortable = abortable; } public Configuration getConfiguration() { return conf; @@ -99,6 +103,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe public TableDescriptors getTableDescriptors() { return tableDescriptors; } + public Abortable getAbortable() { return abortable; } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 28340b5c1f0..bf3fd1b598f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -40,6 +40,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableNotFoundException; @@ -71,17 +72,19 @@ import org.apache.hadoop.ipc.RemoteException; public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint { private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class); + + private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2; + private ClusterConnection conn; - private Configuration conf; - // How long should we sleep for each retry private long sleepForRetries; - // Maximum number of retries before taking bold actions private int maxRetriesMultiplier; // Socket timeouts require even bolder actions since we don't want to DDOS private int socketTimeoutMultiplier; + // Amount of time for shutdown to wait for all tasks to complete + private long maxTerminationWait; //Metrics for this source private MetricsSource metrics; // Handles connecting to peer region servers @@ -93,6 +96,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi private Path baseNamespaceDir; private Path hfileArchiveDir; private boolean replicationBulkLoadDataEnabled; + private Abortable abortable; @Override public void init(Context context) throws IOException { @@ -102,6 +106,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier); + // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator + // tasks to terminate when doStop() is called. + long maxTerminationWaitMultiplier = this.conf.getLong( + "replication.source.maxterminationmultiplier", + DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER); + this.maxTerminationWait = maxTerminationWaitMultiplier * + this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); // TODO: This connection is replication specific or we should make it particular to // replication and make replication specific settings such as compression or codec to use // passing Cells. @@ -117,6 +128,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, new LinkedBlockingQueue()); this.exec.allowCoreThreadTimeOut(true); + this.abortable = ctx.getAbortable(); this.replicationBulkLoadDataEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, @@ -211,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e); } } - while (this.isRunning()) { + while (this.isRunning() && !exec.isShutdown()) { if (!isPeerEnabled()) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) { sleepMultiplier++; @@ -321,7 +333,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi LOG.warn("Failed to close the connection"); } } - exec.shutdownNow(); + // Allow currently running replication tasks to finish + exec.shutdown(); + try { + exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + } + // Abort if the tasks did not terminate in time + if (!exec.isTerminated()) { + String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " + + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " + + "Aborting to prevent Replication from deadlocking. See HBASE-16081."; + abortable.abort(errMsg, new IOException(errMsg)); + } notifyStopped(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java index 143d6e28fef..e2a232f37a5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java @@ -489,7 +489,7 @@ public class ReplicationSourceManager implements ReplicationListener { // init replication endpoint replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), - fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); + fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server)); return src; }