HBASE-16834 Add AsyncConnection support for ConnectionFactory

This commit is contained in:
zhangduo 2016-10-17 09:45:43 +08:00
parent 109db38b6a
commit 3a0dbf71a9
2 changed files with 54 additions and 2 deletions

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/** /**
@ -57,6 +58,9 @@ import org.apache.hadoop.hbase.security.UserProvider;
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ConnectionFactory { public class ConnectionFactory {
public static final String HBASE_CLIENT_ASYNC_CONNECTION_IMPL =
"hbase.client.async.connection.impl";
/** No public c.tors */ /** No public c.tors */
protected ConnectionFactory() { protected ConnectionFactory() {
} }
@ -233,4 +237,53 @@ public class ConnectionFactory {
throw new IOException(e); throw new IOException(e);
} }
} }
/**
* Call {@link #createAsyncConnection(Configuration)} using default HBaseConfiguration.
* @see #createAsyncConnection(Configuration)
* @return AsyncConnection object
*/
public static AsyncConnection createAsyncConnection() throws IOException {
return createAsyncConnection(HBaseConfiguration.create());
}
/**
* Call {@link #createAsyncConnection(Configuration, User)} using the given {@code conf} and a
* User object created by {@link UserProvider}. The given {@code conf} will also be used to
* initialize the {@link UserProvider}.
* @param conf configuration
* @return AsyncConnection object
* @see #createAsyncConnection(Configuration, User)
* @see UserProvider
*/
public static AsyncConnection createAsyncConnection(Configuration conf) throws IOException {
return createAsyncConnection(conf, UserProvider.instantiate(conf).getCurrent());
}
/**
* Create a new AsyncConnection instance using the passed {@code conf} and {@code user}.
* AsyncConnection encapsulates all housekeeping for a connection to the cluster. All tables and
* interfaces created from returned connection share zookeeper connection, meta cache, and
* connections to region servers and masters.
* <p>
* The caller is responsible for calling {@link AsyncConnection#close()} on the returned
* connection instance.
* <p>
* Usually you should only create one AsyncConnection instance in your code and use it everywhere
* as it is thread safe.
* @param conf configuration
* @param user the user the asynchronous connection is for
* @return AsyncConnection object
* @throws IOException
*/
public static AsyncConnection createAsyncConnection(Configuration conf, User user)
throws IOException {
Class<? extends AsyncConnection> clazz = conf.getClass(HBASE_CLIENT_ASYNC_CONNECTION_IMPL,
AsyncConnectionImpl.class, AsyncConnection.class);
try {
return ReflectionUtils.newInstance(clazz, conf, user);
} catch (Exception e) {
throw new IOException(e);
}
}
} }

View File

@ -28,7 +28,6 @@ import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -60,7 +59,7 @@ public class TestAsyncTable {
TEST_UTIL.startMiniCluster(1); TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME); TEST_UTIL.waitTableAvailable(TABLE_NAME);
ASYNC_CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent()); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
} }
@AfterClass @AfterClass