From c0d127fcb2c92db7760bd37b69bfbe7585e35dcd Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Tue, 17 Jan 2012 17:49:34 +0000 Subject: [PATCH] HBASE-5201 Add new files git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1232506 13f79535-47bb-0310-9956-ffa450edef68 --- .../thrift/HThreadedSelectorServerArgs.java | 96 ++ .../hbase/thrift/ThriftServerRunner.java | 1237 +++++++++++++++++ 2 files changed, 1333 insertions(+) create mode 100644 src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java create mode 100644 src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java b/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java new file mode 100644 index 00000000000..494f5a53c93 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A TThreadedSelectorServer.Args that reads hadoop configuration + */ +public class HThreadedSelectorServerArgs extends TThreadedSelectorServer.Args { + + private static final Logger LOG = + LoggerFactory.getLogger(TThreadedSelectorServer.class); + + /** + * Number of selector threads for reading and writing socket + */ + public static final String SELECTOR_THREADS_CONF_KEY = + "hbase.thrift.selector.threads"; + + /** + * Number fo threads for processing the thrift calls + */ + public static final String WORKER_THREADS_CONF_KEY = + "hbase.thrift.worker.threads"; + + /** + * Time to wait for server to stop gracefully + */ + public static final String STOP_TIMEOUT_CONF_KEY = + "hbase.thrift.stop.timeout.seconds"; + + /** + * Maximum number of accepted elements per selector + */ + public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY = + "hbase.thrift.accept.queue.size.per.selector"; + + /** + * The strategy for handling new accepted connections. + */ + public static final String ACCEPT_POLICY_CONF_KEY = + "hbase.thrift.accept.policy"; + + public HThreadedSelectorServerArgs( + TNonblockingServerTransport transport, Configuration conf) { + super(transport); + readConf(conf); + } + + private void readConf(Configuration conf) { + int selectorThreads = conf.getInt( + SELECTOR_THREADS_CONF_KEY, getSelectorThreads()); + int workerThreads = conf.getInt( + WORKER_THREADS_CONF_KEY, getWorkerThreads()); + int stopTimeoutVal = conf.getInt( + STOP_TIMEOUT_CONF_KEY, getStopTimeoutVal()); + int acceptQueueSizePerThread = conf.getInt( + ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY, getAcceptQueueSizePerThread()); + AcceptPolicy acceptPolicy = AcceptPolicy.valueOf(conf.get( + ACCEPT_POLICY_CONF_KEY, getAcceptPolicy().toString()).toUpperCase()); + + super.selectorThreads(selectorThreads) + .workerThreads(workerThreads) + .stopTimeoutVal(stopTimeoutVal) + .acceptQueueSizePerThread(acceptQueueSizePerThread) + .acceptPolicy(acceptPolicy); + + LOG.info("Read configuration selectorThreads:" + selectorThreads + + " workerThreads:" + workerThreads + + " stopTimeoutVal:" + stopTimeoutVal + "sec" + + " acceptQueueSizePerThread:" + acceptQueueSizePerThread + + " acceptPolicy:" + acceptPolicy); + } +} diff --git a/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java new file mode 100644 index 00000000000..b6a6349709f --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java @@ -0,0 +1,1237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.thrift; + +import static org.apache.hadoop.hbase.util.Bytes.getBytes; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionGroup; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.ParseFilter; +import org.apache.hadoop.hbase.filter.PrefixFilter; +import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.thrift.generated.AlreadyExists; +import org.apache.hadoop.hbase.thrift.generated.BatchMutation; +import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor; +import org.apache.hadoop.hbase.thrift.generated.Hbase; +import org.apache.hadoop.hbase.thrift.generated.IOError; +import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; +import org.apache.hadoop.hbase.thrift.generated.Mutation; +import org.apache.hadoop.hbase.thrift.generated.TCell; +import org.apache.hadoop.hbase.thrift.generated.TRegionInfo; +import org.apache.hadoop.hbase.thrift.generated.TRowResult; +import org.apache.hadoop.hbase.thrift.generated.TScan; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.net.DNS; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.protocol.TProtocolFactory; +import org.apache.thrift.server.THsHaServer; +import org.apache.thrift.server.TNonblockingServer; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadedSelectorServer; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TNonblockingServerTransport; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TServerTransport; +import org.apache.thrift.transport.TTransportFactory; + +import com.google.common.base.Joiner; + +/** + * ThriftServerRunner - this class starts up a Thrift server which implements + * the Hbase API specified in the Hbase.thrift IDL file. + */ +public class ThriftServerRunner implements Runnable { + + private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class); + + static final String SERVER_TYPE_CONF_KEY = + "hbase.regionserver.thrift.server.type"; + + static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress"; + static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact"; + static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed"; + static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port"; + + private static final String DEFAULT_BIND_ADDR = "0.0.0.0"; + private static final int DEFAULT_LISTEN_PORT = 9090; + + private Configuration conf; + volatile TServer tserver; + private final HBaseHandler handler; + + /** An enum of server implementation selections */ + enum ImplType { + HS_HA("hsha", true, THsHaServer.class, false), + NONBLOCKING("nonblocking", true, TNonblockingServer.class, false), + THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true), + THREADED_SELECTOR( + "threadedselector", true, TThreadedSelectorServer.class, false); + + public static final ImplType DEFAULT = THREAD_POOL; + + final String option; + final boolean isAlwaysFramed; + final Class serverClass; + final boolean canSpecifyBindIP; + + ImplType(String option, boolean isAlwaysFramed, + Class serverClass, boolean canSpecifyBindIP) { + this.option = option; + this.isAlwaysFramed = isAlwaysFramed; + this.serverClass = serverClass; + this.canSpecifyBindIP = canSpecifyBindIP; + } + + /** + * @return -option so we can get the list of options from + * {@link #values()} + */ + @Override + public String toString() { + return "-" + option; + } + + String getDescription() { + StringBuilder sb = new StringBuilder("Use the " + + serverClass.getSimpleName()); + if (isAlwaysFramed) { + sb.append(" This implies the framed transport."); + } + if (this == DEFAULT) { + sb.append("This is the default."); + } + return sb.toString(); + } + + static OptionGroup createOptionGroup() { + OptionGroup group = new OptionGroup(); + for (ImplType t : values()) { + group.addOption(new Option(t.option, t.getDescription())); + } + return group; + } + + static ImplType getServerImpl(Configuration conf) { + String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option); + for (ImplType t : values()) { + if (confType.equals(t.option)) { + return t; + } + } + throw new AssertionError("Unknown server ImplType.option:" + confType); + } + + static void setServerImpl(CommandLine cmd, Configuration conf) { + ImplType chosenType = null; + int numChosen = 0; + for (ImplType t : values()) { + if (cmd.hasOption(t.option)) { + chosenType = t; + ++numChosen; + } + } + if (numChosen != 1) { + throw new AssertionError("Exactly one option out of " + + Arrays.toString(values()) + " has to be specified"); + } + LOG.info("Setting thrift server to " + chosenType.option); + conf.set(SERVER_TYPE_CONF_KEY, chosenType.option); + } + + public String simpleClassName() { + return serverClass.getSimpleName(); + } + + public static List serversThatCannotSpecifyBindIP() { + List l = new ArrayList(); + for (ImplType t : values()) { + if (!t.canSpecifyBindIP) { + l.add(t.simpleClassName()); + } + } + return l; + } + + } + + public ThriftServerRunner(Configuration conf) throws IOException { + this(conf, new ThriftServerRunner.HBaseHandler(conf)); + } + + public ThriftServerRunner(Configuration conf, HBaseHandler handler) { + this.conf = HBaseConfiguration.create(conf); + this.handler = handler; + } + + /* + * Runs the Thrift server + */ + @Override + public void run() { + try { + setupServer(); + tserver.serve(); + } catch (Exception e) { + LOG.fatal("Cannot run ThriftServer"); + // Crash the process if the ThriftServer is not running + System.exit(-1); + } + } + + public void shutdown() { + if (tserver != null) { + tserver.stop(); + tserver = null; + } + } + + /** + * Setting up the thrift TServer + */ + private void setupServer() throws Exception { + // Get port to bind to + int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT); + + // Construct correct ProtocolFactory + TProtocolFactory protocolFactory; + if (conf.getBoolean(COMPACT_CONF_KEY, false)) { + LOG.debug("Using compact protocol"); + protocolFactory = new TCompactProtocol.Factory(); + } else { + LOG.debug("Using binary protocol"); + protocolFactory = new TBinaryProtocol.Factory(); + } + + Hbase.Processor processor = + new Hbase.Processor(handler); + ImplType implType = ImplType.getServerImpl(conf); + + // Construct correct TransportFactory + TTransportFactory transportFactory; + if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) { + transportFactory = new TFramedTransport.Factory(); + LOG.debug("Using framed transport"); + } else { + transportFactory = new TTransportFactory(); + } + + if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) { + LOG.error("Server types " + Joiner.on(", ").join( + ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " + + "address binding at the moment. See " + + "https://issues.apache.org/jira/browse/HBASE-2155 for details."); + throw new RuntimeException( + "-" + BIND_CONF_KEY + " not supported with " + implType); + } + + if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING || + implType == ImplType.THREADED_SELECTOR) { + + TNonblockingServerTransport serverTransport = + new TNonblockingServerSocket(listenPort); + + if (implType == ImplType.NONBLOCKING) { + TNonblockingServer.Args serverArgs = + new TNonblockingServer.Args(serverTransport); + serverArgs.processor(processor) + .transportFactory(transportFactory) + .protocolFactory(protocolFactory); + tserver = new TNonblockingServer(serverArgs); + } else if (implType == ImplType.HS_HA) { + THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport); + serverArgs.processor(processor) + .transportFactory(transportFactory) + .protocolFactory(protocolFactory); + tserver = new THsHaServer(serverArgs); + } else { // THREADED_SELECTOR + TThreadedSelectorServer.Args serverArgs = + new HThreadedSelectorServerArgs(serverTransport, conf); + serverArgs.processor(processor) + .transportFactory(transportFactory) + .protocolFactory(protocolFactory); + tserver = new TThreadedSelectorServer(serverArgs); + } + LOG.info("starting HBase " + implType.simpleClassName() + + " server on " + Integer.toString(listenPort)); + } else if (implType == ImplType.THREAD_POOL) { + // Thread pool server. Get the IP address to bind to. + InetAddress listenAddress = getBindAddress(conf); + + TServerTransport serverTransport = new TServerSocket( + new InetSocketAddress(listenAddress, listenPort)); + + TBoundedThreadPoolServer.Args serverArgs = + new TBoundedThreadPoolServer.Args(serverTransport, conf); + serverArgs.processor(processor) + .transportFactory(transportFactory) + .protocolFactory(protocolFactory); + LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on " + + listenAddress + ":" + Integer.toString(listenPort) + + "; " + serverArgs); + tserver = new TBoundedThreadPoolServer(serverArgs); + } else { + throw new AssertionError("Unsupported Thrift server implementation: " + + implType.simpleClassName()); + } + + // A sanity check that we instantiated the right type of server. + if (tserver.getClass() != implType.serverClass) { + throw new AssertionError("Expected to create Thrift server class " + + implType.serverClass.getName() + " but got " + + tserver.getClass().getName()); + } + + // login the server principal (if using secure Hadoop) + Configuration conf = handler.conf; + if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) { + String machineName = Strings.domainNamePointerToHostName( + DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"), + conf.get("hbase.thrift.dns.nameserver", "default"))); + User.login(conf, "hbase.thrift.keytab.file", + "hbase.thrift.kerberos.principal", machineName); + } + } + + private InetAddress getBindAddress(Configuration conf) + throws UnknownHostException { + String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR); + return InetAddress.getByName(bindAddressStr); + } + + /** + * The HBaseHandler is a glue object that connects Thrift RPC calls to the + * HBase client API primarily defined in the HBaseAdmin and HTable objects. + */ + public static class HBaseHandler implements Hbase.Iface { + protected Configuration conf; + protected HBaseAdmin admin = null; + protected final Log LOG = LogFactory.getLog(this.getClass().getName()); + + // nextScannerId and scannerMap are used to manage scanner state + protected int nextScannerId = 0; + protected HashMap scannerMap = null; + + private static ThreadLocal> threadLocalTables = + new ThreadLocal>() { + @Override + protected Map initialValue() { + return new TreeMap(); + } + }; + + /** + * Returns a list of all the column families for a given htable. + * + * @param table + * @return + * @throws IOException + */ + byte[][] getAllColumns(HTable table) throws IOException { + HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies(); + byte[][] columns = new byte[cds.length][]; + for (int i = 0; i < cds.length; i++) { + columns[i] = Bytes.add(cds[i].getName(), + KeyValue.COLUMN_FAMILY_DELIM_ARRAY); + } + return columns; + } + + /** + * Creates and returns an HTable instance from a given table name. + * + * @param tableName + * name of table + * @return HTable object + * @throws IOException + * @throws IOError + */ + protected HTable getTable(final byte[] tableName) throws + IOException { + String table = new String(tableName); + Map tables = threadLocalTables.get(); + if (!tables.containsKey(table)) { + tables.put(table, new HTable(conf, tableName)); + } + return tables.get(table); + } + + protected HTable getTable(final ByteBuffer tableName) throws IOException { + return getTable(getBytes(tableName)); + } + + /** + * Assigns a unique ID to the scanner and adds the mapping to an internal + * hash-map. + * + * @param scanner + * @return integer scanner id + */ + protected synchronized int addScanner(ResultScanner scanner) { + int id = nextScannerId++; + scannerMap.put(id, scanner); + return id; + } + + /** + * Returns the scanner associated with the specified ID. + * + * @param id + * @return a Scanner, or null if ID was invalid. + */ + protected synchronized ResultScanner getScanner(int id) { + return scannerMap.get(id); + } + + /** + * Removes the scanner associated with the specified ID from the internal + * id->scanner hash-map. + * + * @param id + * @return a Scanner, or null if ID was invalid. + */ + protected synchronized ResultScanner removeScanner(int id) { + return scannerMap.remove(id); + } + + /** + * Constructs an HBaseHandler object. + * @throws IOException + */ + protected HBaseHandler() + throws IOException { + this(HBaseConfiguration.create()); + } + + protected HBaseHandler(final Configuration c) + throws IOException { + this.conf = c; + admin = new HBaseAdmin(conf); + scannerMap = new HashMap(); + } + + @Override + public void enableTable(ByteBuffer tableName) throws IOError { + try{ + admin.enableTable(getBytes(tableName)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void disableTable(ByteBuffer tableName) throws IOError{ + try{ + admin.disableTable(getBytes(tableName)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public boolean isTableEnabled(ByteBuffer tableName) throws IOError { + try { + return HTable.isTableEnabled(this.conf, getBytes(tableName)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void compact(ByteBuffer tableNameOrRegionName) throws IOError { + try{ + admin.compact(getBytes(tableNameOrRegionName)); + } catch (InterruptedException e) { + throw new IOError(e.getMessage()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError { + try{ + admin.majorCompact(getBytes(tableNameOrRegionName)); + } catch (InterruptedException e) { + throw new IOError(e.getMessage()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getTableNames() throws IOError { + try { + HTableDescriptor[] tables = this.admin.listTables(); + ArrayList list = new ArrayList(tables.length); + for (int i = 0; i < tables.length; i++) { + list.add(ByteBuffer.wrap(tables[i].getName())); + } + return list; + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getTableRegions(ByteBuffer tableName) + throws IOError { + try{ + List hris = this.admin.getTableRegions(tableName.array()); + List regions = new ArrayList(); + + if (hris != null) { + for (HRegionInfo regionInfo : hris){ + TRegionInfo region = new TRegionInfo(); + region.startKey = ByteBuffer.wrap(regionInfo.getStartKey()); + region.endKey = ByteBuffer.wrap(regionInfo.getEndKey()); + region.id = regionInfo.getRegionId(); + region.name = ByteBuffer.wrap(regionInfo.getRegionName()); + region.version = regionInfo.getVersion(); + regions.add(region); + } + } + return regions; + } catch (IOException e){ + throw new IOError(e.getMessage()); + } + } + + @Deprecated + @Override + public List get( + ByteBuffer tableName, ByteBuffer row, ByteBuffer column) + throws IOError { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return get(tableName, row, famAndQf[0], new byte[0]); + } + return get(tableName, row, famAndQf[0], famAndQf[1]); + } + + protected List get(ByteBuffer tableName, + ByteBuffer row, + byte[] family, + byte[] qualifier) throws IOError { + try { + HTable table = getTable(tableName); + Get get = new Get(getBytes(row)); + if (qualifier == null || qualifier.length == 0) { + get.addFamily(family); + } else { + get.addColumn(family, qualifier); + } + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.raw()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Deprecated + @Override + public List getVer(ByteBuffer tableName, ByteBuffer row, + ByteBuffer column, int numVersions) throws IOError { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return getVer(tableName, row, famAndQf[0], + new byte[0], numVersions); + } + return getVer(tableName, row, + famAndQf[0], famAndQf[1], numVersions); + } + + public List getVer(ByteBuffer tableName, ByteBuffer row, + byte[] family, + byte[] qualifier, int numVersions) throws IOError { + try { + HTable table = getTable(tableName); + Get get = new Get(getBytes(row)); + get.addColumn(family, qualifier); + get.setMaxVersions(numVersions); + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.raw()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Deprecated + @Override + public List getVerTs(ByteBuffer tableName, + ByteBuffer row, + ByteBuffer column, + long timestamp, + int numVersions) throws IOError { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp, + numVersions); + } + return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, + numVersions); + } + + protected List getVerTs(ByteBuffer tableName, + ByteBuffer row, byte [] family, + byte [] qualifier, long timestamp, int numVersions) throws IOError { + try { + HTable table = getTable(tableName); + Get get = new Get(getBytes(row)); + get.addColumn(family, qualifier); + get.setTimeRange(Long.MIN_VALUE, timestamp); + get.setMaxVersions(numVersions); + Result result = table.get(get); + return ThriftUtilities.cellFromHBase(result.raw()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getRow(ByteBuffer tableName, ByteBuffer row) + throws IOError { + return getRowWithColumnsTs(tableName, row, null, + HConstants.LATEST_TIMESTAMP); + } + + @Override + public List getRowWithColumns(ByteBuffer tableName, + ByteBuffer row, + List columns) throws IOError { + return getRowWithColumnsTs(tableName, row, columns, + HConstants.LATEST_TIMESTAMP); + } + + @Override + public List getRowTs(ByteBuffer tableName, ByteBuffer row, + long timestamp) throws IOError { + return getRowWithColumnsTs(tableName, row, null, + timestamp); + } + + @Override + public List getRowWithColumnsTs( + ByteBuffer tableName, ByteBuffer row, List columns, + long timestamp) throws IOError { + try { + HTable table = getTable(tableName); + if (columns == null) { + Get get = new Get(getBytes(row)); + get.setTimeRange(Long.MIN_VALUE, timestamp); + Result result = table.get(get); + return ThriftUtilities.rowResultFromHBase(result); + } + Get get = new Get(getBytes(row)); + for(ByteBuffer column : columns) { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + get.addFamily(famAndQf[0]); + } else { + get.addColumn(famAndQf[0], famAndQf[1]); + } + } + get.setTimeRange(Long.MIN_VALUE, timestamp); + Result result = table.get(get); + return ThriftUtilities.rowResultFromHBase(result); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getRows(ByteBuffer tableName, + List rows) + throws IOError { + return getRowsWithColumnsTs(tableName, rows, null, + HConstants.LATEST_TIMESTAMP); + } + + @Override + public List getRowsWithColumns(ByteBuffer tableName, + List rows, + List columns) throws IOError { + return getRowsWithColumnsTs(tableName, rows, columns, + HConstants.LATEST_TIMESTAMP); + } + + @Override + public List getRowsTs(ByteBuffer tableName, + List rows, + long timestamp) throws IOError { + return getRowsWithColumnsTs(tableName, rows, null, + timestamp); + } + + @Override + public List getRowsWithColumnsTs(ByteBuffer tableName, + List rows, + List columns, long timestamp) throws IOError { + try { + List gets = new ArrayList(rows.size()); + HTable table = getTable(tableName); + for (ByteBuffer row : rows) { + Get get = new Get(getBytes(row)); + if (columns != null) { + + for(ByteBuffer column : columns) { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + get.addFamily(famAndQf[0]); + } else { + get.addColumn(famAndQf[0], famAndQf[1]); + } + } + get.setTimeRange(Long.MIN_VALUE, timestamp); + } + gets.add(get); + } + Result[] result = table.get(gets); + return ThriftUtilities.rowResultFromHBase(result); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void deleteAll( + ByteBuffer tableName, ByteBuffer row, ByteBuffer column) + throws IOError { + deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP); + } + + @Override + public void deleteAllTs(ByteBuffer tableName, + ByteBuffer row, + ByteBuffer column, + long timestamp) throws IOError { + try { + HTable table = getTable(tableName); + Delete delete = new Delete(getBytes(row)); + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if (famAndQf.length == 1) { + delete.deleteFamily(famAndQf[0], timestamp); + } else { + delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); + } + table.delete(delete); + + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void deleteAllRow( + ByteBuffer tableName, ByteBuffer row) throws IOError { + deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP); + } + + @Override + public void deleteAllRowTs( + ByteBuffer tableName, ByteBuffer row, long timestamp) throws IOError { + try { + HTable table = getTable(tableName); + Delete delete = new Delete(getBytes(row), timestamp, null); + table.delete(delete); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void createTable(ByteBuffer in_tableName, + List columnFamilies) throws IOError, + IllegalArgument, AlreadyExists { + byte [] tableName = getBytes(in_tableName); + try { + if (admin.tableExists(tableName)) { + throw new AlreadyExists("table name already in use"); + } + HTableDescriptor desc = new HTableDescriptor(tableName); + for (ColumnDescriptor col : columnFamilies) { + HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col); + desc.addFamily(colDesc); + } + admin.createTable(desc); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } catch (IllegalArgumentException e) { + throw new IllegalArgument(e.getMessage()); + } + } + + @Override + public void deleteTable(ByteBuffer in_tableName) throws IOError { + byte [] tableName = getBytes(in_tableName); + if (LOG.isDebugEnabled()) { + LOG.debug("deleteTable: table=" + Bytes.toString(tableName)); + } + try { + if (!admin.tableExists(tableName)) { + throw new IOError("table does not exist"); + } + admin.deleteTable(tableName); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public void mutateRow(ByteBuffer tableName, ByteBuffer row, + List mutations) throws IOError, IllegalArgument { + mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP); + } + + @Override + public void mutateRowTs(ByteBuffer tableName, ByteBuffer row, + List mutations, long timestamp) + throws IOError, IllegalArgument { + HTable table = null; + try { + table = getTable(tableName); + Put put = new Put(getBytes(row), timestamp, null); + + Delete delete = new Delete(getBytes(row)); + + // I apologize for all this mess :) + for (Mutation m : mutations) { + byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); + if (m.isDelete) { + if (famAndQf.length == 1) { + delete.deleteFamily(famAndQf[0], timestamp); + } else { + delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); + } + } else { + if(famAndQf.length == 1) { + put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, + m.value != null ? m.value.array() + : HConstants.EMPTY_BYTE_ARRAY); + } else { + put.add(famAndQf[0], famAndQf[1], + m.value != null ? m.value.array() + : HConstants.EMPTY_BYTE_ARRAY); + } + } + } + if (!delete.isEmpty()) + table.delete(delete); + if (!put.isEmpty()) + table.put(put); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } catch (IllegalArgumentException e) { + throw new IllegalArgument(e.getMessage()); + } + } + + @Override + public void mutateRows(ByteBuffer tableName, List rowBatches) + throws IOError, IllegalArgument, TException { + mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP); + } + + @Override + public void mutateRowsTs( + ByteBuffer tableName, List rowBatches, long timestamp) + throws IOError, IllegalArgument, TException { + List puts = new ArrayList(); + List deletes = new ArrayList(); + + for (BatchMutation batch : rowBatches) { + byte[] row = getBytes(batch.row); + List mutations = batch.mutations; + Delete delete = new Delete(row); + Put put = new Put(row, timestamp, null); + for (Mutation m : mutations) { + byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column)); + if (m.isDelete) { + // no qualifier, family only. + if (famAndQf.length == 1) { + delete.deleteFamily(famAndQf[0], timestamp); + } else { + delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp); + } + } else { + if(famAndQf.length == 1) { + put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, + m.value != null ? m.value.array() + : HConstants.EMPTY_BYTE_ARRAY); + } else { + put.add(famAndQf[0], famAndQf[1], + m.value != null ? m.value.array() + : HConstants.EMPTY_BYTE_ARRAY); + } + } + } + if (!delete.isEmpty()) + deletes.add(delete); + if (!put.isEmpty()) + puts.add(put); + } + + HTable table = null; + try { + table = getTable(tableName); + if (!puts.isEmpty()) + table.put(puts); + for (Delete del : deletes) { + table.delete(del); + } + } catch (IOException e) { + throw new IOError(e.getMessage()); + } catch (IllegalArgumentException e) { + throw new IllegalArgument(e.getMessage()); + } + } + + @Deprecated + @Override + public long atomicIncrement( + ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount) + throws IOError, IllegalArgument, TException { + byte [][] famAndQf = KeyValue.parseColumn(getBytes(column)); + if(famAndQf.length == 1) { + return atomicIncrement(tableName, row, famAndQf[0], new byte[0], + amount); + } + return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount); + } + + protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row, + byte [] family, byte [] qualifier, long amount) + throws IOError, IllegalArgument, TException { + HTable table; + try { + table = getTable(tableName); + return table.incrementColumnValue( + getBytes(row), family, qualifier, amount); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + public void scannerClose(int id) throws IOError, IllegalArgument { + LOG.debug("scannerClose: id=" + id); + ResultScanner scanner = getScanner(id); + if (scanner == null) { + throw new IllegalArgument("scanner ID is invalid"); + } + scanner.close(); + removeScanner(id); + } + + @Override + public List scannerGetList(int id,int nbRows) + throws IllegalArgument, IOError { + LOG.debug("scannerGetList: id=" + id); + ResultScanner scanner = getScanner(id); + if (null == scanner) { + throw new IllegalArgument("scanner ID is invalid"); + } + + Result [] results = null; + try { + results = scanner.next(nbRows); + if (null == results) { + return new ArrayList(); + } + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + return ThriftUtilities.rowResultFromHBase(results); + } + + @Override + public List scannerGet(int id) throws IllegalArgument, IOError { + return scannerGetList(id,1); + } + + public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan) + throws IOError { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(); + if (tScan.isSetStartRow()) { + scan.setStartRow(tScan.getStartRow()); + } + if (tScan.isSetStopRow()) { + scan.setStopRow(tScan.getStopRow()); + } + if (tScan.isSetTimestamp()) { + scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp()); + } + if (tScan.isSetCaching()) { + scan.setCaching(tScan.getCaching()); + } + if (tScan.isSetColumns() && tScan.getColumns().size() != 0) { + for(ByteBuffer column : tScan.getColumns()) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + if (tScan.isSetFilterString()) { + ParseFilter parseFilter = new ParseFilter(); + scan.setFilter( + parseFilter.parseFilterString(tScan.getFilterString())); + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow, + List columns) throws IOError { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow)); + if(columns != null && columns.size() != 0) { + for(ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow, + ByteBuffer stopRow, List columns) + throws IOError, TException { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); + if(columns != null && columns.size() != 0) { + for(ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpenWithPrefix(ByteBuffer tableName, + ByteBuffer startAndPrefix, + List columns) + throws IOError, TException { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startAndPrefix)); + Filter f = new WhileMatchFilter( + new PrefixFilter(getBytes(startAndPrefix))); + scan.setFilter(f); + if (columns != null && columns.size() != 0) { + for(ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow, + List columns, long timestamp) throws IOError, TException { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow)); + scan.setTimeRange(Long.MIN_VALUE, timestamp); + if (columns != null && columns.size() != 0) { + for (ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow, + ByteBuffer stopRow, List columns, long timestamp) + throws IOError, TException { + try { + HTable table = getTable(tableName); + Scan scan = new Scan(getBytes(startRow), getBytes(stopRow)); + scan.setTimeRange(Long.MIN_VALUE, timestamp); + if (columns != null && columns.size() != 0) { + for (ByteBuffer column : columns) { + byte [][] famQf = KeyValue.parseColumn(getBytes(column)); + if(famQf.length == 1) { + scan.addFamily(famQf[0]); + } else { + scan.addColumn(famQf[0], famQf[1]); + } + } + } + scan.setTimeRange(Long.MIN_VALUE, timestamp); + return addScanner(table.getScanner(scan)); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public Map getColumnDescriptors( + ByteBuffer tableName) throws IOError, TException { + try { + TreeMap columns = + new TreeMap(); + + HTable table = getTable(tableName); + HTableDescriptor desc = table.getTableDescriptor(); + + for (HColumnDescriptor e : desc.getFamilies()) { + ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e); + columns.put(col.name, col); + } + return columns; + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public List getRowOrBefore(ByteBuffer tableName, ByteBuffer row, + ByteBuffer family) throws IOError { + try { + HTable table = getTable(getBytes(tableName)); + Result result = table.getRowOrBefore(getBytes(row), getBytes(family)); + return ThriftUtilities.cellFromHBase(result.raw()); + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + + @Override + public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError { + try { + HTable table = getTable(HConstants.META_TABLE_NAME); + Result startRowResult = table.getRowOrBefore( + searchRow.array(), HConstants.CATALOG_FAMILY); + + if (startRowResult == null) { + throw new IOException("Cannot find row in .META., row=" + + Bytes.toString(searchRow.array())); + } + + // find region start and end keys + byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY, + HConstants.REGIONINFO_QUALIFIER); + if (value == null || value.length == 0) { + throw new IOException("HRegionInfo REGIONINFO was null or " + + " empty in Meta for row=" + + Bytes.toString(searchRow.array())); + } + HRegionInfo regionInfo = Writables.getHRegionInfo(value); + TRegionInfo region = new TRegionInfo(); + region.setStartKey(regionInfo.getStartKey()); + region.setEndKey(regionInfo.getEndKey()); + region.id = regionInfo.getRegionId(); + region.setName(regionInfo.getRegionName()); + region.version = regionInfo.getVersion(); + + // find region assignment to server + value = startRowResult.getValue(HConstants.CATALOG_FAMILY, + HConstants.SERVER_QUALIFIER); + if (value != null && value.length > 0) { + String hostAndPort = Bytes.toString(value); + region.setServerName(Bytes.toBytes( + Addressing.parseHostname(hostAndPort))); + region.port = Addressing.parsePort(hostAndPort); + } + return region; + } catch (IOException e) { + throw new IOError(e.getMessage()); + } + } + } +}