HBASE-19043 Purge TableWrapper and CoprocessorHConnnection

Also purge Coprocessor#getTable... Let Coprocessors manage their
Table Connections in hbase2.0.0.
This commit is contained in:
Michael Stack 2017-10-18 21:45:39 -07:00
parent d59ed234ef
commit d798541261
9 changed files with 70 additions and 940 deletions

View File

@ -20,11 +20,9 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Table;
/**
* Coprocessor environment state.
@ -50,19 +48,6 @@ public interface CoprocessorEnvironment<C extends Coprocessor> {
/** @return the configuration */
Configuration getConfiguration();
/**
* @return an interface for accessing the given table
* @throws IOException
*/
Table getTable(TableName tableName) throws IOException;
/**
* @return an interface for accessing the given table using the passed executor to run batch
* operations
* @throws IOException
*/
Table getTable(TableName tableName, ExecutorService service) throws IOException;
/**
* @return the classloader for the loaded coprocessor instance
*/

View File

@ -1,105 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.CoprocessorRegionServerServices;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.UserProvider;
/**
* Connection to an HTable from within a Coprocessor. We can do some nice tricks since we know we
* are on a regionserver, for instance skipping the full serialization/deserialization of objects
* when talking to the server.
* <p>
* You should not use this class from any client - its an internal class meant for use by the
* coprocessor framework.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CoprocessorHConnection extends ConnectionImplementation {
/**
* Create a {@link ClusterConnection} based on the environment in which we are running the
* coprocessor. The {@link ClusterConnection} must be externally cleaned up
* (we bypass the usual HTable cleanup mechanisms since we own everything).
* @param env environment hosting the {@link ClusterConnection}
* @return instance of {@link ClusterConnection}.
* @throws IOException if we cannot create the connection
*/
public static ClusterConnection getConnectionForEnvironment(CoprocessorEnvironment env)
throws IOException {
// this bit is a little hacky - just trying to get it going for the moment
if (env instanceof RegionCoprocessorEnvironment) {
RegionCoprocessorEnvironment e = (RegionCoprocessorEnvironment) env;
CoprocessorRegionServerServices services = e.getCoprocessorRegionServerServices();
if (services instanceof HRegionServer) {
return new CoprocessorHConnection((HRegionServer) services);
}
}
return (ClusterConnection) ConnectionFactory.createConnection(env.getConfiguration());
}
private final ServerName serverName;
private final HRegionServer server;
/**
* Constructor that uses server configuration
* @param server
* @throws IOException if we cannot create the connection
*/
public CoprocessorHConnection(HRegionServer server) throws IOException {
this(server.getConfiguration(), server);
}
/**
* Constructor that accepts custom configuration
* @param conf
* @param server
* @throws IOException if we cannot create the connection
*/
public CoprocessorHConnection(Configuration conf, HRegionServer server) throws IOException {
super(conf, null, UserProvider.instantiate(conf).getCurrent());
this.server = server;
this.serverName = server.getServerName();
}
@Override
public org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.
ClientService.BlockingInterface getClient(ServerName serverName) throws IOException {
// client is trying to reach off-server, so we can't do anything special
if (!this.serverName.equals(serverName)) {
return super.getClient(serverName);
}
// the client is attempting to write to the same regionserver, we can short-circuit to our
// local regionserver
return server.getRSRpcServices();
}
@Override
public NonceGenerator getNonceGenerator() {
return ConnectionUtils.NO_NONCE_GENERATOR; // don't use nonces for coprocessor connection
}
}

View File

@ -1,346 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.io.MultipleIOException;
/**
* A wrapper for HTable. Can be used to restrict privilege.
*
* Currently it just helps to track tables opened by a Coprocessor and
* facilitate close of them if it is aborted.
*
* We also disallow row locking.
*
* There is nothing now that will stop a coprocessor from using HTable
* objects directly instead of this API, but in the future we intend to
* analyze coprocessor implementations as they are loaded and reject those
* which attempt to use objects and methods outside the Environment
* sandbox.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Stable
public final class HTableWrapper implements Table {
private final Table table;
private final ClusterConnection connection;
private final List<Table> openTables;
/**
* @param openTables External list of tables used for tracking wrappers.
* @throws IOException
*/
public static Table createWrapper(List<Table> openTables,
TableName tableName, BaseEnvironment env, ExecutorService pool) throws IOException {
return new HTableWrapper(openTables, tableName,
CoprocessorHConnection.getConnectionForEnvironment(env), pool);
}
private HTableWrapper(List<Table> openTables, TableName tableName,
ClusterConnection connection, ExecutorService pool)
throws IOException {
this.table = connection.getTable(tableName, pool);
this.connection = connection;
this.openTables = openTables;
this.openTables.add(this);
}
public void internalClose() throws IOException {
List<IOException> exceptions = new ArrayList<>(2);
try {
table.close();
} catch (IOException e) {
exceptions.add(e);
}
try {
// have to self-manage our connection, as per the HTable contract
if (this.connection != null) {
this.connection.close();
}
} catch (IOException e) {
exceptions.add(e);
}
if (!exceptions.isEmpty()) {
throw MultipleIOException.createIOException(exceptions);
}
}
public Configuration getConfiguration() {
return table.getConfiguration();
}
public void close() throws IOException {
try {
internalClose();
} finally {
openTables.remove(this);
}
}
public Result get(Get get) throws IOException {
return table.get(get);
}
public boolean exists(Get get) throws IOException {
return table.exists(get);
}
public boolean[] existsAll(List<Get> gets) throws IOException{
return table.existsAll(gets);
}
/**
* @deprecated Use {@link #existsAll(java.util.List)} instead. since 2.0. remove in 3.0
*/
@Deprecated
public Boolean[] exists(List<Get> gets) throws IOException {
// Do convertion.
boolean [] exists = table.existsAll(gets);
if (exists == null) {
return null;
}
Boolean [] results = new Boolean [exists.length];
for (int i = 0; i < exists.length; i++) {
results[i] = exists[i]? Boolean.TRUE: Boolean.FALSE;
}
return results;
}
public void put(Put put) throws IOException {
table.put(put);
}
public void put(List<Put> puts) throws IOException {
table.put(puts);
}
public void delete(Delete delete) throws IOException {
table.delete(delete);
}
public void delete(List<Delete> deletes) throws IOException {
table.delete(deletes);
}
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) throws IOException {
return table.checkAndPut(row, family, qualifier, value, put);
}
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Put put) throws IOException {
return table.checkAndPut(row, family, qualifier, compareOp, value, put);
}
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, byte[] value, Put put) throws IOException {
return table.checkAndPut(row, family, qualifier, op, value, put);
}
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Delete delete) throws IOException {
return table.checkAndDelete(row, family, qualifier, value, delete);
}
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Delete delete) throws IOException {
return table.checkAndDelete(row, family, qualifier, compareOp, value, delete);
}
public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, byte[] value, Delete delete) throws IOException {
return table.checkAndDelete(row, family, qualifier, op, value, delete);
}
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount) throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount);
}
public long incrementColumnValue(byte[] row, byte[] family,
byte[] qualifier, long amount, Durability durability)
throws IOException {
return table.incrementColumnValue(row, family, qualifier, amount,
durability);
}
@Override
public Result append(Append append) throws IOException {
return table.append(append);
}
@Override
public Result increment(Increment increment) throws IOException {
return table.increment(increment);
}
public ResultScanner getScanner(Scan scan) throws IOException {
return table.getScanner(scan);
}
public ResultScanner getScanner(byte[] family) throws IOException {
return table.getScanner(family);
}
public ResultScanner getScanner(byte[] family, byte[] qualifier)
throws IOException {
return table.getScanner(family, qualifier);
}
public HTableDescriptor getTableDescriptor() throws IOException {
return table.getTableDescriptor();
}
@Override
public TableDescriptor getDescriptor() throws IOException {
return table.getDescriptor();
}
@Override
public TableName getName() {
return table.getName();
}
@Override
public void batch(List<? extends Row> actions, Object[] results)
throws IOException, InterruptedException {
table.batch(actions, results);
}
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException {
table.batchCallback(actions, results, callback);
}
@Override
public Result[] get(List<Get> gets) throws IOException {
return table.get(gets);
}
@Override
public CoprocessorRpcChannel coprocessorService(byte[] row) {
return table.coprocessorService(row);
}
@Override
public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
throws ServiceException, Throwable {
return table.coprocessorService(service, startKey, endKey, callable);
}
@Override
public <T extends Service, R> void coprocessorService(Class<T> service,
byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback)
throws ServiceException, Throwable {
table.coprocessorService(service, startKey, endKey, callable, callback);
}
@Override
public void mutateRow(RowMutations rm) throws IOException {
table.mutateRow(rm);
}
@Override
public <R extends Message> Map<byte[], R> batchCoprocessorService(
MethodDescriptor methodDescriptor, Message request, byte[] startKey, byte[] endKey,
R responsePrototype) throws ServiceException, Throwable {
return table.batchCoprocessorService(methodDescriptor, request, startKey, endKey,
responsePrototype);
}
@Override
public <R extends Message> void batchCoprocessorService(MethodDescriptor methodDescriptor,
Message request, byte[] startKey, byte[] endKey, R responsePrototype, Callback<R> callback)
throws ServiceException, Throwable {
table.batchCoprocessorService(methodDescriptor, request, startKey, endKey, responsePrototype,
callback);
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations rm) throws IOException {
return table.checkAndMutate(row, family, qualifier, compareOp, value, rm);
}
@Override
public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOperator op, byte[] value, RowMutations rm)
throws IOException {
return table.checkAndMutate(row, family, qualifier, op, value, rm);
}
@Override
public void setOperationTimeout(int operationTimeout) {
table.setOperationTimeout(operationTimeout);
}
@Override
public int getOperationTimeout() {
return table.getOperationTimeout();
}
@Override
@Deprecated
public void setRpcTimeout(int rpcTimeout) {
table.setRpcTimeout(rpcTimeout);
}
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) { table.setWriteRpcTimeout(writeRpcTimeout); }
@Override
public void setReadRpcTimeout(int readRpcTimeout) { table.setReadRpcTimeout(readRpcTimeout); }
@Override
@Deprecated
public int getRpcTimeout() {
return table.getRpcTimeout();
}
@Override
public int getWriteRpcTimeout() { return table.getWriteRpcTimeout(); }
@Override
public int getReadRpcTimeout() { return table.getReadRpcTimeout(); }
}

