HBASE-16081 Wait for Replication Tasks to complete before killing the ThreadPoolExecutor inside of HBaseInterClusterReplicationEndpoint

Signed-off-by: Mikhail Antonov <antonov@apache.org>
This commit is contained in:
Joseph Hwang 2016-07-11 13:17:56 -07:00 committed by Mikhail Antonov
parent a396ae773a
commit ccf293d7fb
3 changed files with 36 additions and 7 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -57,6 +58,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
private final String peerId; private final String peerId;
private final UUID clusterId; private final UUID clusterId;
private final MetricsSource metrics; private final MetricsSource metrics;
private final Abortable abortable;
@InterfaceAudience.Private @InterfaceAudience.Private
public Context( public Context(
@ -66,7 +68,8 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
final UUID clusterId, final UUID clusterId,
final ReplicationPeer replicationPeer, final ReplicationPeer replicationPeer,
final MetricsSource metrics, final MetricsSource metrics,
final TableDescriptors tableDescriptors) { final TableDescriptors tableDescriptors,
final Abortable abortable) {
this.conf = conf; this.conf = conf;
this.fs = fs; this.fs = fs;
this.clusterId = clusterId; this.clusterId = clusterId;
@ -74,6 +77,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
this.replicationPeer = replicationPeer; this.replicationPeer = replicationPeer;
this.metrics = metrics; this.metrics = metrics;
this.tableDescriptors = tableDescriptors; this.tableDescriptors = tableDescriptors;
this.abortable = abortable;
} }
public Configuration getConfiguration() { public Configuration getConfiguration() {
return conf; return conf;
@ -99,6 +103,7 @@ public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListe
public TableDescriptors getTableDescriptors() { public TableDescriptors getTableDescriptors() {
return tableDescriptors; return tableDescriptors;
} }
public Abortable getAbortable() { return abortable; }
} }
/** /**

View File

@ -40,6 +40,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
@ -71,17 +72,19 @@ import org.apache.hadoop.ipc.RemoteException;
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint { public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class); private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
private ClusterConnection conn; private ClusterConnection conn;
private Configuration conf; private Configuration conf;
// How long should we sleep for each retry // How long should we sleep for each retry
private long sleepForRetries; private long sleepForRetries;
// Maximum number of retries before taking bold actions // Maximum number of retries before taking bold actions
private int maxRetriesMultiplier; private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS // Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier; private int socketTimeoutMultiplier;
// Amount of time for shutdown to wait for all tasks to complete
private long maxTerminationWait;
//Metrics for this source //Metrics for this source
private MetricsSource metrics; private MetricsSource metrics;
// Handles connecting to peer region servers // Handles connecting to peer region servers
@ -93,6 +96,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private Path baseNamespaceDir; private Path baseNamespaceDir;
private Path hfileArchiveDir; private Path hfileArchiveDir;
private boolean replicationBulkLoadDataEnabled; private boolean replicationBulkLoadDataEnabled;
private Abortable abortable;
@Override @Override
public void init(Context context) throws IOException { public void init(Context context) throws IOException {
@ -102,6 +106,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
maxRetriesMultiplier); maxRetriesMultiplier);
// A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
// tasks to terminate when doStop() is called.
long maxTerminationWaitMultiplier = this.conf.getLong(
"replication.source.maxterminationmultiplier",
DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
this.maxTerminationWait = maxTerminationWaitMultiplier *
this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
// TODO: This connection is replication specific or we should make it particular to // TODO: This connection is replication specific or we should make it particular to
// replication and make replication specific settings such as compression or codec to use // replication and make replication specific settings such as compression or codec to use
// passing Cells. // passing Cells.
@ -117,6 +128,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS, this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); new LinkedBlockingQueue<Runnable>());
this.exec.allowCoreThreadTimeOut(true); this.exec.allowCoreThreadTimeOut(true);
this.abortable = ctx.getAbortable();
this.replicationBulkLoadDataEnabled = this.replicationBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@ -211,7 +223,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e); entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
} }
} }
while (this.isRunning()) { while (this.isRunning() && !exec.isShutdown()) {
if (!isPeerEnabled()) { if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++; sleepMultiplier++;
@ -321,7 +333,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.warn("Failed to close the connection"); LOG.warn("Failed to close the connection");
} }
} }
exec.shutdownNow(); // Allow currently running replication tasks to finish
exec.shutdown();
try {
exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
// Abort if the tasks did not terminate in time
if (!exec.isTerminated()) {
String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The " +
"ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. " +
"Aborting to prevent Replication from deadlocking. See HBASE-16081.";
abortable.abort(errMsg, new IOException(errMsg));
}
notifyStopped(); notifyStopped();
} }

View File

@ -489,7 +489,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// init replication endpoint // init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(), replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors)); fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
return src; return src;
} }