HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe

* refactor how we use connection and async connection to rely on their access methods
* refactor initialization and cleanup of the shared connection
* incompatibly change HCTU's Configuration member variable to be final so it can be safely accessed from multiple threads.

Closes #2180

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Sean Busbey 2020-07-31 02:34:24 -05:00
parent 3470feed91
commit 86ebbdd8a2
No known key found for this signature in database
GPG Key ID: A926FD051016402D
2 changed files with 37 additions and 31 deletions

View File

@ -69,7 +69,7 @@ public class HBaseCommonTestingUtility {
Compression.Algorithm.NONE, Compression.Algorithm.GZ Compression.Algorithm.NONE, Compression.Algorithm.GZ
}; };
protected Configuration conf; protected final Configuration conf;
public HBaseCommonTestingUtility() { public HBaseCommonTestingUtility() {
this(null); this(null);

View File

@ -25,6 +25,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Modifier; import java.lang.reflect.Modifier;
import java.net.BindException; import java.net.BindException;
@ -207,7 +208,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
* HBaseTestingUtility*/ * HBaseTestingUtility*/
private Path dataTestDirOnTestFS = null; private Path dataTestDirOnTestFS = null;
private volatile AsyncClusterConnection asyncConnection; private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
/** Filesystem URI used for map-reduce mini-cluster setup */ /** Filesystem URI used for map-reduce mini-cluster setup */
private static String FS_URI; private static String FS_URI;
@ -1237,14 +1238,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
public void restartHBaseCluster(StartMiniClusterOption option) public void restartHBaseCluster(StartMiniClusterOption option)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (hbaseAdmin != null) { closeConnection();
hbaseAdmin.close();
hbaseAdmin = null;
}
if (this.asyncConnection != null) {
this.asyncConnection.close();
this.asyncConnection = null;
}
this.hbaseCluster = this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(), new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(), option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
@ -3041,11 +3035,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
return hbaseCluster; return hbaseCluster;
} }
private void initConnection() throws IOException {
User user = UserProvider.instantiate(conf).getCurrent();
this.asyncConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
}
/** /**
* Resets the connections so that the next time getConnection() is called, a new connection is * Resets the connections so that the next time getConnection() is called, a new connection is
* created. This is needed in cases where the entire cluster / all the masters are shutdown and * created. This is needed in cases where the entire cluster / all the masters are shutdown and
@ -3067,29 +3056,46 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
} }
/** /**
* Get a Connection to the cluster. Not thread-safe (This class needs a lot of work to make it * Get a shared Connection to the cluster.
* thread-safe). * this method is threadsafe.
* @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster. * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
*/ */
public Connection getConnection() throws IOException { public Connection getConnection() throws IOException {
if (this.asyncConnection == null) { return getAsyncConnection().toConnection();
initConnection();
}
return this.asyncConnection.toConnection();
} }
/**
* Get a shared AsyncClusterConnection to the cluster.
* this method is threadsafe.
* @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown of cluster.
*/
public AsyncClusterConnection getAsyncConnection() throws IOException { public AsyncClusterConnection getAsyncConnection() throws IOException {
if (this.asyncConnection == null) { try {
initConnection(); return asyncConnection.updateAndGet(connection -> {
if (connection == null) {
try {
User user = UserProvider.instantiate(conf).getCurrent();
connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
} catch(IOException ioe) {
throw new UncheckedIOException("Failed to create connection", ioe);
}
}
return connection;
});
} catch (UncheckedIOException exception) {
throw exception.getCause();
} }
return this.asyncConnection;
} }
public void closeConnection() throws IOException { public void closeConnection() throws IOException {
Closeables.close(hbaseAdmin, true); if (hbaseAdmin != null) {
Closeables.close(asyncConnection, true); Closeables.close(hbaseAdmin, true);
this.hbaseAdmin = null; hbaseAdmin = null;
this.asyncConnection = null; }
AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
if (asyncConnection != null) {
Closeables.close(asyncConnection, true);
}
} }
/** /**
@ -3252,7 +3258,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager() Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
.getRegionStates().getRegionAssignments(); .getRegionStates().getRegionAssignments();
final List<Pair<RegionInfo, ServerName>> metaLocations = final List<Pair<RegionInfo, ServerName>> metaLocations =
MetaTableAccessor.getTableRegionsAndLocations(asyncConnection.toConnection(), tableName); MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) { for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
RegionInfo hri = metaLocation.getFirst(); RegionInfo hri = metaLocation.getFirst();
ServerName sn = metaLocation.getSecond(); ServerName sn = metaLocation.getSecond();
@ -3272,7 +3278,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
public String explainTableState(final TableName table, TableState.State state) public String explainTableState(final TableName table, TableState.State state)
throws IOException { throws IOException {
TableState tableState = MetaTableAccessor.getTableState(asyncConnection.toConnection(), table); TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
if (tableState == null) { if (tableState == null) {
return "TableState in META: No table state in META for table " + table + return "TableState in META: No table state in META for table " + table +
" last state in meta (including deleted is " + findLastTableState(table) + ")"; " last state in meta (including deleted is " + findLastTableState(table) + ")";
@ -3299,7 +3305,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
return true; return true;
} }
}; };
MetaTableAccessor.scanMeta(asyncConnection.toConnection(), null, null, MetaTableAccessor.scanMeta(getConnection(), null, null,
ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor); ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor);
return lastTableState.get(); return lastTableState.get();
} }