View File

@ -24,17 +24,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTableWrapper;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.yetus.audience.InterfaceAudience;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* Encapsulation of the environment of each coprocessor
@ -49,9 +42,6 @@ public class BaseEnvironment<C extends Coprocessor> implements CoprocessorEnviro
protected int priority = Coprocessor.PRIORITY_USER;
/** Current coprocessor state */
Coprocessor.State state = Coprocessor.State.UNINSTALLED;
/** Accounting for tables opened by the coprocessor */
protected List<Table> openTables =
Collections.synchronizedList(new ArrayList<Table>());
private int seq;
private Configuration conf;
private ClassLoader classLoader;
@ -112,18 +102,6 @@ public class BaseEnvironment<C extends Coprocessor> implements CoprocessorEnviro
LOG.warn("Not stopping coprocessor "+impl.getClass().getName()+
" because not active (state="+state.toString()+")");
}
synchronized (openTables) {
// clean up any table references
for (Table table: openTables) {
try {
((HTableWrapper)table).internalClose();
} catch (IOException e) {
// nothing can be done here
LOG.warn("Failed to close " +
table.getName(), e);
}
}
}
}
@Override
@ -162,26 +140,4 @@ public class BaseEnvironment<C extends Coprocessor> implements CoprocessorEnviro
public Configuration getConfiguration() {
return conf;
}
/**
* Open a table from within the Coprocessor environment
* @param tableName the table name
* @return an interface for manipulating the table
* @exception IOException Exception
*/
@Override
public Table getTable(TableName tableName) throws IOException {
return this.getTable(tableName, null);
}
/**
* Open a table from within the Coprocessor environment
* @param tableName the table name
* @return an interface for manipulating the table
* @exception IOException Exception
*/
@Override
public Table getTable(TableName tableName, ExecutorService pool) throws IOException {
return HTableWrapper.createWrapper(openTables, tableName, this, pool);
}
}

View File

@ -277,18 +277,26 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
}
ZKPermissionWatcher zkw = this.authManager.getZKPermissionWatcher();
Configuration conf = regionEnv.getConfiguration();
byte [] currentEntry = null;
// TODO: Here we are already on the ACL region. (And it is single
// region) We can even just get the region from the env and do get
// directly. The short circuit connection would avoid the RPC overhead
// so no socket communication, req write/read .. But we have the PB
// to and fro conversion overhead. get req is converted to PB req
// and results are converted to PB results 1st and then to POJOs
// again. We could have avoided such at least in ACL table context..
try (Table t = e.getCoprocessorRegionServerServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
for (byte[] entry : entries) {
try {
try (Table t = regionEnv.getTable(AccessControlLists.ACL_TABLE_NAME)) {
currentEntry = entry;
ListMultimap<String, TablePermission> perms =
AccessControlLists.getPermissions(conf, entry, t);
byte[] serialized = AccessControlLists.writePermissionsAsBytes(perms, conf);
zkw.writeToZookeeper(entry, serialized);
}
} catch(IOException ex) {
LOG.error("Failed updating permissions mirror for '" + Bytes.toString(entry) + "'",
ex);
}
LOG.error("Failed updating permissions mirror for '" +
(currentEntry == null? "null": Bytes.toString(currentEntry)) + "'", ex);
}
}
@ -1072,8 +1080,11 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
try (Table table = c.getEnvironment().getMasterServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
AccessControlLists.addUserPermission(c.getEnvironment().getConfiguration(),
userperm, c.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
userperm, table);
}
return null;
}
});
@ -1095,8 +1106,10 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
AccessControlLists.removeTablePermissions(conf, tableName,
c.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
try (Table table = c.getEnvironment().getMasterServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
AccessControlLists.removeTablePermissions(conf, tableName, table);
}
return null;
}
});
@ -1132,8 +1145,10 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
List<UserPermission> perms = tableAcls.get(tableName);
if (perms != null) {
for (UserPermission perm : perms) {
AccessControlLists.addUserPermission(conf, perm,
ctx.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
try (Table table = ctx.getEnvironment().getMasterServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
AccessControlLists.addUserPermission(conf, perm, table);
}
}
}
tableAcls.remove(tableName);
@ -1161,8 +1176,10 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
public Void run() throws Exception {
UserPermission userperm = new UserPermission(Bytes.toBytes(owner),
htd.getTableName(), null, Action.values());
AccessControlLists.addUserPermission(conf, userperm,
c.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
try (Table table = c.getEnvironment().getMasterServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
AccessControlLists.addUserPermission(conf, userperm, table);
}
return null;
}
});
@ -1198,8 +1215,10 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
AccessControlLists.removeTablePermissions(conf, tableName, columnFamily,
ctx.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
try (Table table = ctx.getEnvironment().getMasterServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
AccessControlLists.removeTablePermissions(conf, tableName, columnFamily, table);
}
return null;
}
});
@ -1444,8 +1463,10 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
AccessControlLists.removeNamespacePermissions(conf, namespace,
ctx.getEnvironment().getTable(AccessControlLists.ACL_TABLE_NAME));
try (Table table = ctx.getEnvironment().getMasterServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
AccessControlLists.removeNamespacePermissions(conf, namespace, table);
}
return null;
}
});
@ -2287,8 +2308,12 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm,
regionEnv.getTable(AccessControlLists.ACL_TABLE_NAME), request.getMergeExistingPermissions());
// regionEnv is set at #start. Hopefully not null at this point.
try (Table table = regionEnv.getCoprocessorRegionServerServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
AccessControlLists.addUserPermission(regionEnv.getConfiguration(), perm, table,
request.getMergeExistingPermissions());
}
return null;
}
});
@ -2340,8 +2365,11 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
User.runAsLoginUser(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm,
regionEnv.getTable(AccessControlLists.ACL_TABLE_NAME));
// regionEnv is set at #start. Hopefully not null here.
try (Table table = regionEnv.getCoprocessorRegionServerServices().getConnection().
getTable(AccessControlLists.ACL_TABLE_NAME)) {
AccessControlLists.removeUserPermission(regionEnv.getConfiguration(), perm, table);
}
return null;
}
});

View File

@ -18,15 +18,12 @@
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
@ -106,16 +103,6 @@ public class TestCoprocessorHost {
return cpHostConf;
}
@Override
public Table getTable(TableName tableName) throws IOException {
return null;
}
@Override
public Table getTable(TableName tableName, ExecutorService service) throws IOException {
return null;
}
@Override
public void startup() throws IOException {}

View File

@ -1,362 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Tests class {@link org.apache.hadoop.hbase.client.HTableWrapper}
* by invoking its methods and briefly asserting the result is reasonable.
*/
@Category({CoprocessorTests.class, MediumTests.class})
public class TestHTableWrapper {
private static final HBaseTestingUtility util = new HBaseTestingUtility();
private static final TableName TEST_TABLE = TableName.valueOf("test");
private static final byte[] TEST_FAMILY = Bytes.toBytes("f1");
private static final byte[] ROW_A = Bytes.toBytes("aaa");
private static final byte[] ROW_B = Bytes.toBytes("bbb");
private static final byte[] ROW_C = Bytes.toBytes("ccc");
private static final byte[] ROW_D = Bytes.toBytes("ddd");
private static final byte[] ROW_E = Bytes.toBytes("eee");
private static final byte[] qualifierCol1 = Bytes.toBytes("col1");
private static final byte[] bytes1 = Bytes.toBytes(1);
private static final byte[] bytes2 = Bytes.toBytes(2);
private static final byte[] bytes3 = Bytes.toBytes(3);
private static final byte[] bytes4 = Bytes.toBytes(4);
private static final byte[] bytes5 = Bytes.toBytes(5);
public static class DummyRegionObserver implements MasterCoprocessor, MasterObserver {
@Override
public Optional<MasterObserver> getMasterObserver() {
return Optional.of(this);
}
}
private Table hTableInterface;
private Table table;
@BeforeClass
public static void setupBeforeClass() throws Exception {
util.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
@Before
public void before() throws Exception {
table = util.createTable(TEST_TABLE, TEST_FAMILY);
Put puta = new Put(ROW_A);
puta.addColumn(TEST_FAMILY, qualifierCol1, bytes1);
table.put(puta);
Put putb = new Put(ROW_B);
putb.addColumn(TEST_FAMILY, qualifierCol1, bytes2);
table.put(putb);
Put putc = new Put(ROW_C);
putc.addColumn(TEST_FAMILY, qualifierCol1, bytes3);
table.put(putc);
}
@After
public void after() throws Exception {
try {
if (table != null) {
table.close();
}
} finally {
util.deleteTable(TEST_TABLE);
}
}
@Test
public void testHTableInterfaceMethods() throws Exception {
Configuration conf = util.getConfiguration();
MasterCoprocessorHost cpHost = util.getMiniHBaseCluster().getMaster().getMasterCoprocessorHost();
Class<? extends MasterCoprocessor> implClazz = DummyRegionObserver.class;
cpHost.load(implClazz, Coprocessor.PRIORITY_HIGHEST, conf);
CoprocessorEnvironment env = cpHost.findCoprocessorEnvironment(implClazz.getName());
assertEquals(Coprocessor.VERSION, env.getVersion());
assertEquals(VersionInfo.getVersion(), env.getHBaseVersion());
hTableInterface = env.getTable(TEST_TABLE);
checkHTableInterfaceMethods();
cpHost.shutdown((MasterCoprocessorEnvironment) env);
}
private void checkHTableInterfaceMethods() throws Exception {
checkConf();
checkNameAndDescriptor();
checkExists();
checkAppend();
checkPutsAndDeletes();
checkCheckAndPut();
checkCheckAndDelete();
checkIncrementColumnValue();
checkIncrement();
checkBatch();
checkCoprocessorService();
checkMutateRow();
checkResultScanner();
hTableInterface.close();
}
private void checkConf() {
Configuration confExpected = util.getConfiguration();
Configuration confActual = hTableInterface.getConfiguration();
assertTrue(confExpected == confActual);
}
private void checkNameAndDescriptor() throws IOException {
assertEquals(TEST_TABLE, hTableInterface.getName());
assertEquals(table.getTableDescriptor(), hTableInterface.getTableDescriptor());
}
private void checkExists() throws IOException {
boolean ex = hTableInterface.exists(new Get(ROW_A).addColumn(TEST_FAMILY, qualifierCol1));
assertTrue(ex);
boolean[] exArray = hTableInterface.existsAll(Arrays.asList(new Get[]{
new Get(ROW_A).addColumn(TEST_FAMILY, qualifierCol1),
new Get(ROW_B).addColumn(TEST_FAMILY, qualifierCol1),
new Get(ROW_C).addColumn(TEST_FAMILY, qualifierCol1),
new Get(Bytes.toBytes("does not exist")).addColumn(TEST_FAMILY, qualifierCol1),}));
assertTrue(Arrays.equals(new boolean[]{true, true, true, false}, exArray));
}
private void checkAppend() throws IOException {
final byte[] appendValue = Bytes.toBytes("append");
Append append = new Append(qualifierCol1).addColumn(TEST_FAMILY, qualifierCol1, appendValue);
Result appendResult = hTableInterface.append(append);
byte[] appendedRow = appendResult.getRow();
checkRowValue(appendedRow, appendValue);
}
private void checkPutsAndDeletes() throws IOException {
// put:
Put putD = new Put(ROW_D).addColumn(TEST_FAMILY, qualifierCol1, bytes2);
hTableInterface.put(putD);
checkRowValue(ROW_D, bytes2);
// delete:
Delete delete = new Delete(ROW_D);
hTableInterface.delete(delete);
checkRowValue(ROW_D, null);
// multiple puts:
Put[] puts = new Put[] {new Put(ROW_D).addColumn(TEST_FAMILY, qualifierCol1, bytes2),
new Put(ROW_E).addColumn(TEST_FAMILY, qualifierCol1, bytes3)};
hTableInterface.put(Arrays.asList(puts));
checkRowsValues(new byte[][] { ROW_D, ROW_E }, new byte[][] { bytes2, bytes3 });
// multiple deletes:
Delete[] deletes = new Delete[] { new Delete(ROW_D), new Delete(ROW_E) };
hTableInterface.delete(new ArrayList<>(Arrays.asList(deletes)));
checkRowsValues(new byte[][] { ROW_D, ROW_E }, new byte[][] { null, null });
}
private void checkCheckAndPut() throws IOException {
Put putC = new Put(ROW_C).addColumn(TEST_FAMILY, qualifierCol1, bytes5);
assertFalse(hTableInterface.checkAndPut(ROW_C, TEST_FAMILY, qualifierCol1, /* expect */bytes4,
putC/* newValue */));
assertTrue(hTableInterface.checkAndPut(ROW_C, TEST_FAMILY, qualifierCol1, /* expect */bytes3,
putC/* newValue */));
checkRowValue(ROW_C, bytes5);
}
private void checkCheckAndDelete() throws IOException {
Delete delete = new Delete(ROW_C);
assertFalse(hTableInterface.checkAndDelete(ROW_C, TEST_FAMILY, qualifierCol1, bytes4, delete));
assertTrue(hTableInterface.checkAndDelete(ROW_C, TEST_FAMILY, qualifierCol1, bytes5, delete));
checkRowValue(ROW_C, null);
}
private void checkIncrementColumnValue() throws IOException {
hTableInterface.put(new Put(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, Bytes.toBytes(1L)));
checkRowValue(ROW_A, Bytes.toBytes(1L));
final long newVal = hTableInterface
.incrementColumnValue(ROW_A, TEST_FAMILY, qualifierCol1, 10L);
assertEquals(11L, newVal);
checkRowValue(ROW_A, Bytes.toBytes(11L));
final long newVal2 = hTableInterface.incrementColumnValue(ROW_A, TEST_FAMILY, qualifierCol1,
-10L, Durability.SYNC_WAL);
assertEquals(1L, newVal2);
checkRowValue(ROW_A, Bytes.toBytes(1L));
}
private void checkIncrement() throws IOException {
hTableInterface.increment(new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, -5L));
checkRowValue(ROW_A, Bytes.toBytes(-4L));
}
private void checkBatch() throws IOException, InterruptedException {
List<Row> actions =
Arrays.asList(new Row[] { new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L),
new Increment(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, 2L) });
Object[] results3 = new Object[actions.size()];
Object[] results1 = results3;
hTableInterface.batch(actions, results1);
assertEquals(2, results1.length);
for (Object r2 : results1) {
assertTrue(r2 instanceof Result);
}
checkRowValue(ROW_A, Bytes.toBytes(0L));
Object[] results2 = new Result[2];
hTableInterface.batch(
actions, results2);
for (Object r2 : results2) {
assertTrue(r2 instanceof Result);
}
checkRowValue(ROW_A, Bytes.toBytes(4L));
// with callbacks:
final long[] updateCounter = new long[] { 0L };
hTableInterface.batchCallback(actions, results3, new Batch.Callback<Result>() {
@Override
public void update(byte[] region, byte[] row, Result result) {
updateCounter[0]++;
}
});
assertEquals(2, updateCounter[0]);
assertEquals(2, results3.length);
for (Object r3 : results3) {
assertTrue(r3 instanceof Result);
}
checkRowValue(ROW_A, Bytes.toBytes(8L));
Object[] results4 = new Result[2];
updateCounter[0] = 0L;
hTableInterface.batchCallback(
actions, results4,
new Batch.Callback<Result>() {
@Override
public void update(byte[] region, byte[] row, Result result) {
updateCounter[0]++;
}
});
assertEquals(2, updateCounter[0]);
for (Object r2 : results4) {
assertTrue(r2 instanceof Result);
}
checkRowValue(ROW_A, Bytes.toBytes(12L));
}
private void checkCoprocessorService() {
CoprocessorRpcChannel crc = hTableInterface.coprocessorService(ROW_A);
assertNotNull(crc);
}
private void checkMutateRow() throws IOException {
Put put = new Put(ROW_A).addColumn(TEST_FAMILY, qualifierCol1, bytes1);
RowMutations rowMutations = new RowMutations(ROW_A);
rowMutations.add(put);
hTableInterface.mutateRow(rowMutations);
checkRowValue(ROW_A, bytes1);
}
private void checkResultScanner() throws IOException {
ResultScanner resultScanner = hTableInterface.getScanner(TEST_FAMILY);
Result[] results = resultScanner.next(10);
assertEquals(3, results.length);
resultScanner = hTableInterface.getScanner(TEST_FAMILY, qualifierCol1);
results = resultScanner.next(10);
assertEquals(3, results.length);
resultScanner = hTableInterface.getScanner(new Scan(ROW_A, ROW_C));
results = resultScanner.next(10);
assertEquals(2, results.length);
}
private void checkRowValue(byte[] row, byte[] expectedValue) throws IOException {
Get get = new Get(row).addColumn(TEST_FAMILY, qualifierCol1);
Result result = hTableInterface.get(get);
byte[] actualValue = result.getValue(TEST_FAMILY, qualifierCol1);
assertArrayEquals(expectedValue, actualValue);
}
private void checkRowsValues(byte[][] rows, byte[][] expectedValues) throws IOException {
if (rows.length != expectedValues.length) {
throw new IllegalArgumentException();
}
Get[] gets = new Get[rows.length];
for (int i = 0; i < gets.length; i++) {
gets[i] = new Get(rows[i]).addColumn(TEST_FAMILY, qualifierCol1);
}
Result[] results = hTableInterface.get(Arrays.asList(gets));
for (int i = 0; i < expectedValues.length; i++) {
byte[] actualValue = results[i].getValue(TEST_FAMILY, qualifierCol1);
assertArrayEquals(expectedValues[i], actualValue);
}
}
}

