HBASE-23683 Make HBaseInterClusterReplicationEndpoint more extensible (#1027)
Signed-off-by: Bharath Vissapragada <bharathv@apache.org> Signed-off-by: Josh Elser <elserj@apache.org> Signed-off-by: binlijin <binlijin@gmail.com>
This commit is contained in:
parent
d60ce17c17
commit
cb78b103a7
|
@ -113,6 +113,25 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
private boolean dropOnDeletedTables;
|
||||
private boolean isSerial = false;
|
||||
|
||||
/*
|
||||
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
|
||||
* Connection implementations, or initialize it in a different way, so defining createConnection
|
||||
* as protected for possible overridings.
|
||||
*/
|
||||
protected AsyncClusterConnection createConnection(Configuration conf) throws IOException {
|
||||
return ClusterConnectionFactory.createAsyncClusterConnection(conf,
|
||||
null, User.getCurrent());
|
||||
}
|
||||
|
||||
/*
|
||||
* Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
|
||||
* ReplicationSinkManager implementations, or initialize it in a different way,
|
||||
* so defining createReplicationSinkManager as protected for possible overridings.
|
||||
*/
|
||||
protected ReplicationSinkManager createReplicationSinkManager(AsyncClusterConnection conn) {
|
||||
return new ReplicationSinkManager(conn, this, this.conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Context context) throws IOException {
|
||||
super.init(context);
|
||||
|
@ -131,13 +150,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
// TODO: This connection is replication specific or we should make it particular to
|
||||
// replication and make replication specific settings such as compression or codec to use
|
||||
// passing Cells.
|
||||
this.conn =
|
||||
ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
|
||||
this.conn = createConnection(this.conf);
|
||||
this.sleepForRetries =
|
||||
this.conf.getLong("replication.source.sleepforretries", 1000);
|
||||
this.metrics = context.getMetrics();
|
||||
// ReplicationQueueInfo parses the peerId out of the znode for us
|
||||
this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
|
||||
this.replicationSinkMgr = createReplicationSinkManager(conn);
|
||||
// per sink thread pool
|
||||
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
|
||||
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
|
||||
|
|
Loading…
Reference in New Issue