HBASE-14361 ReplicationSink should create Connection instances lazily

This commit is contained in:
stack 2015-09-15 05:15:47 -07:00
parent 042a63c24d
commit 236f005088
1 changed files with 23 additions and 4 deletions

View File

@ -72,9 +72,12 @@ public class ReplicationSink {
private static final Log LOG = LogFactory.getLog(ReplicationSink.class); private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
private final Configuration conf; private final Configuration conf;
private final Connection sharedHtableCon; // Volatile because of note in here -- look for double-checked locking:
// http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
private volatile Connection sharedHtableCon;
private final MetricsSink metrics; private final MetricsSink metrics;
private final AtomicLong totalReplicatedEdits = new AtomicLong(); private final AtomicLong totalReplicatedEdits = new AtomicLong();
private final Object sharedHtableConLock = new Object();
/** /**
* Create a sink for replication * Create a sink for replication
@ -88,7 +91,6 @@ public class ReplicationSink {
this.conf = HBaseConfiguration.create(conf); this.conf = HBaseConfiguration.create(conf);
decorateConf(); decorateConf();
this.metrics = new MetricsSink(); this.metrics = new MetricsSink();
this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
} }
/** /**
@ -213,7 +215,14 @@ public class ReplicationSink {
*/ */
public void stopReplicationSinkServices() { public void stopReplicationSinkServices() {
try { try {
this.sharedHtableCon.close(); if (this.sharedHtableCon != null) {
synchronized (sharedHtableConLock) {
if (this.sharedHtableCon != null) {
this.sharedHtableCon.close();
this.sharedHtableCon = null;
}
}
}
} catch (IOException e) { } catch (IOException e) {
LOG.warn("IOException while closing the connection", e); // ignoring as we are closing. LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
} }
@ -232,7 +241,17 @@ public class ReplicationSink {
} }
Table table = null; Table table = null;
try { try {
table = this.sharedHtableCon.getTable(tableName); // See https://en.wikipedia.org/wiki/Double-checked_locking
Connection connection = this.sharedHtableCon;
if (connection == null) {
synchronized (sharedHtableConLock) {
connection = this.sharedHtableCon;
if (connection == null) {
connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
}
}
}
table = connection.getTable(tableName);
for (List<Row> rows : allRows) { for (List<Row> rows : allRows) {
table.batch(rows); table.batch(rows);
} }