View File

@ -74,10 +74,11 @@ public class TestOpenTableInCoprocessor {
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
Table table = e.getEnvironment().getTable(otherTable);
try (Table table = e.getEnvironment().getCoprocessorRegionServerServices().getConnection().
getTable(otherTable)) {
table.put(put);
completed[0] = true;
table.close();
}
}
}
@ -111,7 +112,8 @@ public class TestOpenTableInCoprocessor {
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
Table table = e.getEnvironment().getTable(otherTable, getPool());
try (Table table = e.getEnvironment().getCoprocessorRegionServerServices().
getConnection().getTable(otherTable, getPool())) {
Put p = new Put(new byte[]{'a'});
p.addColumn(family, null, new byte[]{'a'});
try {
@ -120,7 +122,7 @@ public class TestOpenTableInCoprocessor {
throw new IOException(e1);
}
completedWithPool[0] = true;
table.close();
}
}
}

View File

@ -31,7 +31,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -39,19 +38,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
import org.apache.hadoop.hbase.ipc.NettyRpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer;
@ -303,16 +298,6 @@ public class TestTokenAuthentication {
@Override
public Configuration getConfiguration() { return conf; }
@Override
public Table getTable(TableName tableName) throws IOException
{ return null; }
@Override
public Table getTable(TableName tableName, ExecutorService service)
throws IOException {
return null;
}
@Override
public ClassLoader getClassLoader() {
return Thread.currentThread().getContextClassLoader();