HBASE-25100 conf and conn are assigned twice in HBaseReplicationEndpoint and HBaseInterClusterReplicationEndpoint (#2463)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Guanghao Zhang <zghao@apache.org>
parent fbef545989
commit 68b56beab7

HBaseReplicationEndpoint.java
@@ -60,10 +60,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   private static final Logger LOG = LoggerFactory.getLogger(HBaseReplicationEndpoint.class);
 
   private ZKWatcher zkw = null;
+  private final Object zkwLock = new Object();
 
   protected Configuration conf;
 
-  protected AsyncClusterConnection conn;
+  private AsyncClusterConnection conn;
 
   /**
    * Default maximum number of times a replication sink can be reported as bad before
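The field changes above set up the rest of the patch: `zkw` is now guarded by a dedicated `zkwLock` object instead of `synchronized` methods (which lock on `this`, a monitor any external caller can also grab), and `conn` drops from `protected` to `private` so subclasses can no longer assign it behind the parent's back. A minimal, self-contained sketch of the dedicated-lock-object pattern; the class and field names here are hypothetical, not HBase code:

```java
// Sketch of the dedicated-lock-object pattern (hypothetical names).
final class DedicatedLockSketch {
  // Private and final: no caller outside this class can contend on it,
  // unlike the implicit `this` monitor of a synchronized method.
  private final Object lock = new Object();
  private StringBuilder state; // guarded by `lock`

  void reload() {
    synchronized (lock) { // critical section covers only the state swap
      state = new StringBuilder("fresh");
    }
  }

  int stateLength() {
    synchronized (lock) { // readers take the same lock
      return state == null ? 0 : state.length();
    }
  }
}
```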
@@ -103,10 +104,6 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   public void init(Context context) throws IOException {
     super.init(context);
     this.conf = HBaseConfiguration.create(ctx.getConfiguration());
-    // TODO: This connection is replication specific or we should make it particular to
-    // replication and make replication specific settings such as compression or codec to use
-    // passing Cells.
-    this.conn = createConnection(this.conf);
     this.ratio =
       ctx.getConfiguration().getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
     this.badSinkThreshold =
@@ -114,9 +111,19 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     this.badReportCounts = Maps.newHashMap();
   }
 
-  protected synchronized void disconnect() {
-    if (zkw != null) {
-      zkw.close();
+  protected void disconnect() {
+    synchronized (zkwLock) {
+      if (zkw != null) {
+        zkw.close();
+      }
+    }
+    if (this.conn != null) {
+      try {
+        this.conn.close();
+        this.conn = null;
+      } catch (IOException e) {
+        LOG.warn("{} Failed to close the connection", ctx.getPeerId());
+      }
     }
   }
 
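`disconnect()` now tears down both resources and clears `conn` after closing it, which makes the method safe to call more than once. A hedged sketch of that close-and-clear idiom, using `java.io.Closeable` as a stand-in for the connection type:

```java
import java.io.Closeable;
import java.io.IOException;

// Close-and-clear: null the field after a successful close so repeated
// disconnect() calls fall through the null check harmlessly.
final class CloseAndClearSketch {
  private Closeable conn; // null when never connected or already closed

  void disconnect() {
    if (conn != null) {
      try {
        conn.close();
        conn = null;
      } catch (IOException e) {
        // a real implementation would log here, as the diff does
      }
    }
  }
}
```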
@@ -128,11 +135,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
     if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException
         || ke instanceof AuthFailedException) {
       String clusterKey = ctx.getPeerConfig().getClusterKey();
-      LOG.warn("Lost the ZooKeeper connection for peer " + clusterKey, ke);
+      LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, ke);
       try {
         reloadZkWatcher();
       } catch (IOException io) {
-        LOG.warn("Creation of ZookeeperWatcher failed for peer " + clusterKey, io);
+        LOG.warn("Creation of ZookeeperWatcher failed for peer {}", clusterKey, io);
       }
     }
   }
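Both warnings switch from string concatenation to SLF4J's `{}` placeholders. Note the argument counts: one placeholder, two arguments. SLF4J treats a trailing `Throwable` specially, logging its stack trace rather than binding it to a placeholder, so the exception is preserved exactly as in the concatenation form. A small illustration:

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Parameterized logging with a trailing Throwable: `cause` is not consumed
// by the {} placeholder; SLF4J logs it as the exception, stack trace included.
final class LoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  void warnWithCause(String clusterKey, Exception cause) {
    LOG.warn("Lost the ZooKeeper connection for peer {}", clusterKey, cause);
  }
}
```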
@@ -151,6 +158,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   protected void doStart() {
     try {
       reloadZkWatcher();
+      connectPeerCluster();
       notifyStarted();
     } catch (IOException e) {
       notifyFailed(e);
@@ -168,10 +176,12 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   // limit connections when multiple replication sources try to connect to
   // the peer cluster. If the peer cluster is down we can get out of control
   // over time.
-  public synchronized UUID getPeerUUID() {
+  public UUID getPeerUUID() {
     UUID peerUUID = null;
     try {
-      peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+      synchronized (zkwLock) {
+        peerUUID = ZKClusterId.getUUIDForCluster(zkw);
+      }
     } catch (KeeperException ke) {
       reconnect(ke);
     }
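Worth noticing in this hunk: only the ZooKeeper read sits inside `synchronized (zkwLock)`; the `reconnect(ke)` recovery path runs after the lock has been released, so watcher re-creation never happens from inside the read's critical section. A minimal sketch of read-under-lock, recover-after-release, with hypothetical names:

```java
// Read under the lock, recover after releasing it (hypothetical names).
final class ReadThenRecoverSketch {
  private final Object lock = new Object();
  private String cached; // guarded by `lock`

  String read() {
    String value = null;
    try {
      synchronized (lock) {
        value = riskyRead(); // the only statement that needs the lock
      }
    } catch (IllegalStateException e) {
      recover(); // reacquires the lock itself, outside the read's critical section
    }
    return value;
  }

  private String riskyRead() {
    if (cached == null) {
      throw new IllegalStateException("not loaded");
    }
    return cached;
  }

  private void recover() {
    synchronized (lock) {
      cached = "reloaded";
    }
  }
}
```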
@@ -182,13 +192,24 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
    * Closes the current ZKW (if not null) and creates a new one
    * @throws IOException If anything goes wrong connecting
    */
-  private synchronized void reloadZkWatcher() throws IOException {
-    if (zkw != null) {
-      zkw.close();
+  private void reloadZkWatcher() throws IOException {
+    synchronized (zkwLock) {
+      if (zkw != null) {
+        zkw.close();
+      }
+      zkw = new ZKWatcher(ctx.getConfiguration(),
+        "connection to cluster: " + ctx.getPeerId(), this);
+      zkw.registerListener(new PeerRegionServerListener(this));
+    }
+  }
+
+  private void connectPeerCluster() throws IOException {
+    try {
+      conn = createConnection(this.conf);
+    } catch (IOException ioe) {
+      LOG.warn("{} Failed to create connection for peer cluster", ctx.getPeerId(), ioe);
+      throw ioe;
     }
-    zkw = new ZKWatcher(ctx.getConfiguration(),
-      "connection to cluster: " + ctx.getPeerId(), this);
-    zkw.registerListener(new PeerRegionServerListener(this));
   }
 
   @Override
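Together with the `doStart()` hunk above, this is the new startup sequence: `reloadZkWatcher()` builds the watcher, `connectPeerCluster()` builds the connection (rethrowing on failure), and only then does the endpoint report itself started. A simplified, self-contained sketch of the fail-fast start pattern; the abstract methods are stand-ins, not the real BaseReplicationEndpoint API:

```java
import java.io.IOException;

// Fail-fast start: acquire dependencies in order; any failure is reported
// before the service can be observed as started.
abstract class StartSequenceSketch {
  protected void doStart() {
    try {
      openWatcher();    // stands in for reloadZkWatcher()
      openConnection(); // stands in for connectPeerCluster()
      notifyStarted();  // reached only when both steps succeeded
    } catch (IOException e) {
      notifyFailed(e);  // no half-initialized endpoint escapes
    }
  }

  abstract void openWatcher() throws IOException;
  abstract void openConnection() throws IOException;
  abstract void notifyStarted();
  abstract void notifyFailed(Exception e);
}
```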
@@ -211,7 +232,9 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
   protected List<ServerName> fetchSlavesAddresses() {
     List<String> children = null;
     try {
-      children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
+      synchronized (zkwLock) {
+        children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.getZNodePaths().rsZNode);
+      }
     } catch (KeeperException ke) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Fetch slaves addresses failed", ke);

HBaseInterClusterReplicationEndpoint.java
@@ -44,14 +44,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -59,7 +56,6 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -127,7 +123,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint
   @Override
   public void init(Context context) throws IOException {
     super.init(context);
-    this.conf = HBaseConfiguration.create(ctx.getConfiguration());
     decorateConf();
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
@@ -139,10 +134,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint
       DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
     this.maxTerminationWait = maxTerminationWaitMultiplier *
         this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-    // TODO: This connection is replication specific or we should make it particular to
-    // replication and make replication specific settings such as compression or codec to use
-    // passing Cells.
-    this.conn = createConnection(this.conf);
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
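These two `init()` hunks are the subclass half of the fix named in the commit title: `HBaseInterClusterReplicationEndpoint` no longer re-creates `conf` (the parent's `init()` already assigned it) and no longer creates a second connection. A compact sketch of the ownership rule the patch restores; the classes and the property name here are hypothetical:

```java
import java.util.Properties;

// The base class assigns shared state exactly once; subclasses decorate it.
class BaseSketch {
  protected Properties conf; // stands in for Hadoop's Configuration

  void init() {
    conf = new Properties(); // the single point of assignment
  }
}

class SubSketch extends BaseSketch {
  @Override
  void init() {
    super.init();
    conf.setProperty("example.key", "value"); // decorate, don't replace
  }
}
```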
@@ -412,19 +403,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint
     return entryList;
   }
 
-  private void reconnectToPeerCluster() {
-    AsyncClusterConnection connection = null;
-    try {
-      connection =
-        ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
-    } catch (IOException ioe) {
-      LOG.warn("{} Failed to create connection for peer cluster", logPeerId(), ioe);
-    }
-    if (connection != null) {
-      this.conn = connection;
-    }
-  }
-
   private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
       List<List<Entry>> batches) throws IOException {
     int futures = 0;
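Deleting `reconnectToPeerCluster()` removes the second call site that assigned `conn`. The duplication was more than cosmetic: assigning a fresh connection over an open one strands the old connection with no owner to close it. A tiny runnable sketch of that leak shape, using a `Closeable` stand-in rather than a real cluster connection:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.StringReader;

// Double-assignment leak: the first resource becomes unreachable, unclosed.
final class DoubleAssignLeakSketch {
  private Closeable conn;

  void connect() {
    conn = new StringReader("stand-in resource"); // "opens" a resource
  }

  public static void main(String[] args) throws IOException {
    DoubleAssignLeakSketch s = new DoubleAssignLeakSketch();
    s.connect();
    s.connect(); // the first StringReader is now leaked
    s.conn.close();
  }
}
```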
@@ -504,9 +482,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint
         }
         continue;
       }
-      if (this.conn == null) {
-        reconnectToPeerCluster();
-      }
       try {
         // replicate the batches to sink side.
         parallelReplicate(pool, replicateContext, batches);
@@ -564,14 +539,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint
   @Override
   protected void doStop() {
     disconnect(); // don't call super.doStop()
-    if (this.conn != null) {
-      try {
-        this.conn.close();
-        this.conn = null;
-      } catch (IOException e) {
-        LOG.warn("{} Failed to close the connection", logPeerId());
-      }
-    }
     // Allow currently running replication tasks to finish
     exec.shutdown();
     try {
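With connection teardown centralized in the parent's `disconnect()`, `doStop()` keeps only what the subclass owns: draining the executor. The hunk is cut off right after `exec.shutdown();`; the conventional continuation of that idiom is a bounded `awaitTermination` wait, sketched below as the generic pattern rather than the exact HBase code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Graceful executor shutdown: refuse new work, then bound the wait for
// in-flight tasks before interrupting stragglers.
final class ExecutorShutdownSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService exec = Executors.newFixedThreadPool(2);
    exec.submit(() -> System.out.println("in-flight replication task stand-in"));
    exec.shutdown(); // no new tasks accepted; running ones may finish
    if (!exec.awaitTermination(30, TimeUnit.SECONDS)) {
      exec.shutdownNow(); // give up and interrupt whatever is left
    }
  }
}
```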