From 68b56beab744e983df0877eec9f576ef884a2807 Mon Sep 17 00:00:00 2001 From: XinSun Date: Tue, 29 Sep 2020 08:27:37 +0800 Subject: [PATCH] HBASE-25100 conf and conn are assigned twice in HBaseReplicationEndpoint and HBaseInterClusterReplicationEndpoint (#2463) Signed-off-by: Duo Zhang Signed-off-by: Guanghao Zhang --- .../replication/HBaseReplicationEndpoint.java | 61 +++++++++++++------ .../HBaseInterClusterReplicationEndpoint.java | 33 ---------- 2 files changed, 42 insertions(+), 52 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java index 850a7912556..b08c99098c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HBaseReplicationEndpoint.java @@ -60,10 +60,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class); private ZKWatcher zkw = null; + private final Object zkwLock = new Object(); protected Configuration conf; - protected AsyncClusterConnection conn; + private AsyncClusterConnection conn; /** * Default maximum number of times a replication sink can be reported as bad before @@ -103,10 +104,6 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint public void init(Context context) throws IOException { super.init(context); this.conf = HBaseConfiguration.create(ctx.getConfiguration()); - // TODO: This connection is replication specific or we should make it particular to - // replication and make replication specific settings such as compression or codec to use - // passing Cells. - this.conn = createConnection(this.conf); this.ratio = ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO); this.badSinkThreshold = @@ -114,9 +111,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint this.badReportCounts = Maps.newHashMap(); } - protected synchronized void disconnect() { - if (zkw != null) { - zkw.close(); + protected void disconnect() { + synchronized (zkwLock) { + if (zkw != null) { + zkw.close(); + } + } + if (this.conn != null) { + try { + this.conn.close(); + this.conn = null; + } catch (IOException e) { + LOG.warn("{} Failed to close the connection", ctx.getPeerId()); + } } } @@ -128,11 +135,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException || ke instanceof AuthFailedException) { String clusterKey = ctx.getPeerConfig().getClusterKey(); - LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke); + LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke); try { reloadZkWatcher(); } catch (IOException io) { - LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io); + LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io); } } } @@ -151,6 +158,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint protected void doStart() { try { reloadZkWatcher(); + connectPeerCluster(); notifyStarted(); } catch (IOException e) { notifyFailed(e); @@ -168,10 +176,12 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint // limit connections when multiple replication sources try to connect to // the peer cluster. If the peer cluster is down we can get out of control // over time. - public synchronized UUID getPeerUUID() { + public UUID getPeerUUID() { UUID peerUUID = null; try { - peerUUID = ZKClusterId.getUUIDForCluster(zkw); + synchronized (zkwLock) { + peerUUID = ZKClusterId.getUUIDForCluster(zkw); + } } catch (KeeperException ke) { reconnect(ke); } @@ -182,13 +192,24 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint * Closes the current ZKW (if not null) and creates a new one * @throws IOException If anything goes wrong connecting */ - private synchronized void reloadZkWatcher() throws IOException { - if (zkw != null) { - zkw.close(); + private void reloadZkWatcher() throws IOException { + synchronized (zkwLock) { + if (zkw != null) { + zkw.close(); + } + zkw = new ZKWatcher(ctx.getConfiguration(), + "connection to cluster: " + ctx.getPeerId(), this); + zkw.registerListener(new PeerRegionServerListener(this)); + } + } + + private void connectPeerCluster() throws IOException { + try { + conn = createConnection(this.conf); + } catch (IOException ioe) { + LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe); + throw ioe; } - zkw = new ZKWatcher(ctx.getConfiguration(), - "connection to cluster: " + ctx.getPeerId(), this); - zkw.registerListener(new PeerRegionServerListener(this)); } @Override @@ -211,7 +232,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint protected List fetchSlavesAddresses() { List children = null; try { - children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode); + synchronized (zkwLock) { + children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode); + } } catch (KeeperException ke) { if (LOG.isDebugEnabled()) { LOG.debug("Fetch slaves addresses failed", ke); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index b6e1f69173f..b127b467505 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -44,14 +44,11 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.AsyncClusterConnection; import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin; -import org.apache.hadoop.hbase.client.ClusterConnectionFactory; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.ipc.RpcServer; @@ -59,7 +56,6 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; @@ -127,7 +123,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi @Override public void init(Context context) throws IOException { super.init(context); - this.conf = HBaseConfiguration.create(ctx.getConfiguration()); decorateConf(); this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300); this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", @@ -139,10 +134,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER); this.maxTerminationWait = maxTerminationWaitMultiplier * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - // TODO: This connection is replication specific or we should make it particular to - // replication and make replication specific settings such as compression or codec to use - // passing Cells. - this.conn = createConnection(this.conf); this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000); this.metrics = context.getMetrics(); @@ -412,19 +403,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return entryList; } - private void reconnectToPeerCluster() { - AsyncClusterConnection connection = null; - try { - connection = - ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent()); - } catch (IOException ioe) { - LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe); - } - if (connection != null) { - this.conn = connection; - } - } - private long parallelReplicate(CompletionService pool, ReplicateContext replicateContext, List> batches) throws IOException { int futures = 0; @@ -504,9 +482,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } continue; } - if (this.conn == null) { - reconnectToPeerCluster(); - } try { // replicate the batches to sink side. parallelReplicate(pool, replicateContext, batches); @@ -564,14 +539,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi @Override protected void doStop() { disconnect(); // don't call super.doStop() - if (this.conn != null) { - try { - this.conn.close(); - this.conn = null; - } catch (IOException e) { - LOG.warn("{} Failed to close the connection", logPeerId()); - } - } // Allow currently running replication tasks to finish exec.shutdown(); try {