HBASE-9534: Short-Circuit Coprocessor HTable access when on the same server
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1524514 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
db0a4b2d07
commit
6bc48464ca
|
@ -192,6 +192,30 @@ public class HTable implements HTableInterface {
|
||||||
this.connection = HConnectionManager.getConnection(conf);
|
this.connection = HConnectionManager.getConnection(conf);
|
||||||
this.configuration = conf;
|
this.configuration = conf;
|
||||||
|
|
||||||
|
this.pool = getDefaultExecutor(conf);
|
||||||
|
this.finishSetup();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates an object to access a HBase table. Shares zookeeper connection and other resources with
|
||||||
|
* other HTable instances created with the same <code>connection</code> instance. Use this
|
||||||
|
* constructor when the HConnection instance is externally managed.
|
||||||
|
* @param tableName Name of the table.
|
||||||
|
* @param connection HConnection to be used.
|
||||||
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
*/
|
||||||
|
public HTable(TableName tableName, HConnection connection) throws IOException {
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.cleanupPoolOnClose = true;
|
||||||
|
this.cleanupConnectionOnClose = false;
|
||||||
|
this.connection = connection;
|
||||||
|
this.configuration = connection.getConfiguration();
|
||||||
|
|
||||||
|
this.pool = getDefaultExecutor(this.configuration);
|
||||||
|
this.finishSetup();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
|
||||||
int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
|
int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
|
||||||
if (maxThreads == 0) {
|
if (maxThreads == 0) {
|
||||||
maxThreads = 1; // is there a better default?
|
maxThreads = 1; // is there a better default?
|
||||||
|
@ -202,11 +226,10 @@ public class HTable implements HTableInterface {
|
||||||
// if it is necessary and will grow unbounded. This could be bad but in HCM
|
// if it is necessary and will grow unbounded. This could be bad but in HCM
|
||||||
// we only create as many Runnables as there are region servers. It means
|
// we only create as many Runnables as there are region servers. It means
|
||||||
// it also scales when new region servers are added.
|
// it also scales when new region servers are added.
|
||||||
this.pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
|
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
|
||||||
new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
|
new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("htable"));
|
||||||
((ThreadPoolExecutor) this.pool).allowCoreThreadTimeOut(true);
|
((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
|
||||||
|
return pool;
|
||||||
this.finishSetup();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -0,0 +1,396 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
|
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||||
|
import org.apache.hadoop.hbase.client.MasterAdminKeepAliveConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.MasterMonitorKeepAliveConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.Row;
|
||||||
|
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
|
||||||
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
|
||||||
|
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService.BlockingInterface;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
|
||||||
|
import com.google.protobuf.BlockingRpcChannel;
|
||||||
|
import com.google.protobuf.BlockingService;
|
||||||
|
import com.google.protobuf.Descriptors.MethodDescriptor;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
import com.google.protobuf.RpcController;
|
||||||
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connection to an HTable from within a Coprocessor. We can do some nice tricks since we know we
|
||||||
|
* are on a regionserver, for instance skipping the full serialization/deserialization of objects
|
||||||
|
* when talking to the server.
|
||||||
|
* <p>
|
||||||
|
* You should not use this class from any client - its an internal class meant for use by the
|
||||||
|
* coprocessor framework.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class CoprocessorHConnection implements HConnection {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create an unmanaged {@link HConnection} based on the environment in which we are running the
|
||||||
|
* coprocessor. The {@link HConnection} must be externally cleaned up (we bypass the usual HTable
|
||||||
|
* cleanup mechanisms since we own everything).
|
||||||
|
* @param env environment hosting the {@link HConnection}
|
||||||
|
* @return an unmanaged {@link HConnection}.
|
||||||
|
* @throws IOException if we cannot create the basic connection
|
||||||
|
*/
|
||||||
|
public static HConnection getConnectionForEnvironment(CoprocessorEnvironment env)
|
||||||
|
throws IOException {
|
||||||
|
HConnection connection = HConnectionManager.createConnection(env.getConfiguration());
|
||||||
|
// this bit is a little hacky - just trying to get it going for the moment
|
||||||
|
if (env instanceof RegionCoprocessorEnvironment) {
|
||||||
|
RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
|
||||||
|
RegionServerServices services = e.getRegionServerServices();
|
||||||
|
if (services instanceof HRegionServer) {
|
||||||
|
return new CoprocessorHConnection(connection, (HRegionServer) services);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
private HConnection delegate;
|
||||||
|
private ServerName serverName;
|
||||||
|
private HRegionServer server;
|
||||||
|
|
||||||
|
public CoprocessorHConnection(HConnection delegate, HRegionServer server) {
|
||||||
|
this.server = server;
|
||||||
|
this.serverName = server.getServerName();
|
||||||
|
this.delegate = delegate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface
|
||||||
|
getClient(ServerName serverName) throws IOException {
|
||||||
|
// client is trying to reach off-server, so we can't do anything special
|
||||||
|
if (!this.serverName.equals(serverName)) {
|
||||||
|
return delegate.getClient(serverName);
|
||||||
|
}
|
||||||
|
// the client is attempting to write to the same regionserver, we can short-circuit to our
|
||||||
|
// local regionserver
|
||||||
|
final BlockingService blocking = ClientService.newReflectiveBlockingService(this.server);
|
||||||
|
final RpcServerInterface rpc = this.server.getRpcServer();
|
||||||
|
|
||||||
|
final MonitoredRPCHandler status =
|
||||||
|
TaskMonitor.get().createRPCStatus(Thread.currentThread().getName());
|
||||||
|
status.pause("Setting up server-local call");
|
||||||
|
|
||||||
|
final long timestamp = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
BlockingRpcChannel channel = new BlockingRpcChannel() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Message callBlockingMethod(MethodDescriptor method, RpcController controller,
|
||||||
|
Message request, Message responsePrototype) throws ServiceException {
|
||||||
|
try {
|
||||||
|
// we never need a cell-scanner - everything is already fully formed
|
||||||
|
return rpc.call(blocking, method, request, null, timestamp, status).getFirst();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
return ClientService.newBlockingStub(channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
delegate.abort(why, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isAborted() {
|
||||||
|
return delegate.isAborted();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Configuration getConfiguration() {
|
||||||
|
return delegate.getConfiguration();
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableInterface getTable(String tableName) throws IOException {
|
||||||
|
return delegate.getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableInterface getTable(byte[] tableName) throws IOException {
|
||||||
|
return delegate.getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableInterface getTable(TableName tableName) throws IOException {
|
||||||
|
return delegate.getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
|
||||||
|
return delegate.getTable(tableName, pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
|
||||||
|
return delegate.getTable(tableName, pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
|
||||||
|
return delegate.getTable(tableName, pool);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isMasterRunning() throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||||
|
return delegate.isMasterRunning();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTableEnabled(TableName tableName) throws IOException {
|
||||||
|
return delegate.isTableEnabled(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTableEnabled(byte[] tableName) throws IOException {
|
||||||
|
return delegate.isTableEnabled(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTableDisabled(TableName tableName) throws IOException {
|
||||||
|
return delegate.isTableDisabled(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTableDisabled(byte[] tableName) throws IOException {
|
||||||
|
return delegate.isTableDisabled(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTableAvailable(TableName tableName) throws IOException {
|
||||||
|
return delegate.isTableAvailable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTableAvailable(byte[] tableName) throws IOException {
|
||||||
|
return delegate.isTableAvailable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
|
||||||
|
return delegate.isTableAvailable(tableName, splitKeys);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) throws IOException {
|
||||||
|
return delegate.isTableAvailable(tableName, splitKeys);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableDescriptor[] listTables() throws IOException {
|
||||||
|
return delegate.listTables();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String[] getTableNames() throws IOException {
|
||||||
|
return delegate.getTableNames();
|
||||||
|
}
|
||||||
|
|
||||||
|
public TableName[] listTableNames() throws IOException {
|
||||||
|
return delegate.listTableNames();
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableDescriptor getHTableDescriptor(TableName tableName) throws IOException {
|
||||||
|
return delegate.getHTableDescriptor(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException {
|
||||||
|
return delegate.getHTableDescriptor(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionLocation locateRegion(TableName tableName, byte[] row) throws IOException {
|
||||||
|
return delegate.locateRegion(tableName, row);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionLocation locateRegion(byte[] tableName, byte[] row) throws IOException {
|
||||||
|
return delegate.locateRegion(tableName, row);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearRegionCache() {
|
||||||
|
delegate.clearRegionCache();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearRegionCache(TableName tableName) {
|
||||||
|
delegate.clearRegionCache(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearRegionCache(byte[] tableName) {
|
||||||
|
delegate.clearRegionCache(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionLocation relocateRegion(TableName tableName, byte[] row) throws IOException {
|
||||||
|
return delegate.relocateRegion(tableName, row);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
|
||||||
|
return delegate.relocateRegion(tableName, row);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception,
|
||||||
|
HRegionLocation source) {
|
||||||
|
delegate.updateCachedLocations(tableName, rowkey, exception, source);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void updateCachedLocations(byte[] tableName, byte[] rowkey, Object exception,
|
||||||
|
HRegionLocation source) {
|
||||||
|
delegate.updateCachedLocations(tableName, rowkey, exception, source);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionLocation locateRegion(byte[] regionName) throws IOException {
|
||||||
|
return delegate.locateRegion(regionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
|
||||||
|
return delegate.locateRegions(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<HRegionLocation> locateRegions(byte[] tableName) throws IOException {
|
||||||
|
return delegate.locateRegions(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<HRegionLocation>
|
||||||
|
locateRegions(TableName tableName, boolean useCache, boolean offlined) throws IOException {
|
||||||
|
return delegate.locateRegions(tableName, useCache, offlined);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
|
||||||
|
throws IOException {
|
||||||
|
return delegate.locateRegions(tableName, useCache, offlined);
|
||||||
|
}
|
||||||
|
|
||||||
|
public BlockingInterface getMasterAdmin() throws IOException {
|
||||||
|
return delegate.getMasterAdmin();
|
||||||
|
}
|
||||||
|
|
||||||
|
public
|
||||||
|
org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService.BlockingInterface
|
||||||
|
getMasterMonitor() throws IOException {
|
||||||
|
return delegate.getMasterMonitor();
|
||||||
|
}
|
||||||
|
|
||||||
|
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
|
||||||
|
getAdmin(ServerName serverName) throws IOException {
|
||||||
|
return delegate.getAdmin(serverName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface
|
||||||
|
getAdmin(ServerName serverName, boolean getMaster) throws IOException {
|
||||||
|
return delegate.getAdmin(serverName, getMaster);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionLocation getRegionLocation(TableName tableName, byte[] row, boolean reload)
|
||||||
|
throws IOException {
|
||||||
|
return delegate.getRegionLocation(tableName, row, reload);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionLocation getRegionLocation(byte[] tableName, byte[] row, boolean reload)
|
||||||
|
throws IOException {
|
||||||
|
return delegate.getRegionLocation(tableName, row, reload);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processBatch(List<? extends Row> actions, TableName tableName, ExecutorService pool,
|
||||||
|
Object[] results) throws IOException, InterruptedException {
|
||||||
|
delegate.processBatch(actions, tableName, pool, results);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processBatch(List<? extends Row> actions, byte[] tableName, ExecutorService pool,
|
||||||
|
Object[] results) throws IOException, InterruptedException {
|
||||||
|
delegate.processBatch(actions, tableName, pool, results);
|
||||||
|
}
|
||||||
|
|
||||||
|
public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
|
||||||
|
ExecutorService pool, Object[] results, Callback<R> callback) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
delegate.processBatchCallback(list, tableName, pool, results, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
public <R> void processBatchCallback(List<? extends Row> list, byte[] tableName,
|
||||||
|
ExecutorService pool, Object[] results, Callback<R> callback) throws IOException,
|
||||||
|
InterruptedException {
|
||||||
|
delegate.processBatchCallback(list, tableName, pool, results, callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRegionCachePrefetch(TableName tableName, boolean enable) {
|
||||||
|
delegate.setRegionCachePrefetch(tableName, enable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRegionCachePrefetch(byte[] tableName, boolean enable) {
|
||||||
|
delegate.setRegionCachePrefetch(tableName, enable);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getRegionCachePrefetch(TableName tableName) {
|
||||||
|
return delegate.getRegionCachePrefetch(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getRegionCachePrefetch(byte[] tableName) {
|
||||||
|
return delegate.getRegionCachePrefetch(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCurrentNrHRS() throws IOException {
|
||||||
|
return delegate.getCurrentNrHRS();
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableDescriptor[] getHTableDescriptorsByTableName(List<TableName> tableNames)
|
||||||
|
throws IOException {
|
||||||
|
return delegate.getHTableDescriptorsByTableName(tableNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames) throws IOException {
|
||||||
|
return delegate.getHTableDescriptors(tableNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isClosed() {
|
||||||
|
return delegate.isClosed();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clearCaches(ServerName sn) {
|
||||||
|
delegate.clearCaches(sn);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() throws IOException {
|
||||||
|
delegate.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deleteCachedRegionLocation(HRegionLocation location) {
|
||||||
|
delegate.deleteCachedRegionLocation(location);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitorService()
|
||||||
|
throws MasterNotRunningException {
|
||||||
|
return delegate.getKeepAliveMasterMonitorService();
|
||||||
|
}
|
||||||
|
|
||||||
|
public MasterAdminKeepAliveConnection getKeepAliveMasterAdminService()
|
||||||
|
throws MasterNotRunningException {
|
||||||
|
return delegate.getKeepAliveMasterAdminService();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDeadServer(ServerName serverName) {
|
||||||
|
return delegate.isDeadServer(serverName);
|
||||||
|
}
|
||||||
|
}
|
|
@ -39,14 +39,16 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Coprocessor;
|
import org.apache.hadoop.hbase.Coprocessor;
|
||||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.Server;
|
import org.apache.hadoop.hbase.Server;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.Append;
|
import org.apache.hadoop.hbase.client.Append;
|
||||||
|
import org.apache.hadoop.hbase.client.CoprocessorHConnection;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
import org.apache.hadoop.hbase.client.HTable;
|
||||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
|
@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
|
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
|
||||||
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
|
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
|
||||||
import org.apache.hadoop.hbase.util.VersionInfo;
|
import org.apache.hadoop.hbase.util.VersionInfo;
|
||||||
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
|
|
||||||
import com.google.protobuf.Service;
|
import com.google.protobuf.Service;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
@ -369,15 +372,33 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
||||||
|
|
||||||
private TableName tableName;
|
private TableName tableName;
|
||||||
private HTable table;
|
private HTable table;
|
||||||
|
private HConnection connection;
|
||||||
|
|
||||||
public HTableWrapper(TableName tableName) throws IOException {
|
public HTableWrapper(TableName tableName, HConnection connection) throws IOException {
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
this.table = new HTable(conf, tableName);
|
this.table = new HTable(tableName, connection);
|
||||||
|
this.connection = connection;
|
||||||
openTables.add(this);
|
openTables.add(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void internalClose() throws IOException {
|
void internalClose() throws IOException {
|
||||||
|
List<IOException> exceptions = new ArrayList<IOException>(2);
|
||||||
|
try {
|
||||||
table.close();
|
table.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
// have to self-manage our connection, as per the HTable contract
|
||||||
|
if (this.connection != null) {
|
||||||
|
this.connection.close();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
exceptions.add(e);
|
||||||
|
}
|
||||||
|
if (!exceptions.isEmpty()) {
|
||||||
|
throw MultipleIOException.createIOException(exceptions);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public Configuration getConfiguration() {
|
public Configuration getConfiguration() {
|
||||||
|
@ -686,7 +707,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public HTableInterface getTable(TableName tableName) throws IOException {
|
public HTableInterface getTable(TableName tableName) throws IOException {
|
||||||
return new HTableWrapper(tableName);
|
return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue