HBASE-14361 ReplicationSink should create Connection instances lazily
This commit is contained in:
parent
72b4c906b8
commit
d147f8f8f4
|
@ -72,9 +72,12 @@ public class ReplicationSink {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
|
private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final Connection sharedHtableCon;
|
// Volatile because of note in here -- look for double-checked locking:
|
||||||
|
// http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
|
||||||
|
private volatile Connection sharedHtableCon;
|
||||||
private final MetricsSink metrics;
|
private final MetricsSink metrics;
|
||||||
private final AtomicLong totalReplicatedEdits = new AtomicLong();
|
private final AtomicLong totalReplicatedEdits = new AtomicLong();
|
||||||
|
private final Object sharedHtableConLock = new Object();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a sink for replication
|
* Create a sink for replication
|
||||||
|
@ -88,7 +91,6 @@ public class ReplicationSink {
|
||||||
this.conf = HBaseConfiguration.create(conf);
|
this.conf = HBaseConfiguration.create(conf);
|
||||||
decorateConf();
|
decorateConf();
|
||||||
this.metrics = new MetricsSink();
|
this.metrics = new MetricsSink();
|
||||||
this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -213,7 +215,14 @@ public class ReplicationSink {
|
||||||
*/
|
*/
|
||||||
public void stopReplicationSinkServices() {
|
public void stopReplicationSinkServices() {
|
||||||
try {
|
try {
|
||||||
|
if (this.sharedHtableCon != null) {
|
||||||
|
synchronized (sharedHtableConLock) {
|
||||||
|
if (this.sharedHtableCon != null) {
|
||||||
this.sharedHtableCon.close();
|
this.sharedHtableCon.close();
|
||||||
|
this.sharedHtableCon = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
|
LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
|
||||||
}
|
}
|
||||||
|
@ -232,7 +241,17 @@ public class ReplicationSink {
|
||||||
}
|
}
|
||||||
Table table = null;
|
Table table = null;
|
||||||
try {
|
try {
|
||||||
table = this.sharedHtableCon.getTable(tableName);
|
// See https://en.wikipedia.org/wiki/Double-checked_locking
|
||||||
|
Connection connection = this.sharedHtableCon;
|
||||||
|
if (connection == null) {
|
||||||
|
synchronized (sharedHtableConLock) {
|
||||||
|
connection = this.sharedHtableCon;
|
||||||
|
if (connection == null) {
|
||||||
|
connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
table = connection.getTable(tableName);
|
||||||
for (List<Row> rows : allRows) {
|
for (List<Row> rows : allRows) {
|
||||||
table.batch(rows, null);
|
table.batch(rows, null);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue