HBASE-11766 Backdoor CoprocessorHConnection is no longer being used for local writes
This commit is contained in:
parent
03687b80bc
commit
5ed89df4bb
|
@ -18,24 +18,18 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
|
||||||
import org.apache.hadoop.hbase.RegionLocations;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.security.UserProvider;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connection to an HTable from within a Coprocessor. We can do some nice tricks since we know we
|
* Connection to an HTable from within a Coprocessor. We can do some nice tricks since we know we
|
||||||
|
@ -47,8 +41,8 @@ import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
class CoprocessorHConnection implements ClusterConnection {
|
public class CoprocessorHConnection extends HConnectionImplementation {
|
||||||
private static final NonceGenerator ng = new ConnectionManager.NoNonceGenerator();
|
private static final NonceGenerator NO_NONCE_GEN = new ConnectionManager.NoNonceGenerator();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create an unmanaged {@link HConnection} based on the environment in which we are running the
|
* Create an unmanaged {@link HConnection} based on the environment in which we are running the
|
||||||
|
@ -56,31 +50,56 @@ class CoprocessorHConnection implements ClusterConnection {
|
||||||
* cleanup mechanisms since we own everything).
|
* cleanup mechanisms since we own everything).
|
||||||
* @param env environment hosting the {@link HConnection}
|
* @param env environment hosting the {@link HConnection}
|
||||||
* @return an unmanaged {@link HConnection}.
|
* @return an unmanaged {@link HConnection}.
|
||||||
* @throws IOException if we cannot create the basic connection
|
* @throws IOException if we cannot create the connection
|
||||||
*/
|
*/
|
||||||
static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
|
public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
ClusterConnection connection =
|
|
||||||
ConnectionManager.createConnectionInternal(env.getConfiguration());
|
|
||||||
// this bit is a little hacky - just trying to get it going for the moment
|
// this bit is a little hacky - just trying to get it going for the moment
|
||||||
if (env instanceof RegionCoprocessorEnvironment) {
|
if (env instanceof RegionCoprocessorEnvironment) {
|
||||||
RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
|
RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
|
||||||
RegionServerServices services = e.getRegionServerServices();
|
RegionServerServices services = e.getRegionServerServices();
|
||||||
if (services instanceof HRegionServer) {
|
if (services instanceof HRegionServer) {
|
||||||
return new CoprocessorHConnection(connection, (HRegionServer) services);
|
return new CoprocessorHConnection((HRegionServer) services);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return connection;
|
return ConnectionManager.createConnectionInternal(env.getConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterConnection delegate;
|
private final ServerName serverName;
|
||||||
private ServerName serverName;
|
private final HRegionServer server;
|
||||||
private HRegionServer server;
|
|
||||||
|
|
||||||
public CoprocessorHConnection(ClusterConnection delegate, HRegionServer server) {
|
/**
|
||||||
|
* Legacy constructor
|
||||||
|
* @param delegate
|
||||||
|
* @param server
|
||||||
|
* @throws IOException if we cannot create the connection
|
||||||
|
* @deprecated delegate is not used
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public CoprocessorHConnection(ClusterConnection delegate, HRegionServer server)
|
||||||
|
throws IOException {
|
||||||
|
this(server);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that uses server configuration
|
||||||
|
* @param server
|
||||||
|
* @throws IOException if we cannot create the connection
|
||||||
|
*/
|
||||||
|
public CoprocessorHConnection(HRegionServer server) throws IOException {
|
||||||
|
this(server.getConfiguration(), server);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that accepts custom configuration
|
||||||
|
* @param conf
|
||||||
|
* @param server
|
||||||
|
* @throws IOException if we cannot create the connection
|
||||||
|
*/
|
||||||
|
public CoprocessorHConnection(Configuration conf, HRegionServer server) throws IOException {
|
||||||
|
super(conf, false, null, UserProvider.instantiate(conf).getCurrent());
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.serverName = server.getServerName();
|
this.serverName = server.getServerName();
|
||||||
this.delegate = delegate;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -88,364 +107,15 @@ class CoprocessorHConnection implements ClusterConnection {
|
||||||
getClient(ServerName serverName) throws IOException {
|
getClient(ServerName serverName) throws IOException {
|
||||||
// client is trying to reach off-server, so we can't do anything special
|
// client is trying to reach off-server, so we can't do anything special
|
||||||
if (!this.serverName.equals(serverName)) {
|
if (!this.serverName.equals(serverName)) {
|
||||||
return delegate.getClient(serverName);
|
return super.getClient(serverName);
|
||||||
}
|
}
|
||||||
// the client is attempting to write to the same regionserver, we can short-circuit to our
|
// the client is attempting to write to the same regionserver, we can short-circuit to our
|
||||||
// local regionserver
|
// local regionserver
|
||||||
return server.getRSRpcServices();
|
return server.getRSRpcServices();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void abort(String why, Throwable e) {
|
|
||||||
delegate.abort(why, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAborted() {
|
|
||||||
return delegate.isAborted();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Configuration getConfiguration() {
|
|
||||||
return delegate.getConfiguration();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableInterface getTable(String tableName) throws IOException {
|
|
||||||
return delegate.getTable(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableInterface getTable(byte[] tableName) throws IOException {
|
|
||||||
return delegate.getTable(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableInterface getTable(TableName tableName) throws IOException {
|
|
||||||
return delegate.getTable(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
|
|
||||||
return delegate.getTable(tableName, pool);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
|
|
||||||
return delegate.getTable(tableName, pool);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
|
|
||||||
return delegate.getTable(tableName, pool);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RegionLocator getRegionLocator(TableName tableName) throws IOException {
|
|
||||||
return delegate.getRegionLocator(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Admin getAdmin() throws IOException { return delegate.getAdmin(); }
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
|
|
||||||
return delegate.isMasterRunning();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTableEnabled(TableName tableName) throws IOException {
|
|
||||||
return delegate.isTableEnabled(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTableEnabled(byte[] tableName) throws IOException {
|
|
||||||
return delegate.isTableEnabled(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTableDisabled(TableName tableName) throws IOException {
|
|
||||||
return delegate.isTableDisabled(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTableDisabled(byte[] tableName) throws IOException {
|
|
||||||
return delegate.isTableDisabled(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTableAvailable(TableName tableName) throws IOException {
|
|
||||||
return delegate.isTableAvailable(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTableAvailable(byte[] tableName) throws IOException {
|
|
||||||
return delegate.isTableAvailable(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
|
|
||||||
return delegate.isTableAvailable(tableName, splitKeys);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) throws IOException {
|
|
||||||
return delegate.isTableAvailable(tableName, splitKeys);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableDescriptor[] listTables() throws IOException {
|
|
||||||
return delegate.listTables();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String[] getTableNames() throws IOException {
|
|
||||||
return delegate.getTableNames();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public TableName[] listTableNames() throws IOException {
|
|
||||||
return delegate.listTableNames();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableDescriptor getHTableDescriptor(TableName tableName) throws IOException {
|
|
||||||
return delegate.getHTableDescriptor(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException {
|
|
||||||
return delegate.getHTableDescriptor(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HRegionLocation locateRegion(TableName tableName, byte[] row) throws IOException {
|
|
||||||
return delegate.locateRegion(tableName, row);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HRegionLocation locateRegion(byte[] tableName, byte[] row) throws IOException {
|
|
||||||
return delegate.locateRegion(tableName, row);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void clearRegionCache() {
|
|
||||||
delegate.clearRegionCache();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void clearRegionCache(TableName tableName) {
|
|
||||||
delegate.clearRegionCache(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void clearRegionCache(byte[] tableName) {
|
|
||||||
delegate.clearRegionCache(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HRegionLocation relocateRegion(TableName tableName, byte[] row) throws IOException {
|
|
||||||
return delegate.relocateRegion(tableName, row);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId)
|
|
||||||
throws IOException {
|
|
||||||
return delegate.relocateRegion(tableName, row, replicaId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
|
|
||||||
return delegate.relocateRegion(tableName, row);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void updateCachedLocations(TableName tableName, byte[] regionName, byte[] rowkey,
|
|
||||||
Object exception, ServerName source) {
|
|
||||||
delegate.updateCachedLocations(tableName, regionName, rowkey, exception, source);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception,
|
|
||||||
HRegionLocation source) {
|
|
||||||
delegate.updateCachedLocations(tableName, rowkey, exception, source);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void updateCachedLocations(byte[] tableName, byte[] rowkey, Object exception,
|
|
||||||
HRegionLocation source) {
|
|
||||||
delegate.updateCachedLocations(tableName, rowkey, exception, source);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HRegionLocation locateRegion(byte[] regionName) throws IOException {
|
|
||||||
return delegate.locateRegion(regionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
|
|
||||||
return delegate.locateRegions(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<HRegionLocation> locateRegions(byte[] tableName) throws IOException {
|
|
||||||
return delegate.locateRegions(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<HRegionLocation>
|
|
||||||
locateRegions(TableName tableName, boolean useCache, boolean offlined) throws IOException {
|
|
||||||
return delegate.locateRegions(tableName, useCache, offlined);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RegionLocations locateRegion(TableName tableName, byte[] row,
|
|
||||||
boolean useCache, boolean retry) throws IOException {
|
|
||||||
return delegate.locateRegion(tableName, row, useCache, retry);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
|
|
||||||
boolean retry, int replicaId) throws IOException {
|
|
||||||
return delegate.locateRegion(tableName, row, useCache, retry, replicaId);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
|
|
||||||
throws IOException {
|
|
||||||
return delegate.locateRegions(tableName, useCache, offlined);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService.BlockingInterface getMaster()
|
|
||||||
throws IOException {
|
|
||||||
return delegate.getMaster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
|
|
||||||
getAdmin(ServerName serverName) throws IOException {
|
|
||||||
return delegate.getAdmin(serverName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
|
|
||||||
getAdmin(ServerName serverName, boolean getMaster) throws IOException {
|
|
||||||
return delegate.getAdmin(serverName, getMaster);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload)
|
|
||||||
throws IOException {
|
|
||||||
return delegate.getRegionLocation(tableName, row, reload);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, boolean reload)
|
|
||||||
throws IOException {
|
|
||||||
return delegate.getRegionLocation(tableName, row, reload);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void processBatch(List<? extends Row> actions, TableName tableName, ExecutorService pool,
|
|
||||||
Object[] results) throws IOException, InterruptedException {
|
|
||||||
delegate.processBatch(actions, tableName, pool, results);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void processBatch(List<? extends Row> actions, byte[] tableName, ExecutorService pool,
|
|
||||||
Object[] results) throws IOException, InterruptedException {
|
|
||||||
delegate.processBatch(actions, tableName, pool, results);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
|
|
||||||
ExecutorService pool, Object[] results, Callback<R> callback) throws IOException,
|
|
||||||
InterruptedException {
|
|
||||||
delegate.processBatchCallback(list, tableName, pool, results, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public <R> void processBatchCallback(List<? extends Row> list, byte[] tableName,
|
|
||||||
ExecutorService pool, Object[] results, Callback<R> callback) throws IOException,
|
|
||||||
InterruptedException {
|
|
||||||
delegate.processBatchCallback(list, tableName, pool, results, callback);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setRegionCachePrefetch(TableName tableName, boolean enable) {
|
|
||||||
delegate.setRegionCachePrefetch(tableName, enable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setRegionCachePrefetch(byte[] tableName, boolean enable) {
|
|
||||||
delegate.setRegionCachePrefetch(tableName, enable);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean getRegionCachePrefetch(TableName tableName) {
|
|
||||||
return delegate.getRegionCachePrefetch(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean getRegionCachePrefetch(byte[] tableName) {
|
|
||||||
return delegate.getRegionCachePrefetch(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getCurrentNrHRS() throws IOException {
|
|
||||||
return delegate.getCurrentNrHRS();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableDescriptor[] getHTableDescriptorsByTableName(List<TableName> tableNames)
|
|
||||||
throws IOException {
|
|
||||||
return delegate.getHTableDescriptorsByTableName(tableNames);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames) throws IOException {
|
|
||||||
return delegate.getHTableDescriptors(tableNames);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isClosed() {
|
|
||||||
return delegate.isClosed();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void clearCaches(ServerName sn) {
|
|
||||||
delegate.clearCaches(sn);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws IOException {
|
|
||||||
delegate.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void deleteCachedRegionLocation(HRegionLocation location) {
|
|
||||||
delegate.deleteCachedRegionLocation(location);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MasterKeepAliveConnection getKeepAliveMasterService()
|
|
||||||
throws MasterNotRunningException {
|
|
||||||
return delegate.getKeepAliveMasterService();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isDeadServer(ServerName serverName) {
|
|
||||||
return delegate.isDeadServer(serverName);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public NonceGenerator getNonceGenerator() {
|
public NonceGenerator getNonceGenerator() {
|
||||||
return ng; // don't use nonces for coprocessor connection
|
return NO_NONCE_GEN; // don't use nonces for coprocessor connection
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public AsyncProcess getAsyncProcess() {
|
|
||||||
return delegate.getAsyncProcess();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue