HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe
* refactor how we use connection to rely on the access method * refactor initialization and cleanup of the shared connection * incompatibly change HCTU's Configuration member variable to be final so it can be safely accessed from multiple threads. Closes #2188 adapted for jdk7 Signed-off-by: Viraj Jasani <vjasani@apache.org> (cherry picked from commit86ebbdd8a2
) (cherry picked from commit0806349ada
)
This commit is contained in:
parent
1f0abf8279
commit
af18670665
|
@ -40,7 +40,7 @@ import org.apache.hadoop.fs.Path;
|
|||
public class HBaseCommonTestingUtility {
|
||||
protected static final Log LOG = LogFactory.getLog(HBaseCommonTestingUtility.class);
|
||||
|
||||
protected Configuration conf;
|
||||
protected final Configuration conf;
|
||||
|
||||
public HBaseCommonTestingUtility() {
|
||||
this(null);
|
||||
|
|
|
@ -46,6 +46,7 @@ import java.util.Random;
|
|||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
|
@ -190,10 +191,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
* HBaseTestingUtility*/
|
||||
private Path dataTestDirOnTestFS = null;
|
||||
|
||||
/**
|
||||
* Shared cluster connection.
|
||||
*/
|
||||
private volatile Connection connection;
|
||||
private final AtomicReference<Connection> connectionRef = new AtomicReference<>();
|
||||
|
||||
/**
|
||||
* System property key to get test directory value.
|
||||
|
@ -1170,10 +1168,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
*/
|
||||
public void shutdownMiniCluster() throws Exception {
|
||||
LOG.info("Shutting down minicluster");
|
||||
if (this.connection != null && !this.connection.isClosed()) {
|
||||
this.connection.close();
|
||||
this.connection = null;
|
||||
}
|
||||
shutdownMiniHBaseCluster();
|
||||
if (!this.passedZkCluster){
|
||||
shutdownMiniZKCluster();
|
||||
|
@ -1203,10 +1197,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
* @throws IOException
|
||||
*/
|
||||
public void shutdownMiniHBaseCluster() throws IOException {
|
||||
if (hbaseAdmin != null) {
|
||||
hbaseAdmin.close0();
|
||||
hbaseAdmin = null;
|
||||
}
|
||||
closeConnection();
|
||||
|
||||
// unset the configuration for MIN and MAX RS to start
|
||||
conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1);
|
||||
|
@ -3020,16 +3011,26 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get a Connection to the cluster.
|
||||
* Not thread-safe (This class needs a lot of work to make it thread-safe).
|
||||
* Get a shared Connection to the cluster.
|
||||
* this method is threadsafe.
|
||||
* @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
|
||||
* @throws IOException
|
||||
*/
|
||||
public Connection getConnection() throws IOException {
|
||||
if (this.connection == null) {
|
||||
this.connection = ConnectionFactory.createConnection(this.conf);
|
||||
Connection connection = this.connectionRef.get();
|
||||
while (connection == null) {
|
||||
connection = ConnectionFactory.createConnection(this.conf);
|
||||
if (! this.connectionRef.compareAndSet(null, connection)) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (IOException exception) {
|
||||
LOG.debug("Ignored failure while closing connection on contended connection creation.",
|
||||
exception);
|
||||
}
|
||||
connection = this.connectionRef.get();
|
||||
}
|
||||
}
|
||||
return this.connection;
|
||||
return connection;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -3067,6 +3068,25 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
}
|
||||
}
|
||||
|
||||
public void closeConnection() throws IOException {
|
||||
if (hbaseAdmin != null) {
|
||||
try {
|
||||
hbaseAdmin.close0();
|
||||
} catch (IOException exception) {
|
||||
LOG.debug("Ignored failure while closing admin.", exception);
|
||||
}
|
||||
hbaseAdmin = null;
|
||||
}
|
||||
Connection connection = this.connectionRef.getAndSet(null);
|
||||
if (connection != null) {
|
||||
try {
|
||||
connection.close();
|
||||
} catch (IOException exception) {
|
||||
LOG.debug("Ignored failure while closing connection.", exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a ZooKeeperWatcher instance.
|
||||
* This instance is shared between HBaseTestingUtility instance users.
|
||||
|
@ -3240,7 +3260,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
|
|||
.getRegionAssignments();
|
||||
final List<Pair<HRegionInfo, ServerName>> metaLocations =
|
||||
MetaTableAccessor
|
||||
.getTableRegionsAndLocations(getZooKeeperWatcher(), connection, tableName);
|
||||
.getTableRegionsAndLocations(getZooKeeperWatcher(), getConnection(), tableName);
|
||||
for (Pair<HRegionInfo, ServerName> metaLocation : metaLocations) {
|
||||
HRegionInfo hri = metaLocation.getFirst();
|
||||
ServerName sn = metaLocation.getSecond();
|
||||
|
|
Loading…
Reference in New Issue