HBASE-12993 Use HBase 1.0 interfaces in hbase-thrift (Solomon Duskis)
This commit is contained in:
parent
b0116398ff
commit
0f0f3f8493
|
@ -28,10 +28,12 @@ import org.apache.hadoop.hbase.ChoreService;
|
|||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.security.UserProvider;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -150,6 +152,13 @@ public class ConnectionCache {
|
|||
return connInfo.connection.getTable(tableName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve a regionLocator for the table. The user should close the RegionLocator.
|
||||
*/
|
||||
public RegionLocator getRegionLocator(byte[] tableName) throws IOException {
|
||||
return getCurrentConnection().connection.getRegionLocator(TableName.valueOf(tableName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the cached connection for the current user.
|
||||
* If none or timed out, create a new one.
|
||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -67,13 +68,14 @@ import org.apache.hadoop.hbase.client.Delete;
|
|||
import org.apache.hadoop.hbase.client.Durability;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.OperationWithAttributes;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.ParseFilter;
|
||||
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
||||
|
@ -624,7 +626,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
|
||||
/**
|
||||
* The HBaseHandler is a glue object that connects Thrift RPC calls to the
|
||||
* HBase client API primarily defined in the HBaseAdmin and HTable objects.
|
||||
* HBase client API primarily defined in the Admin and Table objects.
|
||||
*/
|
||||
public static class HBaseHandler implements Hbase.Iface {
|
||||
protected Configuration conf;
|
||||
|
@ -637,11 +639,11 @@ public class ThriftServerRunner implements Runnable {
|
|||
|
||||
private final ConnectionCache connectionCache;
|
||||
|
||||
private static ThreadLocal<Map<String, HTable>> threadLocalTables =
|
||||
new ThreadLocal<Map<String, HTable>>() {
|
||||
private static ThreadLocal<Map<String, Table>> threadLocalTables =
|
||||
new ThreadLocal<Map<String, Table>>() {
|
||||
@Override
|
||||
protected Map<String, HTable> initialValue() {
|
||||
return new TreeMap<String, HTable>();
|
||||
protected Map<String, Table> initialValue() {
|
||||
return new TreeMap<String, Table>();
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -651,12 +653,12 @@ public class ThriftServerRunner implements Runnable {
|
|||
static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
|
||||
|
||||
/**
|
||||
* Returns a list of all the column families for a given htable.
|
||||
* Returns a list of all the column families for a given Table.
|
||||
*
|
||||
* @param table
|
||||
* @throws IOException
|
||||
*/
|
||||
byte[][] getAllColumns(HTable table) throws IOException {
|
||||
byte[][] getAllColumns(Table table) throws IOException {
|
||||
HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
|
||||
byte[][] columns = new byte[cds.length][];
|
||||
for (int i = 0; i < cds.length; i++) {
|
||||
|
@ -667,25 +669,25 @@ public class ThriftServerRunner implements Runnable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates and returns an HTable instance from a given table name.
|
||||
* Creates and returns a Table instance from a given table name.
|
||||
*
|
||||
* @param tableName
|
||||
* name of table
|
||||
* @return HTable object
|
||||
* @return Table object
|
||||
* @throws IOException
|
||||
* @throws IOError
|
||||
*/
|
||||
public HTable getTable(final byte[] tableName) throws
|
||||
public Table getTable(final byte[] tableName) throws
|
||||
IOException {
|
||||
String table = Bytes.toString(tableName);
|
||||
Map<String, HTable> tables = threadLocalTables.get();
|
||||
Map<String, Table> tables = threadLocalTables.get();
|
||||
if (!tables.containsKey(table)) {
|
||||
tables.put(table, (HTable)connectionCache.getTable(table));
|
||||
tables.put(table, (Table)connectionCache.getTable(table));
|
||||
}
|
||||
return tables.get(table);
|
||||
}
|
||||
|
||||
public HTable getTable(final ByteBuffer tableName) throws IOException {
|
||||
public Table getTable(final ByteBuffer tableName) throws IOException {
|
||||
return getTable(getBytes(tableName));
|
||||
}
|
||||
|
||||
|
@ -770,7 +772,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
@Override
|
||||
public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
|
||||
try {
|
||||
return HTable.isTableEnabled(this.conf, getBytes(tableName));
|
||||
return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
|
||||
} catch (IOException e) {
|
||||
LOG.warn(e.getMessage(), e);
|
||||
throw new IOError(e.getMessage());
|
||||
|
@ -824,20 +826,12 @@ public class ThriftServerRunner implements Runnable {
|
|||
@Override
|
||||
public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
|
||||
throws IOError {
|
||||
try {
|
||||
HTable table;
|
||||
try {
|
||||
table = getTable(tableName);
|
||||
} catch (TableNotFoundException ex) {
|
||||
return new ArrayList<TRegionInfo>();
|
||||
}
|
||||
Map<HRegionInfo, ServerName> regionLocations =
|
||||
table.getRegionLocations();
|
||||
try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
|
||||
List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
|
||||
List<TRegionInfo> results = new ArrayList<TRegionInfo>();
|
||||
for (Map.Entry<HRegionInfo, ServerName> entry :
|
||||
regionLocations.entrySet()) {
|
||||
HRegionInfo info = entry.getKey();
|
||||
ServerName serverName = entry.getValue();
|
||||
for (HRegionLocation regionLocation : regionLocations) {
|
||||
HRegionInfo info = regionLocation.getRegionInfo();
|
||||
ServerName serverName = regionLocation.getServerName();
|
||||
TRegionInfo region = new TRegionInfo();
|
||||
region.serverName = ByteBuffer.wrap(
|
||||
Bytes.toBytes(serverName.getHostname()));
|
||||
|
@ -888,7 +882,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
byte[] qualifier,
|
||||
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Get get = new Get(getBytes(row));
|
||||
addAttributes(get, attributes);
|
||||
if (qualifier == null) {
|
||||
|
@ -930,7 +924,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
|
||||
byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Get get = new Get(getBytes(row));
|
||||
addAttributes(get, attributes);
|
||||
if (null == qualifier) {
|
||||
|
@ -973,7 +967,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
|
||||
throws IOError {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Get get = new Get(getBytes(row));
|
||||
addAttributes(get, attributes);
|
||||
if (null == qualifier) {
|
||||
|
@ -1021,7 +1015,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
|
||||
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
if (columns == null) {
|
||||
Get get = new Get(getBytes(row));
|
||||
addAttributes(get, attributes);
|
||||
|
@ -1084,7 +1078,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
|
||||
try {
|
||||
List<Get> gets = new ArrayList<Get>(rows.size());
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
if (metrics != null) {
|
||||
metrics.incNumRowKeysInBatchGet(rows.size());
|
||||
}
|
||||
|
@ -1128,7 +1122,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
ByteBuffer column,
|
||||
long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Delete delete = new Delete(getBytes(row));
|
||||
addAttributes(delete, attributes);
|
||||
byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
|
||||
|
@ -1157,7 +1151,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
ByteBuffer tableName, ByteBuffer row, long timestamp,
|
||||
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Delete delete = new Delete(getBytes(row), timestamp);
|
||||
addAttributes(delete, attributes);
|
||||
table.delete(delete);
|
||||
|
@ -1225,7 +1219,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
List<Mutation> mutations, long timestamp,
|
||||
Map<ByteBuffer, ByteBuffer> attributes)
|
||||
throws IOError, IllegalArgument {
|
||||
HTable table = null;
|
||||
Table table = null;
|
||||
try {
|
||||
table = getTable(tableName);
|
||||
Put put = new Put(getBytes(row), timestamp);
|
||||
|
@ -1327,7 +1321,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
puts.add(put);
|
||||
}
|
||||
|
||||
HTable table = null;
|
||||
Table table = null;
|
||||
try {
|
||||
table = getTable(tableName);
|
||||
if (!puts.isEmpty())
|
||||
|
@ -1359,7 +1353,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
|
||||
byte [] family, byte [] qualifier, long amount)
|
||||
throws IOError, IllegalArgument, TException {
|
||||
HTable table;
|
||||
Table table;
|
||||
try {
|
||||
table = getTable(tableName);
|
||||
return table.incrementColumnValue(
|
||||
|
@ -1417,7 +1411,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
Map<ByteBuffer, ByteBuffer> attributes)
|
||||
throws IOError {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Scan scan = new Scan();
|
||||
addAttributes(scan, attributes);
|
||||
if (tScan.isSetStartRow()) {
|
||||
|
@ -1465,7 +1459,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
List<ByteBuffer> columns,
|
||||
Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Scan scan = new Scan(getBytes(startRow));
|
||||
addAttributes(scan, attributes);
|
||||
if(columns != null && columns.size() != 0) {
|
||||
|
@ -1491,7 +1485,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
Map<ByteBuffer, ByteBuffer> attributes)
|
||||
throws IOError, TException {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
|
||||
addAttributes(scan, attributes);
|
||||
if(columns != null && columns.size() != 0) {
|
||||
|
@ -1518,7 +1512,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
Map<ByteBuffer, ByteBuffer> attributes)
|
||||
throws IOError, TException {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Scan scan = new Scan(getBytes(startAndPrefix));
|
||||
addAttributes(scan, attributes);
|
||||
Filter f = new WhileMatchFilter(
|
||||
|
@ -1546,7 +1540,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
List<ByteBuffer> columns, long timestamp,
|
||||
Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Scan scan = new Scan(getBytes(startRow));
|
||||
addAttributes(scan, attributes);
|
||||
scan.setTimeRange(0, timestamp);
|
||||
|
@ -1573,7 +1567,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
Map<ByteBuffer, ByteBuffer> attributes)
|
||||
throws IOError, TException {
|
||||
try {
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
|
||||
addAttributes(scan, attributes);
|
||||
scan.setTimeRange(0, timestamp);
|
||||
|
@ -1602,7 +1596,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
TreeMap<ByteBuffer, ColumnDescriptor> columns =
|
||||
new TreeMap<ByteBuffer, ColumnDescriptor>();
|
||||
|
||||
HTable table = getTable(tableName);
|
||||
Table table = getTable(tableName);
|
||||
HTableDescriptor desc = table.getTableDescriptor();
|
||||
|
||||
for (HColumnDescriptor e : desc.getFamilies()) {
|
||||
|
@ -1621,8 +1615,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
|
||||
ByteBuffer family) throws IOError {
|
||||
try {
|
||||
HTable table = getTable(getBytes(tableName));
|
||||
Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
|
||||
Result result = getRowOrBefore(getBytes(tableName), getBytes(row), getBytes(family));
|
||||
return ThriftUtilities.cellFromHBase(result.rawCells());
|
||||
} catch (IOException e) {
|
||||
LOG.warn(e.getMessage(), e);
|
||||
|
@ -1633,10 +1626,9 @@ public class ThriftServerRunner implements Runnable {
|
|||
@Override
|
||||
public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
|
||||
try {
|
||||
HTable table = getTable(TableName.META_TABLE_NAME.getName());
|
||||
byte[] row = getBytes(searchRow);
|
||||
Result startRowResult = table.getRowOrBefore(
|
||||
row, HConstants.CATALOG_FAMILY);
|
||||
Result startRowResult =
|
||||
getRowOrBefore(TableName.META_TABLE_NAME.getName(), row, HConstants.CATALOG_FAMILY);
|
||||
|
||||
if (startRowResult == null) {
|
||||
throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
|
||||
|
@ -1670,6 +1662,18 @@ public class ThriftServerRunner implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
private Result getRowOrBefore(byte[] tableName, byte[] row, byte[] family) throws IOException {
|
||||
Scan scan = new Scan(row);
|
||||
scan.setReversed(true);
|
||||
scan.addFamily(family);
|
||||
scan.setStartRow(row);
|
||||
|
||||
Table table = getTable(tableName);
|
||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||
return scanner.next();
|
||||
}
|
||||
}
|
||||
|
||||
private void initMetrics(ThriftMetrics metrics) {
|
||||
this.metrics = metrics;
|
||||
}
|
||||
|
@ -1687,7 +1691,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
}
|
||||
|
||||
try {
|
||||
HTable table = getTable(tincrement.getTable());
|
||||
Table table = getTable(tincrement.getTable());
|
||||
Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
|
||||
table.increment(inc);
|
||||
} catch (IOException e) {
|
||||
|
@ -1714,7 +1718,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
}
|
||||
|
||||
try {
|
||||
HTable table = getTable(tappend.getTable());
|
||||
Table table = getTable(tappend.getTable());
|
||||
Append append = ThriftUtilities.appendFromThrift(tappend);
|
||||
Result result = table.append(append);
|
||||
return ThriftUtilities.cellFromHBase(result.rawCells());
|
||||
|
@ -1744,7 +1748,7 @@ public class ThriftServerRunner implements Runnable {
|
|||
throw new IllegalArgument(e.getMessage());
|
||||
}
|
||||
|
||||
HTable table = null;
|
||||
Table table = null;
|
||||
try {
|
||||
table = getTable(tableName);
|
||||
byte[][] famAndQf = KeyValue.parseColumn(getBytes(column));
|
||||
|
|
Loading…
Reference in New Issue