HBASE-19471 Fixed remaining Checkstyle errors in hbase-thrift
This commit is contained in:
parent
228d7a5a46
commit
830179600d
|
@ -36,4 +36,5 @@
|
||||||
<suppress checks="MagicNumberCheck" files=".*/src/test/.*\.java"/>
|
<suppress checks="MagicNumberCheck" files=".*/src/test/.*\.java"/>
|
||||||
<suppress checks="VisibilityModifier" files=".*/src/test/.*\.java"/>
|
<suppress checks="VisibilityModifier" files=".*/src/test/.*\.java"/>
|
||||||
<suppress checks="InterfaceIsTypeCheck" files=".*/src/main/.*\.java"/>
|
<suppress checks="InterfaceIsTypeCheck" files=".*/src/main/.*\.java"/>
|
||||||
|
<suppress checks="EmptyBlockCheck" files="TBoundedThreadPoolServer.java"/>
|
||||||
</suppressions>
|
</suppressions>
|
||||||
|
|
|
@ -139,6 +139,22 @@
|
||||||
<groupId>org.apache.maven.plugins</groupId>
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
<artifactId>maven-source-plugin</artifactId>
|
<artifactId>maven-source-plugin</artifactId>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-checkstyle-plugin</artifactId>
|
||||||
|
<executions>
|
||||||
|
<execution>
|
||||||
|
<id>checkstyle</id>
|
||||||
|
<phase>validate</phase>
|
||||||
|
<goals>
|
||||||
|
<goal>check</goal>
|
||||||
|
</goals>
|
||||||
|
<configuration>
|
||||||
|
<failOnViolation>true</failOnViolation>
|
||||||
|
</configuration>
|
||||||
|
</execution>
|
||||||
|
</executions>
|
||||||
|
</plugin>
|
||||||
</plugins>
|
</plugins>
|
||||||
<pluginManagement>
|
<pluginManagement>
|
||||||
<plugins>
|
<plugins>
|
||||||
|
|
|
@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
|
||||||
* time of each call to ThriftMetrics.
|
* time of each call to ThriftMetrics.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class HbaseHandlerMetricsProxy implements InvocationHandler {
|
public final class HbaseHandlerMetricsProxy implements InvocationHandler {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
HbaseHandlerMetricsProxy.class);
|
HbaseHandlerMetricsProxy.class);
|
||||||
|
|
|
@ -117,14 +117,30 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object obj) {
|
public boolean equals(Object obj) {
|
||||||
if (this == obj) return true;
|
if (this == obj) {
|
||||||
if (obj == null) return false;
|
return true;
|
||||||
if (getClass() != obj.getClass()) return false;
|
}
|
||||||
|
if (obj == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (getClass() != obj.getClass()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
FullyQualifiedRow other = (FullyQualifiedRow) obj;
|
FullyQualifiedRow other = (FullyQualifiedRow) obj;
|
||||||
if (!Arrays.equals(family, other.family)) return false;
|
|
||||||
if (!Arrays.equals(qualifier, other.qualifier)) return false;
|
if (!Arrays.equals(family, other.family)) {
|
||||||
if (!Arrays.equals(rowKey, other.rowKey)) return false;
|
return false;
|
||||||
if (!Arrays.equals(table, other.table)) return false;
|
}
|
||||||
|
if (!Arrays.equals(qualifier, other.qualifier)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!Arrays.equals(rowKey, other.rowKey)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!Arrays.equals(table, other.table)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -144,8 +160,14 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
||||||
|
|
||||||
public Thread newThread(Runnable r) {
|
public Thread newThread(Runnable r) {
|
||||||
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
|
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
|
||||||
if (!t.isDaemon()) t.setDaemon(true);
|
|
||||||
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
|
if (!t.isDaemon()) {
|
||||||
|
t.setDaemon(true);
|
||||||
|
}
|
||||||
|
if (t.getPriority() != Thread.NORM_PRIORITY) {
|
||||||
|
t.setPriority(Thread.NORM_PRIORITY);
|
||||||
|
}
|
||||||
|
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -191,13 +213,16 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
||||||
for (TIncrement tinc : incs) {
|
for (TIncrement tinc : incs) {
|
||||||
internalQueueTincrement(tinc);
|
internalQueueTincrement(tinc);
|
||||||
}
|
}
|
||||||
return true;
|
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean internalQueueTincrement(TIncrement inc) throws TException {
|
private boolean internalQueueTincrement(TIncrement inc) throws TException {
|
||||||
byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
|
byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
|
||||||
if (famAndQf.length != 2) return false;
|
|
||||||
|
if (famAndQf.length != 2) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
|
return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
|
||||||
inc.getAmmount());
|
inc.getAmmount());
|
||||||
|
@ -207,7 +232,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
||||||
byte[] qual, long ammount) throws TException {
|
byte[] qual, long ammount) throws TException {
|
||||||
int countersMapSize = countersMap.size();
|
int countersMapSize = countersMap.size();
|
||||||
|
|
||||||
|
|
||||||
//Make sure that the number of threads is scaled.
|
//Make sure that the number of threads is scaled.
|
||||||
dynamicallySetCoreSize(countersMapSize);
|
dynamicallySetCoreSize(countersMapSize);
|
||||||
|
|
||||||
|
@ -293,7 +317,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
||||||
/**
|
/**
|
||||||
* This method samples the incoming requests and, if selected, will check if
|
* This method samples the incoming requests and, if selected, will check if
|
||||||
* the corePoolSize should be changed.
|
* the corePoolSize should be changed.
|
||||||
* @param countersMapSize
|
* @param countersMapSize the size of the counters map
|
||||||
*/
|
*/
|
||||||
private void dynamicallySetCoreSize(int countersMapSize) {
|
private void dynamicallySetCoreSize(int countersMapSize) {
|
||||||
// Here we are using countersMapSize as a random number, meaning this
|
// Here we are using countersMapSize as a random number, meaning this
|
||||||
|
@ -302,9 +326,10 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
double currentRatio = (double) countersMapSize / (double) maxQueueSize;
|
double currentRatio = (double) countersMapSize / (double) maxQueueSize;
|
||||||
int newValue = 1;
|
int newValue;
|
||||||
|
|
||||||
if (currentRatio < 0.1) {
|
if (currentRatio < 0.1) {
|
||||||
// it's 1
|
newValue = 1;
|
||||||
} else if (currentRatio < 0.3) {
|
} else if (currentRatio < 0.3) {
|
||||||
newValue = 2;
|
newValue = 2;
|
||||||
} else if (currentRatio < 0.5) {
|
} else if (currentRatio < 0.5) {
|
||||||
|
@ -316,6 +341,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
||||||
} else {
|
} else {
|
||||||
newValue = 22;
|
newValue = 22;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pool.getCorePoolSize() != newValue) {
|
if (pool.getCorePoolSize() != newValue) {
|
||||||
pool.setCorePoolSize(newValue);
|
pool.setCorePoolSize(newValue);
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,7 +258,7 @@ public class TBoundedThreadPoolServer extends TServer {
|
||||||
serverTransport_.interrupt();
|
serverTransport_.interrupt();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ClientConnnection implements Runnable {
|
private final class ClientConnnection implements Runnable {
|
||||||
|
|
||||||
private TTransport client;
|
private TTransport client;
|
||||||
|
|
||||||
|
|
|
@ -64,12 +64,14 @@ public class ThriftMetrics {
|
||||||
|
|
||||||
|
|
||||||
public ThriftMetrics(Configuration conf, ThriftServerType t) {
|
public ThriftMetrics(Configuration conf, ThriftServerType t) {
|
||||||
slowResponseTime = conf.getLong( SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
|
slowResponseTime = conf.getLong(SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
|
||||||
|
|
||||||
if (t == ThriftServerType.ONE) {
|
if (t == ThriftServerType.ONE) {
|
||||||
source = CompatibilitySingletonFactory.getInstance(MetricsThriftServerSourceFactory.class).createThriftOneSource();
|
source = CompatibilitySingletonFactory.getInstance(MetricsThriftServerSourceFactory.class)
|
||||||
|
.createThriftOneSource();
|
||||||
} else if (t == ThriftServerType.TWO) {
|
} else if (t == ThriftServerType.TWO) {
|
||||||
source = CompatibilitySingletonFactory.getInstance(MetricsThriftServerSourceFactory.class).createThriftTwoSource();
|
source = CompatibilitySingletonFactory.getInstance(MetricsThriftServerSourceFactory.class)
|
||||||
|
.createThriftTwoSource();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,23 +87,24 @@ public class ThriftServer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start up or shuts down the Thrift server, depending on the arguments.
|
* Start up or shuts down the Thrift server, depending on the arguments.
|
||||||
* @param args
|
* @param args the arguments to pass in when starting the Thrift server
|
||||||
*/
|
*/
|
||||||
void doMain(final String[] args) throws Exception {
|
void doMain(final String[] args) throws Exception {
|
||||||
processOptions(args);
|
processOptions(args);
|
||||||
|
serverRunner = new ThriftServerRunner(conf);
|
||||||
|
|
||||||
serverRunner = new ThriftServerRunner(conf);
|
// Put up info server.
|
||||||
|
int port = conf.getInt("hbase.thrift.info.port", 9095);
|
||||||
|
|
||||||
// Put up info server.
|
if (port >= 0) {
|
||||||
int port = conf.getInt("hbase.thrift.info.port", 9095);
|
conf.setLong("startcode", System.currentTimeMillis());
|
||||||
if (port >= 0) {
|
String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
|
||||||
conf.setLong("startcode", System.currentTimeMillis());
|
infoServer = new InfoServer("thrift", a, port, false, conf);
|
||||||
String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
|
infoServer.setAttribute("hbase.conf", conf);
|
||||||
infoServer = new InfoServer("thrift", a, port, false, conf);
|
infoServer.start();
|
||||||
infoServer.setAttribute("hbase.conf", conf);
|
}
|
||||||
infoServer.start();
|
|
||||||
}
|
serverRunner.run();
|
||||||
serverRunner.run();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -230,10 +231,6 @@ public class ThriftServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* @param args
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
|
||||||
public static void main(String [] args) throws Exception {
|
public static void main(String [] args) throws Exception {
|
||||||
LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
|
LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
|
||||||
VersionInfo.logVersion();
|
VersionInfo.logVersion();
|
||||||
|
|
|
@ -161,7 +161,8 @@ public class ThriftServerRunner implements Runnable {
|
||||||
static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
|
static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
|
||||||
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
|
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
|
||||||
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
|
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
|
||||||
static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
|
static final String MAX_FRAME_SIZE_CONF_KEY =
|
||||||
|
"hbase.regionserver.thrift.framed.max_frame_size_in_mb";
|
||||||
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
|
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
|
||||||
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
|
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
|
||||||
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
|
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
|
||||||
|
@ -347,7 +348,8 @@ public class ThriftServerRunner implements Runnable {
|
||||||
doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
|
doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
|
||||||
if (doAsEnabled) {
|
if (doAsEnabled) {
|
||||||
if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
|
if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
|
||||||
LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not configured ");
|
LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not " +
|
||||||
|
"configured ");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (qop != null) {
|
if (qop != null) {
|
||||||
|
@ -433,7 +435,8 @@ public class ThriftServerRunner implements Runnable {
|
||||||
httpServer = new Server(threadPool);
|
httpServer = new Server(threadPool);
|
||||||
|
|
||||||
// Context handler
|
// Context handler
|
||||||
ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS);
|
ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
|
||||||
|
ServletContextHandler.SESSIONS);
|
||||||
ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
|
ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
|
||||||
|
|
||||||
// set up Jetty and run the embedded server
|
// set up Jetty and run the embedded server
|
||||||
|
@ -508,14 +511,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
*/
|
*/
|
||||||
private void setupServer() throws Exception {
|
private void setupServer() throws Exception {
|
||||||
// Construct correct ProtocolFactory
|
// Construct correct ProtocolFactory
|
||||||
TProtocolFactory protocolFactory;
|
TProtocolFactory protocolFactory = getProtocolFactory();
|
||||||
if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
|
|
||||||
LOG.debug("Using compact protocol");
|
|
||||||
protocolFactory = new TCompactProtocol.Factory();
|
|
||||||
} else {
|
|
||||||
LOG.debug("Using binary protocol");
|
|
||||||
protocolFactory = new TBinaryProtocol.Factory();
|
|
||||||
}
|
|
||||||
|
|
||||||
final TProcessor p = new Hbase.Processor<>(handler);
|
final TProcessor p = new Hbase.Processor<>(handler);
|
||||||
ImplType implType = ImplType.getServerImpl(conf);
|
ImplType implType = ImplType.getServerImpl(conf);
|
||||||
|
@ -614,10 +610,8 @@ public class ThriftServerRunner implements Runnable {
|
||||||
CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
|
CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
|
||||||
ExecutorService executorService = createExecutor(
|
ExecutorService executorService = createExecutor(
|
||||||
callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads());
|
callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads());
|
||||||
serverArgs.executorService(executorService)
|
serverArgs.executorService(executorService).processor(processor)
|
||||||
.processor(processor)
|
.transportFactory(transportFactory).protocolFactory(protocolFactory);
|
||||||
.transportFactory(transportFactory)
|
|
||||||
.protocolFactory(protocolFactory);
|
|
||||||
tserver = new THsHaServer(serverArgs);
|
tserver = new THsHaServer(serverArgs);
|
||||||
} else { // THREADED_SELECTOR
|
} else { // THREADED_SELECTOR
|
||||||
TThreadedSelectorServer.Args serverArgs =
|
TThreadedSelectorServer.Args serverArgs =
|
||||||
|
@ -625,10 +619,8 @@ public class ThriftServerRunner implements Runnable {
|
||||||
CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
|
CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
|
||||||
ExecutorService executorService = createExecutor(
|
ExecutorService executorService = createExecutor(
|
||||||
callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
|
callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
|
||||||
serverArgs.executorService(executorService)
|
serverArgs.executorService(executorService).processor(processor)
|
||||||
.processor(processor)
|
.transportFactory(transportFactory).protocolFactory(protocolFactory);
|
||||||
.transportFactory(transportFactory)
|
|
||||||
.protocolFactory(protocolFactory);
|
|
||||||
tserver = new TThreadedSelectorServer(serverArgs);
|
tserver = new TThreadedSelectorServer(serverArgs);
|
||||||
}
|
}
|
||||||
LOG.info("starting HBase " + implType.simpleClassName() +
|
LOG.info("starting HBase " + implType.simpleClassName() +
|
||||||
|
@ -640,21 +632,17 @@ public class ThriftServerRunner implements Runnable {
|
||||||
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
|
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
|
||||||
TServerTransport serverTransport = new TServerSocket(
|
TServerTransport serverTransport = new TServerSocket(
|
||||||
new TServerSocket.ServerSocketTransportArgs().
|
new TServerSocket.ServerSocketTransportArgs().
|
||||||
bindAddr(new InetSocketAddress(listenAddress, listenPort)).
|
bindAddr(new InetSocketAddress(listenAddress, listenPort)).backlog(backlog).
|
||||||
backlog(backlog).
|
|
||||||
clientTimeout(readTimeout));
|
clientTimeout(readTimeout));
|
||||||
|
|
||||||
TBoundedThreadPoolServer.Args serverArgs =
|
TBoundedThreadPoolServer.Args serverArgs =
|
||||||
new TBoundedThreadPoolServer.Args(serverTransport, conf);
|
new TBoundedThreadPoolServer.Args(serverTransport, conf);
|
||||||
serverArgs.processor(processor)
|
serverArgs.processor(processor).transportFactory(transportFactory)
|
||||||
.transportFactory(transportFactory)
|
.protocolFactory(protocolFactory);
|
||||||
.protocolFactory(protocolFactory);
|
|
||||||
LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
|
LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
|
||||||
+ listenAddress + ":" + Integer.toString(listenPort)
|
+ listenAddress + ":" + Integer.toString(listenPort)
|
||||||
+ " with readTimeout " + readTimeout + "ms; " + serverArgs);
|
+ " with readTimeout " + readTimeout + "ms; " + serverArgs);
|
||||||
TBoundedThreadPoolServer tserver =
|
this.tserver = new TBoundedThreadPoolServer(serverArgs, metrics);
|
||||||
new TBoundedThreadPoolServer(serverArgs, metrics);
|
|
||||||
this.tserver = tserver;
|
|
||||||
} else {
|
} else {
|
||||||
throw new AssertionError("Unsupported Thrift server implementation: " +
|
throw new AssertionError("Unsupported Thrift server implementation: " +
|
||||||
implType.simpleClassName());
|
implType.simpleClassName());
|
||||||
|
@ -672,6 +660,20 @@ public class ThriftServerRunner implements Runnable {
|
||||||
registerFilters(conf);
|
registerFilters(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TProtocolFactory getProtocolFactory() {
|
||||||
|
TProtocolFactory protocolFactory;
|
||||||
|
|
||||||
|
if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
|
||||||
|
LOG.debug("Using compact protocol");
|
||||||
|
protocolFactory = new TCompactProtocol.Factory();
|
||||||
|
} else {
|
||||||
|
LOG.debug("Using binary protocol");
|
||||||
|
protocolFactory = new TBinaryProtocol.Factory();
|
||||||
|
}
|
||||||
|
|
||||||
|
return protocolFactory;
|
||||||
|
}
|
||||||
|
|
||||||
ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
|
ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
|
||||||
int minWorkers, int maxWorkers) {
|
int minWorkers, int maxWorkers) {
|
||||||
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
|
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
|
||||||
|
@ -697,7 +699,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
boolean sortResultColumns) {
|
boolean sortResultColumns) {
|
||||||
scanner = resultScanner;
|
scanner = resultScanner;
|
||||||
sortColumns = sortResultColumns;
|
sortColumns = sortResultColumns;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResultScanner getScanner() {
|
public ResultScanner getScanner() {
|
||||||
return scanner;
|
return scanner;
|
||||||
|
@ -749,10 +751,9 @@ public class ThriftServerRunner implements Runnable {
|
||||||
* @param tableName
|
* @param tableName
|
||||||
* name of table
|
* name of table
|
||||||
* @return Table object
|
* @return Table object
|
||||||
* @throws IOException
|
* @throws IOException if getting the table fails
|
||||||
*/
|
*/
|
||||||
public Table getTable(final byte[] tableName) throws
|
public Table getTable(final byte[] tableName) throws IOException {
|
||||||
IOException {
|
|
||||||
String table = Bytes.toString(tableName);
|
String table = Bytes.toString(tableName);
|
||||||
return connectionCache.getTable(table);
|
return connectionCache.getTable(table);
|
||||||
}
|
}
|
||||||
|
@ -765,10 +766,10 @@ public class ThriftServerRunner implements Runnable {
|
||||||
* Assigns a unique ID to the scanner and adds the mapping to an internal
|
* Assigns a unique ID to the scanner and adds the mapping to an internal
|
||||||
* hash-map.
|
* hash-map.
|
||||||
*
|
*
|
||||||
* @param scanner
|
* @param scanner the {@link ResultScanner} to add
|
||||||
* @return integer scanner id
|
* @return integer scanner id
|
||||||
*/
|
*/
|
||||||
protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
|
protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
|
||||||
int id = nextScannerId++;
|
int id = nextScannerId++;
|
||||||
ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
|
ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
|
||||||
scannerMap.put(id, resultScannerWrapper);
|
scannerMap.put(id, resultScannerWrapper);
|
||||||
|
@ -778,7 +779,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
/**
|
/**
|
||||||
* Returns the scanner associated with the specified ID.
|
* Returns the scanner associated with the specified ID.
|
||||||
*
|
*
|
||||||
* @param id
|
* @param id the ID of the scanner to get
|
||||||
* @return a Scanner, or null if ID was invalid.
|
* @return a Scanner, or null if ID was invalid.
|
||||||
*/
|
*/
|
||||||
protected synchronized ResultScannerWrapper getScanner(int id) {
|
protected synchronized ResultScannerWrapper getScanner(int id) {
|
||||||
|
@ -789,7 +790,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
* Removes the scanner associated with the specified ID from the internal
|
* Removes the scanner associated with the specified ID from the internal
|
||||||
* id->scanner hash-map.
|
* id->scanner hash-map.
|
||||||
*
|
*
|
||||||
* @param id
|
* @param id the ID of the scanner to remove
|
||||||
* @return a Scanner, or null if ID was invalid.
|
* @return a Scanner, or null if ID was invalid.
|
||||||
*/
|
*/
|
||||||
protected synchronized ResultScannerWrapper removeScanner(int id) {
|
protected synchronized ResultScannerWrapper removeScanner(int id) {
|
||||||
|
@ -1116,9 +1117,9 @@ public class ThriftServerRunner implements Runnable {
|
||||||
for(ByteBuffer column : columns) {
|
for(ByteBuffer column : columns) {
|
||||||
byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
|
byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
|
||||||
if (famAndQf.length == 1) {
|
if (famAndQf.length == 1) {
|
||||||
get.addFamily(famAndQf[0]);
|
get.addFamily(famAndQf[0]);
|
||||||
} else {
|
} else {
|
||||||
get.addColumn(famAndQf[0], famAndQf[1]);
|
get.addColumn(famAndQf[0], famAndQf[1]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
get.setTimeRange(0, timestamp);
|
get.setTimeRange(0, timestamp);
|
||||||
|
@ -1361,10 +1362,12 @@ public class ThriftServerRunner implements Runnable {
|
||||||
put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!delete.isEmpty())
|
if (!delete.isEmpty()) {
|
||||||
table.delete(delete);
|
table.delete(delete);
|
||||||
if (!put.isEmpty())
|
}
|
||||||
|
if (!put.isEmpty()) {
|
||||||
table.put(put);
|
table.put(put);
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn(e.getMessage(), e);
|
LOG.warn(e.getMessage(), e);
|
||||||
throw getIOError(e);
|
throw getIOError(e);
|
||||||
|
@ -1434,19 +1437,23 @@ public class ThriftServerRunner implements Runnable {
|
||||||
put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!delete.isEmpty())
|
if (!delete.isEmpty()) {
|
||||||
deletes.add(delete);
|
deletes.add(delete);
|
||||||
if (!put.isEmpty())
|
}
|
||||||
|
if (!put.isEmpty()) {
|
||||||
puts.add(put);
|
puts.add(put);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Table table = null;
|
Table table = null;
|
||||||
try {
|
try {
|
||||||
table = getTable(tableName);
|
table = getTable(tableName);
|
||||||
if (!puts.isEmpty())
|
if (!puts.isEmpty()) {
|
||||||
table.put(puts);
|
table.put(puts);
|
||||||
if (!deletes.isEmpty())
|
}
|
||||||
|
if (!deletes.isEmpty()) {
|
||||||
table.delete(deletes);
|
table.delete(deletes);
|
||||||
|
}
|
||||||
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn(e.getMessage(), e);
|
LOG.warn(e.getMessage(), e);
|
||||||
|
@ -1762,8 +1769,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void closeTable(Table table) throws IOError
|
private void closeTable(Table table) throws IOError {
|
||||||
{
|
|
||||||
try{
|
try{
|
||||||
if(table != null){
|
if(table != null){
|
||||||
table.close();
|
table.close();
|
||||||
|
@ -1885,7 +1891,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
LOG.warn(e.getMessage(), e);
|
LOG.warn(e.getMessage(), e);
|
||||||
throw getIOError(e);
|
throw getIOError(e);
|
||||||
} finally{
|
} finally{
|
||||||
closeTable(table);
|
closeTable(table);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1932,7 +1938,7 @@ public class ThriftServerRunner implements Runnable {
|
||||||
LOG.warn(e.getMessage(), e);
|
LOG.warn(e.getMessage(), e);
|
||||||
throw new IllegalArgument(Throwables.getStackTraceAsString(e));
|
throw new IllegalArgument(Throwables.getStackTraceAsString(e));
|
||||||
} finally {
|
} finally {
|
||||||
closeTable(table);
|
closeTable(table);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,16 +46,17 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ThriftUtilities {
|
public final class ThriftUtilities {
|
||||||
|
private ThriftUtilities() {
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This utility method creates a new Hbase HColumnDescriptor object based on a
|
* This utility method creates a new Hbase HColumnDescriptor object based on a
|
||||||
* Thrift ColumnDescriptor "struct".
|
* Thrift ColumnDescriptor "struct".
|
||||||
*
|
*
|
||||||
* @param in
|
* @param in Thrift ColumnDescriptor object
|
||||||
* Thrift ColumnDescriptor object
|
|
||||||
* @return HColumnDescriptor
|
* @return HColumnDescriptor
|
||||||
* @throws IllegalArgument
|
* @throws IllegalArgument if the column name is empty
|
||||||
*/
|
*/
|
||||||
static public HColumnDescriptor colDescFromThrift(ColumnDescriptor in)
|
static public HColumnDescriptor colDescFromThrift(ColumnDescriptor in)
|
||||||
throws IllegalArgument {
|
throws IllegalArgument {
|
||||||
|
@ -150,31 +151,35 @@ public class ThriftUtilities {
|
||||||
*/
|
*/
|
||||||
static public List<TRowResult> rowResultFromHBase(Result[] in, boolean sortColumns) {
|
static public List<TRowResult> rowResultFromHBase(Result[] in, boolean sortColumns) {
|
||||||
List<TRowResult> results = new ArrayList<>(in.length);
|
List<TRowResult> results = new ArrayList<>(in.length);
|
||||||
for ( Result result_ : in) {
|
for (Result result_ : in) {
|
||||||
if(result_ == null || result_.isEmpty()) {
|
if(result_ == null || result_.isEmpty()) {
|
||||||
continue;
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
TRowResult result = new TRowResult();
|
||||||
|
result.row = ByteBuffer.wrap(result_.getRow());
|
||||||
|
|
||||||
|
if (sortColumns) {
|
||||||
|
result.sortedColumns = new ArrayList<>();
|
||||||
|
for (Cell kv : result_.rawCells()) {
|
||||||
|
result.sortedColumns.add(new TColumn(
|
||||||
|
ByteBuffer.wrap(CellUtil.makeColumn(CellUtil.cloneFamily(kv),
|
||||||
|
CellUtil.cloneQualifier(kv))),
|
||||||
|
new TCell(ByteBuffer.wrap(CellUtil.cloneValue(kv)), kv.getTimestamp())));
|
||||||
}
|
}
|
||||||
TRowResult result = new TRowResult();
|
} else {
|
||||||
result.row = ByteBuffer.wrap(result_.getRow());
|
result.columns = new TreeMap<>();
|
||||||
if (sortColumns) {
|
for (Cell kv : result_.rawCells()) {
|
||||||
result.sortedColumns = new ArrayList<>();
|
result.columns.put(
|
||||||
for (Cell kv : result_.rawCells()) {
|
ByteBuffer.wrap(CellUtil.makeColumn(CellUtil.cloneFamily(kv),
|
||||||
result.sortedColumns.add(new TColumn(
|
CellUtil.cloneQualifier(kv))),
|
||||||
ByteBuffer.wrap(CellUtil.makeColumn(CellUtil.cloneFamily(kv),
|
new TCell(ByteBuffer.wrap(CellUtil.cloneValue(kv)), kv.getTimestamp()));
|
||||||
CellUtil.cloneQualifier(kv))),
|
|
||||||
new TCell(ByteBuffer.wrap(CellUtil.cloneValue(kv)), kv.getTimestamp())));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
result.columns = new TreeMap<>();
|
|
||||||
for (Cell kv : result_.rawCells()) {
|
|
||||||
result.columns.put(
|
|
||||||
ByteBuffer.wrap(CellUtil.makeColumn(CellUtil.cloneFamily(kv),
|
|
||||||
CellUtil.cloneQualifier(kv))),
|
|
||||||
new TCell(ByteBuffer.wrap(CellUtil.cloneValue(kv)), kv.getTimestamp()));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
results.add(result);
|
results.add(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,7 +209,11 @@ public class ThriftUtilities {
|
||||||
public static Increment incrementFromThrift(TIncrement tincrement) {
|
public static Increment incrementFromThrift(TIncrement tincrement) {
|
||||||
Increment inc = new Increment(tincrement.getRow());
|
Increment inc = new Increment(tincrement.getRow());
|
||||||
byte[][] famAndQf = CellUtil.parseColumn(tincrement.getColumn());
|
byte[][] famAndQf = CellUtil.parseColumn(tincrement.getColumn());
|
||||||
if (famAndQf.length != 2) return null;
|
|
||||||
|
if (famAndQf.length != 2) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
inc.addColumn(famAndQf[0], famAndQf[1], tincrement.getAmmount());
|
inc.addColumn(famAndQf[0], famAndQf[1], tincrement.getAmmount());
|
||||||
return inc;
|
return inc;
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,8 +89,8 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
|
* ThriftServer - this class starts up a Thrift server which implements the HBase API specified in
|
||||||
* HbaseClient.thrift IDL file.
|
* the HbaseClient.thrift IDL file.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
|
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
|
||||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||||
|
@ -155,11 +155,14 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
options.addOption("ro", "readonly", false,
|
options.addOption("ro", "readonly", false,
|
||||||
"Respond only to read method requests [default: false]");
|
"Respond only to read method requests [default: false]");
|
||||||
OptionGroup servers = new OptionGroup();
|
OptionGroup servers = new OptionGroup();
|
||||||
servers.addOption(
|
servers.addOption(new Option("nonblocking", false,
|
||||||
new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
|
"Use the TNonblockingServer. This implies the framed transport."));
|
||||||
servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
|
servers.addOption(new Option("hsha", false,
|
||||||
servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport."));
|
"Use the THsHaServer. This implies the framed transport."));
|
||||||
servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
|
servers.addOption(new Option("selector", false,
|
||||||
|
"Use the TThreadedSelectorServer. This implies the framed transport."));
|
||||||
|
servers.addOption(new Option("threadpool", false,
|
||||||
|
"Use the TThreadPoolServer. This is the default."));
|
||||||
options.addOptionGroup(servers);
|
options.addOptionGroup(servers);
|
||||||
return options;
|
return options;
|
||||||
}
|
}
|
||||||
|
@ -243,8 +246,9 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
|
private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory,
|
||||||
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
|
TProcessor processor, TTransportFactory transportFactory, InetSocketAddress inetSocketAddress)
|
||||||
|
throws TTransportException {
|
||||||
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
|
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
|
||||||
log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
|
log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
|
||||||
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
|
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
|
||||||
|
@ -284,10 +288,10 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
|
log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
|
||||||
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
|
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
|
||||||
if (workerThreads > 0) {
|
if (workerThreads > 0) {
|
||||||
serverArgs.workerThreads(workerThreads);
|
serverArgs.workerThreads(workerThreads);
|
||||||
}
|
}
|
||||||
if (selectorThreads > 0) {
|
if (selectorThreads > 0) {
|
||||||
serverArgs.selectorThreads(selectorThreads);
|
serverArgs.selectorThreads(selectorThreads);
|
||||||
}
|
}
|
||||||
|
|
||||||
ExecutorService executorService = createExecutor(
|
ExecutorService executorService = createExecutor(
|
||||||
|
@ -378,31 +382,18 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
@Override
|
@Override
|
||||||
public int run(String[] args) throws Exception {
|
public int run(String[] args) throws Exception {
|
||||||
final Configuration conf = getConf();
|
final Configuration conf = getConf();
|
||||||
TServer server = null;
|
|
||||||
Options options = getOptions();
|
Options options = getOptions();
|
||||||
CommandLine cmd = parseArguments(conf, options, args);
|
CommandLine cmd = parseArguments(conf, options, args);
|
||||||
int workerThreads = 0;
|
int workerThreads = 0;
|
||||||
int selectorThreads = 0;
|
int selectorThreads = 0;
|
||||||
int maxCallQueueSize = -1; // use unbounded queue by default
|
int maxCallQueueSize = -1; // use unbounded queue by default
|
||||||
|
|
||||||
/**
|
if (checkArguments(cmd)) {
|
||||||
* This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
|
|
||||||
* should print the help if no argument is provided
|
|
||||||
*/
|
|
||||||
List<?> argList = cmd.getArgList();
|
|
||||||
if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
|
|
||||||
printUsage();
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get address to bind
|
// Get address to bind
|
||||||
String bindAddress;
|
String bindAddress = getBindAddress(conf, cmd);
|
||||||
if (cmd.hasOption("bind")) {
|
|
||||||
bindAddress = cmd.getOptionValue("bind");
|
|
||||||
conf.set("hbase.thrift.info.bindAddress", bindAddress);
|
|
||||||
} else {
|
|
||||||
bindAddress = conf.get("hbase.thrift.info.bindAddress");
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if server should only process read requests, if so override the conf
|
// check if server should only process read requests, if so override the conf
|
||||||
if (cmd.hasOption("readonly")) {
|
if (cmd.hasOption("readonly")) {
|
||||||
|
@ -413,35 +404,13 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get read timeout
|
// Get read timeout
|
||||||
int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
|
int readTimeout = getReadTimeout(conf, cmd);
|
||||||
if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
|
|
||||||
try {
|
|
||||||
readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
throw new RuntimeException("Could not parse the value provided for the timeout option", e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
|
|
||||||
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get port to bind to
|
// Get port to bind to
|
||||||
int listenPort = 0;
|
int listenPort = getListenPort(conf, cmd);
|
||||||
try {
|
|
||||||
if (cmd.hasOption("port")) {
|
|
||||||
listenPort = Integer.parseInt(cmd.getOptionValue("port"));
|
|
||||||
} else {
|
|
||||||
listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
|
|
||||||
}
|
|
||||||
} catch (NumberFormatException e) {
|
|
||||||
throw new RuntimeException("Could not parse the value provided for the port option", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Thrift's implementation uses '0' as a placeholder for 'use the default.'
|
// Thrift's implementation uses '0' as a placeholder for 'use the default.'
|
||||||
int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
|
int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
|
||||||
|
|
||||||
// Local hostname and user name,
|
// Local hostname and user name, used only if QOP is configured.
|
||||||
// used only if QOP is configured.
|
|
||||||
String host = null;
|
String host = null;
|
||||||
String name = null;
|
String name = null;
|
||||||
|
|
||||||
|
@ -453,8 +422,7 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
|
||||||
conf.get("hbase.thrift.dns.interface", "default"),
|
conf.get("hbase.thrift.dns.interface", "default"),
|
||||||
conf.get("hbase.thrift.dns.nameserver", "default")));
|
conf.get("hbase.thrift.dns.nameserver", "default")));
|
||||||
userProvider.login("hbase.thrift.keytab.file",
|
userProvider.login("hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", host);
|
||||||
"hbase.thrift.kerberos.principal", host);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
UserGroupInformation realUser = userProvider.getCurrent().getUGI();
|
UserGroupInformation realUser = userProvider.getCurrent().getUGI();
|
||||||
|
@ -463,12 +431,10 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
if (stringQop != null) {
|
if (stringQop != null) {
|
||||||
qop = SaslUtil.getQop(stringQop);
|
qop = SaslUtil.getQop(stringQop);
|
||||||
if (!securityEnabled) {
|
if (!securityEnabled) {
|
||||||
throw new IOException("Thrift server must"
|
throw new IOException("Thrift server must run in secure mode to support authentication");
|
||||||
+ " run in secure mode to support authentication");
|
|
||||||
}
|
}
|
||||||
// Extract the name from the principal
|
// Extract the name from the principal
|
||||||
name = SecurityUtil.getUserFromPrincipal(
|
name = SecurityUtil.getUserFromPrincipal(conf.get("hbase.thrift.kerberos.principal"));
|
||||||
conf.get("hbase.thrift.kerberos.principal"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean nonblocking = cmd.hasOption("nonblocking");
|
boolean nonblocking = cmd.hasOption("nonblocking");
|
||||||
|
@ -478,14 +444,7 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
|
ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
|
||||||
final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
|
final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
|
||||||
|
|
||||||
String implType = "threadpool";
|
String implType = getImplType(nonblocking, hsha, selector);
|
||||||
if (nonblocking) {
|
|
||||||
implType = "nonblocking";
|
|
||||||
} else if (hsha) {
|
|
||||||
implType = "hsha";
|
|
||||||
} else if (selector) {
|
|
||||||
implType = "selector";
|
|
||||||
}
|
|
||||||
|
|
||||||
conf.set("hbase.regionserver.thrift.server.type", implType);
|
conf.set("hbase.regionserver.thrift.server.type", implType);
|
||||||
conf.setInt("hbase.regionserver.thrift.port", listenPort);
|
conf.setInt("hbase.regionserver.thrift.port", listenPort);
|
||||||
|
@ -549,49 +508,12 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put up info server.
|
// Put up info server.
|
||||||
int port = conf.getInt("hbase.thrift.info.port", 9095);
|
startInfoServer(conf);
|
||||||
if (port >= 0) {
|
|
||||||
conf.setLong("startcode", System.currentTimeMillis());
|
|
||||||
String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
|
|
||||||
InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
|
|
||||||
infoServer.setAttribute("hbase.conf", conf);
|
|
||||||
infoServer.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nonblocking) {
|
final TServer tserver = getServer(workerThreads, selectorThreads, maxCallQueueSize, readTimeout,
|
||||||
server = getTNonBlockingServer(protocolFactory,
|
backlog, nonblocking, hsha, selector, metrics, protocolFactory, processor,
|
||||||
processor,
|
transportFactory, inetSocketAddress);
|
||||||
transportFactory,
|
|
||||||
inetSocketAddress);
|
|
||||||
} else if (hsha) {
|
|
||||||
server = getTHsHaServer(protocolFactory,
|
|
||||||
processor,
|
|
||||||
transportFactory,
|
|
||||||
workerThreads,
|
|
||||||
maxCallQueueSize,
|
|
||||||
inetSocketAddress,
|
|
||||||
metrics);
|
|
||||||
} else if (selector) {
|
|
||||||
server = getTThreadedSelectorServer(protocolFactory,
|
|
||||||
processor,
|
|
||||||
transportFactory,
|
|
||||||
workerThreads,
|
|
||||||
selectorThreads,
|
|
||||||
maxCallQueueSize,
|
|
||||||
inetSocketAddress,
|
|
||||||
metrics);
|
|
||||||
} else {
|
|
||||||
server = getTThreadPoolServer(protocolFactory,
|
|
||||||
processor,
|
|
||||||
transportFactory,
|
|
||||||
workerThreads,
|
|
||||||
inetSocketAddress,
|
|
||||||
backlog,
|
|
||||||
readTimeout,
|
|
||||||
metrics);
|
|
||||||
}
|
|
||||||
|
|
||||||
final TServer tserver = server;
|
|
||||||
realUser.doAs(
|
realUser.doAs(
|
||||||
new PrivilegedAction<Object>() {
|
new PrivilegedAction<Object>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -608,4 +530,106 @@ public class ThriftServer extends Configured implements Tool {
|
||||||
// when tserver.stop eventually happens we'll get here.
|
// when tserver.stop eventually happens we'll get here.
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String getImplType(boolean nonblocking, boolean hsha, boolean selector) {
|
||||||
|
String implType = "threadpool";
|
||||||
|
|
||||||
|
if (nonblocking) {
|
||||||
|
implType = "nonblocking";
|
||||||
|
} else if (hsha) {
|
||||||
|
implType = "hsha";
|
||||||
|
} else if (selector) {
|
||||||
|
implType = "selector";
|
||||||
|
}
|
||||||
|
|
||||||
|
return implType;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkArguments(CommandLine cmd) {
|
||||||
|
/*
|
||||||
|
* This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and
|
||||||
|
* "stop" arguments hbase should print the help if no argument is provided
|
||||||
|
*/
|
||||||
|
List<?> argList = cmd.getArgList();
|
||||||
|
if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
|
||||||
|
printUsage();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getBindAddress(Configuration conf, CommandLine cmd) {
|
||||||
|
String bindAddress;
|
||||||
|
if (cmd.hasOption("bind")) {
|
||||||
|
bindAddress = cmd.getOptionValue("bind");
|
||||||
|
conf.set("hbase.thrift.info.bindAddress", bindAddress);
|
||||||
|
} else {
|
||||||
|
bindAddress = conf.get("hbase.thrift.info.bindAddress");
|
||||||
|
}
|
||||||
|
return bindAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getListenPort(Configuration conf, CommandLine cmd) {
|
||||||
|
int listenPort;
|
||||||
|
try {
|
||||||
|
if (cmd.hasOption("port")) {
|
||||||
|
listenPort = Integer.parseInt(cmd.getOptionValue("port"));
|
||||||
|
} else {
|
||||||
|
listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
|
||||||
|
}
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw new RuntimeException("Could not parse the value provided for the port option", e);
|
||||||
|
}
|
||||||
|
return listenPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getReadTimeout(Configuration conf, CommandLine cmd) {
|
||||||
|
int readTimeout;
|
||||||
|
if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
|
||||||
|
try {
|
||||||
|
readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
|
||||||
|
} catch (NumberFormatException e) {
|
||||||
|
throw new RuntimeException("Could not parse the value provided for the timeout option", e);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
|
||||||
|
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
|
||||||
|
}
|
||||||
|
return readTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startInfoServer(Configuration conf) throws IOException {
|
||||||
|
int port = conf.getInt("hbase.thrift.info.port", 9095);
|
||||||
|
|
||||||
|
if (port >= 0) {
|
||||||
|
conf.setLong("startcode", System.currentTimeMillis());
|
||||||
|
String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
|
||||||
|
InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
|
||||||
|
infoServer.setAttribute("hbase.conf", conf);
|
||||||
|
infoServer.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private TServer getServer(int workerThreads, int selectorThreads, int maxCallQueueSize,
|
||||||
|
int readTimeout, int backlog, boolean nonblocking, boolean hsha, boolean selector,
|
||||||
|
ThriftMetrics metrics, TProtocolFactory protocolFactory, TProcessor processor,
|
||||||
|
TTransportFactory transportFactory, InetSocketAddress inetSocketAddress)
|
||||||
|
throws TTransportException {
|
||||||
|
TServer server;
|
||||||
|
|
||||||
|
if (nonblocking) {
|
||||||
|
server = getTNonBlockingServer(protocolFactory, processor, transportFactory,
|
||||||
|
inetSocketAddress);
|
||||||
|
} else if (hsha) {
|
||||||
|
server = getTHsHaServer(protocolFactory, processor, transportFactory, workerThreads,
|
||||||
|
maxCallQueueSize, inetSocketAddress, metrics);
|
||||||
|
} else if (selector) {
|
||||||
|
server = getTThreadedSelectorServer(protocolFactory, processor, transportFactory,
|
||||||
|
workerThreads, selectorThreads, maxCallQueueSize, inetSocketAddress, metrics);
|
||||||
|
} else {
|
||||||
|
server = getTThreadPoolServer(protocolFactory, processor, transportFactory, workerThreads,
|
||||||
|
inetSocketAddress, backlog, readTimeout, metrics);
|
||||||
|
}
|
||||||
|
return server;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,7 +74,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ThriftUtilities {
|
public final class ThriftUtilities {
|
||||||
|
|
||||||
private ThriftUtilities() {
|
private ThriftUtilities() {
|
||||||
throw new UnsupportedOperationException("Can't initialize class");
|
throw new UnsupportedOperationException("Can't initialize class");
|
||||||
|
@ -287,35 +287,37 @@ public class ThriftUtilities {
|
||||||
for (TColumn column : in.getColumns()) {
|
for (TColumn column : in.getColumns()) {
|
||||||
if (in.isSetDeleteType()) {
|
if (in.isSetDeleteType()) {
|
||||||
switch (in.getDeleteType()) {
|
switch (in.getDeleteType()) {
|
||||||
case DELETE_COLUMN:
|
case DELETE_COLUMN:
|
||||||
if (column.isSetTimestamp()) {
|
if (column.isSetTimestamp()) {
|
||||||
out.addColumn(column.getFamily(), column.getQualifier(), column.getTimestamp());
|
out.addColumn(column.getFamily(), column.getQualifier(), column.getTimestamp());
|
||||||
} else {
|
} else {
|
||||||
out.addColumn(column.getFamily(), column.getQualifier());
|
out.addColumn(column.getFamily(), column.getQualifier());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DELETE_COLUMNS:
|
case DELETE_COLUMNS:
|
||||||
if (column.isSetTimestamp()) {
|
if (column.isSetTimestamp()) {
|
||||||
out.addColumns(column.getFamily(), column.getQualifier(), column.getTimestamp());
|
out.addColumns(column.getFamily(), column.getQualifier(), column.getTimestamp());
|
||||||
} else {
|
} else {
|
||||||
out.addColumns(column.getFamily(), column.getQualifier());
|
out.addColumns(column.getFamily(), column.getQualifier());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DELETE_FAMILY:
|
case DELETE_FAMILY:
|
||||||
if (column.isSetTimestamp()) {
|
if (column.isSetTimestamp()) {
|
||||||
out.addFamily(column.getFamily(), column.getTimestamp());
|
out.addFamily(column.getFamily(), column.getTimestamp());
|
||||||
} else {
|
} else {
|
||||||
out.addFamily(column.getFamily());
|
out.addFamily(column.getFamily());
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case DELETE_FAMILY_VERSION:
|
case DELETE_FAMILY_VERSION:
|
||||||
if (column.isSetTimestamp()) {
|
if (column.isSetTimestamp()) {
|
||||||
out.addFamilyVersion(column.getFamily(), column.getTimestamp());
|
out.addFamilyVersion(column.getFamily(), column.getTimestamp());
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Timestamp is required for TDelete with DeleteFamilyVersion type");
|
"Timestamp is required for TDelete with DeleteFamilyVersion type");
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("DeleteType is required for TDelete");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("DeleteType is required for TDelete");
|
throw new IllegalArgumentException("DeleteType is required for TDelete");
|
||||||
|
@ -416,12 +418,15 @@ public class ThriftUtilities {
|
||||||
public static Scan scanFromThrift(TScan in) throws IOException {
|
public static Scan scanFromThrift(TScan in) throws IOException {
|
||||||
Scan out = new Scan();
|
Scan out = new Scan();
|
||||||
|
|
||||||
if (in.isSetStartRow())
|
if (in.isSetStartRow()) {
|
||||||
out.setStartRow(in.getStartRow());
|
out.setStartRow(in.getStartRow());
|
||||||
if (in.isSetStopRow())
|
}
|
||||||
|
if (in.isSetStopRow()) {
|
||||||
out.setStopRow(in.getStopRow());
|
out.setStopRow(in.getStopRow());
|
||||||
if (in.isSetCaching())
|
}
|
||||||
|
if (in.isSetCaching()) {
|
||||||
out.setCaching(in.getCaching());
|
out.setCaching(in.getCaching());
|
||||||
|
}
|
||||||
if (in.isSetMaxVersions()) {
|
if (in.isSetMaxVersions()) {
|
||||||
out.setMaxVersions(in.getMaxVersions());
|
out.setMaxVersions(in.getMaxVersions());
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,7 +109,7 @@ public class TestCallQueue {
|
||||||
|
|
||||||
private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
|
private static void verifyMetrics(ThriftMetrics metrics, String name, int expectValue)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
metricsHelper.assertCounter(name, expectValue, metrics.getSource());
|
metricsHelper.assertCounter(name, expectValue, metrics.getSource());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Runnable createDummyRunnable() {
|
private static Runnable createDummyRunnable() {
|
||||||
|
|
|
@ -138,7 +138,7 @@ public class TestThriftHttpServer {
|
||||||
|
|
||||||
// wait up to 10s for the server to start
|
// wait up to 10s for the server to start
|
||||||
for (int i = 0; i < 100
|
for (int i = 0; i < 100
|
||||||
&& ( thriftServer.serverRunner == null || thriftServer.serverRunner.httpServer ==
|
&& (thriftServer.serverRunner == null || thriftServer.serverRunner.httpServer ==
|
||||||
null); i++) {
|
null); i++) {
|
||||||
Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class TestThriftServer {
|
||||||
private static ByteBuffer valueBname = asByteBuffer("valueB");
|
private static ByteBuffer valueBname = asByteBuffer("valueB");
|
||||||
private static ByteBuffer valueCname = asByteBuffer("valueC");
|
private static ByteBuffer valueCname = asByteBuffer("valueC");
|
||||||
private static ByteBuffer valueDname = asByteBuffer("valueD");
|
private static ByteBuffer valueDname = asByteBuffer("valueD");
|
||||||
private static ByteBuffer valueEname = asByteBuffer(100l);
|
private static ByteBuffer valueEname = asByteBuffer(100L);
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
@ -124,8 +124,6 @@ public class TestThriftServer {
|
||||||
* consolidate all testing to one method because HBaseClusterTestCase
|
* consolidate all testing to one method because HBaseClusterTestCase
|
||||||
* is prone to OutOfMemoryExceptions when there are three or more
|
* is prone to OutOfMemoryExceptions when there are three or more
|
||||||
* JUnit test methods.
|
* JUnit test methods.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testAll() throws Exception {
|
public void testAll() throws Exception {
|
||||||
|
@ -147,8 +145,6 @@ public class TestThriftServer {
|
||||||
* Tests for creating, enabling, disabling, and deleting tables. Also
|
* Tests for creating, enabling, disabling, and deleting tables. Also
|
||||||
* tests that creating a table with an invalid column name yields an
|
* tests that creating a table with an invalid column name yields an
|
||||||
* IllegalArgument exception.
|
* IllegalArgument exception.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
public void doTestTableCreateDrop() throws Exception {
|
public void doTestTableCreateDrop() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
|
@ -181,10 +177,6 @@ public class TestThriftServer {
|
||||||
* TODO: These counts are supposed to be zero but sometimes they are not, they are equal to the
|
* TODO: These counts are supposed to be zero but sometimes they are not, they are equal to the
|
||||||
* passed in maybe. Investigate why. My guess is they are set by the test that runs just
|
* passed in maybe. Investigate why. My guess is they are set by the test that runs just
|
||||||
* previous to this one. Sometimes they are cleared. Sometimes not.
|
* previous to this one. Sometimes they are cleared. Sometimes not.
|
||||||
* @param name
|
|
||||||
* @param maybe
|
|
||||||
* @param metrics
|
|
||||||
* @return
|
|
||||||
*/
|
*/
|
||||||
private int getCurrentCount(final String name, final int maybe, final ThriftMetrics metrics) {
|
private int getCurrentCount(final String name, final int maybe, final ThriftMetrics metrics) {
|
||||||
int currentCount = 0;
|
int currentCount = 0;
|
||||||
|
@ -220,12 +212,14 @@ public class TestThriftServer {
|
||||||
handler.getTableNames(); // This will have an artificial delay.
|
handler.getTableNames(); // This will have an artificial delay.
|
||||||
|
|
||||||
// 3 to 6 seconds (to account for potential slowness), measured in nanoseconds
|
// 3 to 6 seconds (to account for potential slowness), measured in nanoseconds
|
||||||
try {
|
try {
|
||||||
metricsHelper.assertGaugeGt("getTableNames_avg_time", 3L * 1000 * 1000 * 1000, metrics.getSource());
|
metricsHelper.assertGaugeGt("getTableNames_avg_time", 3L * 1000 * 1000 * 1000,
|
||||||
metricsHelper.assertGaugeLt("getTableNames_avg_time",6L * 1000 * 1000 * 1000, metrics.getSource());
|
metrics.getSource());
|
||||||
} catch (AssertionError e) {
|
metricsHelper.assertGaugeLt("getTableNames_avg_time",6L * 1000 * 1000 * 1000,
|
||||||
LOG.info("Fix me! Why does this happen? A concurrent cluster running?", e);
|
metrics.getSource());
|
||||||
}
|
} catch (AssertionError e) {
|
||||||
|
LOG.info("Fix me! Why does this happen? A concurrent cluster running?", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Hbase.Iface getHandlerForMetricsTest(ThriftMetrics metrics, Configuration conf)
|
private static Hbase.Iface getHandlerForMetricsTest(ThriftMetrics metrics, Configuration conf)
|
||||||
|
@ -235,7 +229,7 @@ public class TestThriftServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
|
private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
|
||||||
return new ThriftMetrics( conf, ThriftMetrics.ThriftServerType.ONE);
|
return new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -300,9 +294,11 @@ public class TestThriftServer {
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
long lv = handler.get(tableAname, rowAname, columnAname, null).get(0).value.getLong();
|
long lv = handler.get(tableAname, rowAname, columnAname, null).get(0).value.getLong();
|
||||||
// Wait on all increments being flushed
|
// Wait on all increments being flushed
|
||||||
while (handler.coalescer.getQueueSize() != 0) Threads.sleep(10);
|
while (handler.coalescer.getQueueSize() != 0) {
|
||||||
assertEquals((100 + (2 * numIncrements)), lv );
|
Threads.sleep(10);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals((100 + (2 * numIncrements)), lv);
|
||||||
|
|
||||||
lv = handler.get(tableAname, rowBname, columnAAname, null).get(0).value.getLong();
|
lv = handler.get(tableAname, rowBname, columnAAname, null).get(0).value.getLong();
|
||||||
assertEquals((100 + (3 * 7 * numIncrements)), lv);
|
assertEquals((100 + (3 * 7 * numIncrements)), lv);
|
||||||
|
@ -315,8 +311,6 @@ public class TestThriftServer {
|
||||||
* Tests adding a series of Mutations and BatchMutations, including a
|
* Tests adding a series of Mutations and BatchMutations, including a
|
||||||
* delete mutation. Also tests data retrieval, and getting back multiple
|
* delete mutation. Also tests data retrieval, and getting back multiple
|
||||||
* versions.
|
* versions.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
public void doTestTableMutations() throws Exception {
|
public void doTestTableMutations() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
|
@ -393,8 +387,6 @@ public class TestThriftServer {
|
||||||
* Similar to testTableMutations(), except Mutations are applied with
|
* Similar to testTableMutations(), except Mutations are applied with
|
||||||
* specific timestamps and data retrieval uses these timestamps to
|
* specific timestamps and data retrieval uses these timestamps to
|
||||||
* extract specific versions of data.
|
* extract specific versions of data.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
public void doTestTableTimestampsAndColumns() throws Exception {
|
public void doTestTableTimestampsAndColumns() throws Exception {
|
||||||
// Setup
|
// Setup
|
||||||
|
@ -473,8 +465,6 @@ public class TestThriftServer {
|
||||||
/**
|
/**
|
||||||
* Tests the four different scanner-opening methods (with and without
|
* Tests the four different scanner-opening methods (with and without
|
||||||
* a stoprow, with and without a timestamp).
|
* a stoprow, with and without a timestamp).
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
public void doTestTableScanners() throws Exception {
|
public void doTestTableScanners() throws Exception {
|
||||||
// Setup
|
// Setup
|
||||||
|
@ -515,7 +505,8 @@ public class TestThriftServer {
|
||||||
closeScanner(scanner1, handler);
|
closeScanner(scanner1, handler);
|
||||||
|
|
||||||
// Test a scanner on all rows and all columns, with timestamp
|
// Test a scanner on all rows and all columns, with timestamp
|
||||||
int scanner2 = handler.scannerOpenTs(tableAname, rowAname, getColumnList(true, true), time1, null);
|
int scanner2 = handler.scannerOpenTs(tableAname, rowAname, getColumnList(true, true), time1,
|
||||||
|
null);
|
||||||
TRowResult rowResult2a = handler.scannerGet(scanner2).get(0);
|
TRowResult rowResult2a = handler.scannerGet(scanner2).get(0);
|
||||||
assertEquals(rowResult2a.columns.size(), 1);
|
assertEquals(rowResult2a.columns.size(), 1);
|
||||||
// column A deleted, does not exist.
|
// column A deleted, does not exist.
|
||||||
|
@ -594,8 +585,6 @@ public class TestThriftServer {
|
||||||
/**
|
/**
|
||||||
* For HBASE-2556
|
* For HBASE-2556
|
||||||
* Tests for GetTableRegions
|
* Tests for GetTableRegions
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
public void doTestGetTableRegions() throws Exception {
|
public void doTestGetTableRegions() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
|
@ -659,8 +648,6 @@ public class TestThriftServer {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Appends the value to a cell and checks that the cell value is updated properly.
|
* Appends the value to a cell and checks that the cell value is updated properly.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
public static void doTestAppend() throws Exception {
|
public static void doTestAppend() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
|
@ -693,8 +680,6 @@ public class TestThriftServer {
|
||||||
/**
|
/**
|
||||||
* Check that checkAndPut fails if the cell does not exist, then put in the cell, then check that
|
* Check that checkAndPut fails if the cell does not exist, then put in the cell, then check that
|
||||||
* the checkAndPut succeeds.
|
* the checkAndPut succeeds.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
public static void doTestCheckAndPut() throws Exception {
|
public static void doTestCheckAndPut() throws Exception {
|
||||||
ThriftServerRunner.HBaseHandler handler =
|
ThriftServerRunner.HBaseHandler handler =
|
||||||
|
@ -791,9 +776,8 @@ public class TestThriftServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* @return a List of ColumnDescriptors for use in creating a table. Has one
|
* @return a List of ColumnDescriptors for use in creating a table. Has one
|
||||||
* default ColumnDescriptor and one ColumnDescriptor with fewer versions
|
* default ColumnDescriptor and one ColumnDescriptor with fewer versions
|
||||||
*/
|
*/
|
||||||
private static List<ColumnDescriptor> getColumnDescriptors() {
|
private static List<ColumnDescriptor> getColumnDescriptors() {
|
||||||
ArrayList<ColumnDescriptor> cDescriptors = new ArrayList<>(2);
|
ArrayList<ColumnDescriptor> cDescriptors = new ArrayList<>(2);
|
||||||
|
@ -819,15 +803,20 @@ public class TestThriftServer {
|
||||||
*/
|
*/
|
||||||
private List<ByteBuffer> getColumnList(boolean includeA, boolean includeB) {
|
private List<ByteBuffer> getColumnList(boolean includeA, boolean includeB) {
|
||||||
List<ByteBuffer> columnList = new ArrayList<>();
|
List<ByteBuffer> columnList = new ArrayList<>();
|
||||||
if (includeA) columnList.add(columnAname);
|
|
||||||
if (includeB) columnList.add(columnBname);
|
if (includeA) {
|
||||||
|
columnList.add(columnAname);
|
||||||
|
}
|
||||||
|
if (includeB) {
|
||||||
|
columnList.add(columnBname);
|
||||||
|
}
|
||||||
|
|
||||||
return columnList;
|
return columnList;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* @return a List of Mutations for a row, with columnA having valueA
|
* @return a List of Mutations for a row, with columnA having valueA
|
||||||
* and columnB having valueB
|
* and columnB having valueB
|
||||||
*/
|
*/
|
||||||
private static List<Mutation> getMutations() {
|
private static List<Mutation> getMutations() {
|
||||||
List<Mutation> mutations = new ArrayList<>(2);
|
List<Mutation> mutations = new ArrayList<>(2);
|
||||||
|
@ -837,12 +826,11 @@ public class TestThriftServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
|
||||||
* @return a List of BatchMutations with the following effects:
|
* @return a List of BatchMutations with the following effects:
|
||||||
* (rowA, columnA): delete
|
* (rowA, columnA): delete
|
||||||
* (rowA, columnB): place valueC
|
* (rowA, columnB): place valueC
|
||||||
* (rowB, columnA): place valueC
|
* (rowB, columnA): place valueC
|
||||||
* (rowB, columnB): place valueD
|
* (rowB, columnB): place valueD
|
||||||
*/
|
*/
|
||||||
private static List<BatchMutation> getBatchMutations() {
|
private static List<BatchMutation> getBatchMutations() {
|
||||||
List<BatchMutation> batchMutations = new ArrayList<>(3);
|
List<BatchMutation> batchMutations = new ArrayList<>(3);
|
||||||
|
@ -871,7 +859,6 @@ public class TestThriftServer {
|
||||||
*
|
*
|
||||||
* @param scannerId the scanner to close
|
* @param scannerId the scanner to close
|
||||||
* @param handler the HBaseHandler interfacing to HBase
|
* @param handler the HBaseHandler interfacing to HBase
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
private void closeScanner(
|
private void closeScanner(
|
||||||
int scannerId, ThriftServerRunner.HBaseHandler handler) throws Exception {
|
int scannerId, ThriftServerRunner.HBaseHandler handler) throws Exception {
|
||||||
|
|
|
@ -89,8 +89,9 @@ public class TestThriftServerCmdLine {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
for (boolean specifyCompact : new boolean[] {false, true}) {
|
for (boolean specifyCompact : new boolean[] {false, true}) {
|
||||||
parameters.add(new Object[]{implType, specifyFramed,
|
parameters.add(new Object[] {
|
||||||
specifyBindIP, specifyCompact});
|
implType, specifyFramed, specifyBindIP, specifyCompact
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -238,7 +238,7 @@ public class TestThriftHBaseServiceHandler {
|
||||||
puts.add(new TPut(wrap(rowName2), columnValues));
|
puts.add(new TPut(wrap(rowName2), columnValues));
|
||||||
|
|
||||||
handler.putMultiple(table, puts);
|
handler.putMultiple(table, puts);
|
||||||
List<Boolean> existsResult2 = handler.existsAll(table,gets );
|
List<Boolean> existsResult2 = handler.existsAll(table, gets);
|
||||||
|
|
||||||
assertTrue(existsResult2.get(0));
|
assertTrue(existsResult2.get(0));
|
||||||
assertTrue(existsResult2.get(1));
|
assertTrue(existsResult2.get(1));
|
||||||
|
@ -590,8 +590,6 @@ public class TestThriftHBaseServiceHandler {
|
||||||
/**
|
/**
|
||||||
* check that checkAndPut fails if the cell does not exist, then put in the cell, then check
|
* check that checkAndPut fails if the cell does not exist, then put in the cell, then check
|
||||||
* that the checkAndPut succeeds.
|
* that the checkAndPut succeeds.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCheckAndPut() throws Exception {
|
public void testCheckAndPut() throws Exception {
|
||||||
|
@ -637,8 +635,6 @@ public class TestThriftHBaseServiceHandler {
|
||||||
/**
|
/**
|
||||||
* check that checkAndDelete fails if the cell does not exist, then put in the cell, then
|
* check that checkAndDelete fails if the cell does not exist, then put in the cell, then
|
||||||
* check that the checkAndDelete succeeds.
|
* check that the checkAndDelete succeeds.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCheckAndDelete() throws Exception {
|
public void testCheckAndDelete() throws Exception {
|
||||||
|
@ -733,8 +729,7 @@ public class TestThriftHBaseServiceHandler {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests keeping a HBase scanner alive for long periods of time. Each call to getScannerRow()
|
* Tests keeping a HBase scanner alive for long periods of time. Each call to getScannerRow()
|
||||||
* should reset the ConnectionCache timeout for the scanner's connection
|
* should reset the ConnectionCache timeout for the scanner's connection.
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testLongLivedScan() throws Exception {
|
public void testLongLivedScan() throws Exception {
|
||||||
|
@ -1047,7 +1042,11 @@ public class TestThriftHBaseServiceHandler {
|
||||||
*/
|
*/
|
||||||
private String pad(int n, byte pad) {
|
private String pad(int n, byte pad) {
|
||||||
String res = Integer.toString(n);
|
String res = Integer.toString(n);
|
||||||
while (res.length() < pad) res = "0" + res;
|
|
||||||
|
while (res.length() < pad) {
|
||||||
|
res = "0" + res;
|
||||||
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1179,7 +1178,7 @@ public class TestThriftHBaseServiceHandler {
|
||||||
assertArrayEquals(("testGetScannerResults" + pad(19 - i, (byte) 2)).getBytes(), results.get(i)
|
assertArrayEquals(("testGetScannerResults" + pad(19 - i, (byte) 2)).getBytes(), results.get(i)
|
||||||
.getRow());
|
.getRow());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFilterRegistration() throws Exception {
|
public void testFilterRegistration() throws Exception {
|
||||||
|
@ -1213,7 +1212,7 @@ public class TestThriftHBaseServiceHandler {
|
||||||
|
|
||||||
assertTrue(handler.exists(table, get));
|
assertTrue(handler.exists(table, get));
|
||||||
metricsHelper.assertCounter("put_num_ops", 1, metrics.getSource());
|
metricsHelper.assertCounter("put_num_ops", 1, metrics.getSource());
|
||||||
metricsHelper.assertCounter( "exists_num_ops", 2, metrics.getSource());
|
metricsHelper.assertCounter("exists_num_ops", 2, metrics.getSource());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
|
private static ThriftMetrics getMetrics(Configuration conf) throws Exception {
|
||||||
|
@ -1265,7 +1264,7 @@ public class TestThriftHBaseServiceHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testExceptionType(THBaseService.Iface handler, ThriftMetrics metrics,
|
private void testExceptionType(THBaseService.Iface handler, ThriftMetrics metrics,
|
||||||
ByteBuffer tTableName, byte[] rowkey, ErrorThrowingGetObserver.ErrorType errorType) {
|
ByteBuffer tTableName, byte[] rowkey, ErrorThrowingGetObserver.ErrorType errorType) {
|
||||||
long preGetCounter = metricsHelper.getCounter("get_num_ops", metrics.getSource());
|
long preGetCounter = metricsHelper.getCounter("get_num_ops", metrics.getSource());
|
||||||
String exceptionKey = errorType.getMetricName();
|
String exceptionKey = errorType.getMetricName();
|
||||||
long preExceptionCounter = metricsHelper.checkCounterExists(exceptionKey, metrics.getSource()) ?
|
long preExceptionCounter = metricsHelper.checkCounterExists(exceptionKey, metrics.getSource()) ?
|
||||||
|
@ -1381,8 +1380,6 @@ public class TestThriftHBaseServiceHandler {
|
||||||
/**
|
/**
|
||||||
* Put valueA to a row, make sure put has happened, then create a mutation object to put valueB
|
* Put valueA to a row, make sure put has happened, then create a mutation object to put valueB
|
||||||
* and delete ValueA, then check that the row value is only valueB.
|
* and delete ValueA, then check that the row value is only valueB.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testMutateRow() throws Exception {
|
public void testMutateRow() throws Exception {
|
||||||
|
@ -1445,8 +1442,6 @@ public class TestThriftHBaseServiceHandler {
|
||||||
* Create TPut, TDelete , TIncrement objects, set durability then call ThriftUtility
|
* Create TPut, TDelete , TIncrement objects, set durability then call ThriftUtility
|
||||||
* functions to get Put , Delete and Increment respectively. Use getDurability to make sure
|
* functions to get Put , Delete and Increment respectively. Use getDurability to make sure
|
||||||
* the returned objects have the appropriate durability setting.
|
* the returned objects have the appropriate durability setting.
|
||||||
*
|
|
||||||
* @throws Exception
|
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testDurability() throws Exception {
|
public void testDurability() throws Exception {
|
||||||
|
|
|
@ -78,389 +78,392 @@ public class TestThriftHBaseServiceHandlerWithLabels {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
.getLogger(TestThriftHBaseServiceHandlerWithLabels.class);
|
.getLogger(TestThriftHBaseServiceHandlerWithLabels.class);
|
||||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
// Static names for tables, columns, rows, and values
|
// Static names for tables, columns, rows, and values
|
||||||
private static byte[] tableAname = Bytes.toBytes("tableA");
|
private static byte[] tableAname = Bytes.toBytes("tableA");
|
||||||
private static byte[] familyAname = Bytes.toBytes("familyA");
|
private static byte[] familyAname = Bytes.toBytes("familyA");
|
||||||
private static byte[] familyBname = Bytes.toBytes("familyB");
|
private static byte[] familyBname = Bytes.toBytes("familyB");
|
||||||
private static byte[] qualifierAname = Bytes.toBytes("qualifierA");
|
private static byte[] qualifierAname = Bytes.toBytes("qualifierA");
|
||||||
private static byte[] qualifierBname = Bytes.toBytes("qualifierB");
|
private static byte[] qualifierBname = Bytes.toBytes("qualifierB");
|
||||||
private static byte[] valueAname = Bytes.toBytes("valueA");
|
private static byte[] valueAname = Bytes.toBytes("valueA");
|
||||||
private static byte[] valueBname = Bytes.toBytes("valueB");
|
private static byte[] valueBname = Bytes.toBytes("valueB");
|
||||||
private static HColumnDescriptor[] families = new HColumnDescriptor[] {
|
private static HColumnDescriptor[] families = new HColumnDescriptor[] {
|
||||||
new HColumnDescriptor(familyAname).setMaxVersions(3),
|
new HColumnDescriptor(familyAname).setMaxVersions(3),
|
||||||
new HColumnDescriptor(familyBname).setMaxVersions(2) };
|
new HColumnDescriptor(familyBname).setMaxVersions(2) };
|
||||||
|
|
||||||
private final static String TOPSECRET = "topsecret";
|
private final static String TOPSECRET = "topsecret";
|
||||||
private final static String PUBLIC = "public";
|
private final static String PUBLIC = "public";
|
||||||
private final static String PRIVATE = "private";
|
private final static String PRIVATE = "private";
|
||||||
private final static String CONFIDENTIAL = "confidential";
|
private final static String CONFIDENTIAL = "confidential";
|
||||||
private final static String SECRET = "secret";
|
private final static String SECRET = "secret";
|
||||||
private static User SUPERUSER;
|
private static User SUPERUSER;
|
||||||
|
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
|
|
||||||
public void assertTColumnValuesEqual(List<TColumnValue> columnValuesA,
|
public void assertTColumnValuesEqual(List<TColumnValue> columnValuesA,
|
||||||
List<TColumnValue> columnValuesB) {
|
List<TColumnValue> columnValuesB) {
|
||||||
assertEquals(columnValuesA.size(), columnValuesB.size());
|
assertEquals(columnValuesA.size(), columnValuesB.size());
|
||||||
Comparator<TColumnValue> comparator = new Comparator<TColumnValue>() {
|
Comparator<TColumnValue> comparator = new Comparator<TColumnValue>() {
|
||||||
@Override
|
@Override
|
||||||
public int compare(TColumnValue o1, TColumnValue o2) {
|
public int compare(TColumnValue o1, TColumnValue o2) {
|
||||||
return Bytes.compareTo(Bytes.add(o1.getFamily(), o1.getQualifier()),
|
return Bytes.compareTo(Bytes.add(o1.getFamily(), o1.getQualifier()),
|
||||||
Bytes.add(o2.getFamily(), o2.getQualifier()));
|
Bytes.add(o2.getFamily(), o2.getQualifier()));
|
||||||
}
|
|
||||||
};
|
|
||||||
Collections.sort(columnValuesA, comparator);
|
|
||||||
Collections.sort(columnValuesB, comparator);
|
|
||||||
|
|
||||||
for (int i = 0; i < columnValuesA.size(); i++) {
|
|
||||||
TColumnValue a = columnValuesA.get(i);
|
|
||||||
TColumnValue b = columnValuesB.get(i);
|
|
||||||
assertArrayEquals(a.getFamily(), b.getFamily());
|
|
||||||
assertArrayEquals(a.getQualifier(), b.getQualifier());
|
|
||||||
assertArrayEquals(a.getValue(), b.getValue());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void beforeClass() throws Exception {
|
|
||||||
SUPERUSER = User.createUserForTesting(conf, "admin",
|
|
||||||
new String[] { "supergroup" });
|
|
||||||
conf = UTIL.getConfiguration();
|
|
||||||
conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS,
|
|
||||||
SimpleScanLabelGenerator.class, ScanLabelGenerator.class);
|
|
||||||
conf.set("hbase.superuser", SUPERUSER.getShortName());
|
|
||||||
conf.set("hbase.coprocessor.master.classes",
|
|
||||||
VisibilityController.class.getName());
|
|
||||||
conf.set("hbase.coprocessor.region.classes",
|
|
||||||
VisibilityController.class.getName());
|
|
||||||
conf.setInt("hfile.format.version", 3);
|
|
||||||
UTIL.startMiniCluster(1);
|
|
||||||
// Wait for the labels table to become available
|
|
||||||
UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000);
|
|
||||||
createLabels();
|
|
||||||
Admin admin = UTIL.getAdmin();
|
|
||||||
HTableDescriptor tableDescriptor = new HTableDescriptor(
|
|
||||||
TableName.valueOf(tableAname));
|
|
||||||
for (HColumnDescriptor family : families) {
|
|
||||||
tableDescriptor.addFamily(family);
|
|
||||||
}
|
|
||||||
admin.createTable(tableDescriptor);
|
|
||||||
admin.close();
|
|
||||||
setAuths();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void createLabels() throws IOException, InterruptedException {
|
|
||||||
PrivilegedExceptionAction<VisibilityLabelsResponse> action =
|
|
||||||
new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
|
|
||||||
public VisibilityLabelsResponse run() throws Exception {
|
|
||||||
String[] labels = { SECRET, CONFIDENTIAL, PRIVATE, PUBLIC, TOPSECRET };
|
|
||||||
try (Connection conn = ConnectionFactory.createConnection(conf)) {
|
|
||||||
VisibilityClient.addLabels(conn, labels);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
throw new IOException(t);
|
|
||||||
}
|
}
|
||||||
return null;
|
};
|
||||||
}
|
Collections.sort(columnValuesA, comparator);
|
||||||
};
|
Collections.sort(columnValuesB, comparator);
|
||||||
SUPERUSER.runAs(action);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void setAuths() throws IOException {
|
for (int i = 0; i < columnValuesA.size(); i++) {
|
||||||
String[] labels = { SECRET, CONFIDENTIAL, PRIVATE, PUBLIC, TOPSECRET };
|
TColumnValue a = columnValuesA.get(i);
|
||||||
try {
|
TColumnValue b = columnValuesB.get(i);
|
||||||
VisibilityClient.setAuths(UTIL.getConnection(), labels, User.getCurrent().getShortName());
|
assertArrayEquals(a.getFamily(), b.getFamily());
|
||||||
} catch (Throwable t) {
|
assertArrayEquals(a.getQualifier(), b.getQualifier());
|
||||||
throw new IOException(t);
|
assertArrayEquals(a.getValue(), b.getValue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
@BeforeClass
|
||||||
public static void afterClass() throws Exception {
|
public static void beforeClass() throws Exception {
|
||||||
UTIL.shutdownMiniCluster();
|
SUPERUSER = User.createUserForTesting(conf, "admin",
|
||||||
}
|
new String[] { "supergroup" });
|
||||||
|
conf = UTIL.getConfiguration();
|
||||||
@Before
|
conf.setClass(VisibilityUtils.VISIBILITY_LABEL_GENERATOR_CLASS,
|
||||||
public void setup() throws Exception {
|
SimpleScanLabelGenerator.class, ScanLabelGenerator.class);
|
||||||
|
conf.set("hbase.superuser", SUPERUSER.getShortName());
|
||||||
}
|
conf.set("hbase.coprocessor.master.classes",
|
||||||
|
VisibilityController.class.getName());
|
||||||
private ThriftHBaseServiceHandler createHandler() throws IOException {
|
conf.set("hbase.coprocessor.region.classes",
|
||||||
return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
|
VisibilityController.class.getName());
|
||||||
}
|
conf.setInt("hfile.format.version", 3);
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
@Test
|
// Wait for the labels table to become available
|
||||||
public void testScanWithVisibilityLabels() throws Exception {
|
UTIL.waitTableEnabled(VisibilityConstants.LABELS_TABLE_NAME.getName(), 50000);
|
||||||
ThriftHBaseServiceHandler handler = createHandler();
|
createLabels();
|
||||||
ByteBuffer table = wrap(tableAname);
|
Admin admin = UTIL.getAdmin();
|
||||||
|
HTableDescriptor tableDescriptor = new HTableDescriptor(
|
||||||
// insert data
|
TableName.valueOf(tableAname));
|
||||||
TColumnValue columnValue = new TColumnValue(wrap(familyAname),
|
for (HColumnDescriptor family : families) {
|
||||||
wrap(qualifierAname), wrap(valueAname));
|
tableDescriptor.addFamily(family);
|
||||||
List<TColumnValue> columnValues = new ArrayList<>(1);
|
|
||||||
columnValues.add(columnValue);
|
|
||||||
for (int i = 0; i < 10; i++) {
|
|
||||||
TPut put = new TPut(wrap(("testScan" + i).getBytes()), columnValues);
|
|
||||||
if (i == 5) {
|
|
||||||
put.setCellVisibility(new TCellVisibility().setExpression(PUBLIC));
|
|
||||||
} else {
|
|
||||||
put.setCellVisibility(new TCellVisibility().setExpression("(" + SECRET
|
|
||||||
+ "|" + CONFIDENTIAL + ")" + "&" + "!" + TOPSECRET));
|
|
||||||
}
|
}
|
||||||
|
admin.createTable(tableDescriptor);
|
||||||
|
admin.close();
|
||||||
|
setAuths();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createLabels() throws IOException, InterruptedException {
|
||||||
|
PrivilegedExceptionAction<VisibilityLabelsResponse> action =
|
||||||
|
new PrivilegedExceptionAction<VisibilityLabelsResponse>() {
|
||||||
|
public VisibilityLabelsResponse run() throws Exception {
|
||||||
|
String[] labels = { SECRET, CONFIDENTIAL, PRIVATE, PUBLIC, TOPSECRET };
|
||||||
|
try (Connection conn = ConnectionFactory.createConnection(conf)) {
|
||||||
|
VisibilityClient.addLabels(conn, labels);
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw new IOException(t);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
SUPERUSER.runAs(action);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void setAuths() throws IOException {
|
||||||
|
String[] labels = { SECRET, CONFIDENTIAL, PRIVATE, PUBLIC, TOPSECRET };
|
||||||
|
try {
|
||||||
|
VisibilityClient.setAuths(UTIL.getConnection(), labels, User.getCurrent().getShortName());
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw new IOException(t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void afterClass() throws Exception {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private ThriftHBaseServiceHandler createHandler() throws IOException {
|
||||||
|
return new ThriftHBaseServiceHandler(conf, UserProvider.instantiate(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScanWithVisibilityLabels() throws Exception {
|
||||||
|
ThriftHBaseServiceHandler handler = createHandler();
|
||||||
|
ByteBuffer table = wrap(tableAname);
|
||||||
|
|
||||||
|
// insert data
|
||||||
|
TColumnValue columnValue = new TColumnValue(wrap(familyAname),
|
||||||
|
wrap(qualifierAname), wrap(valueAname));
|
||||||
|
List<TColumnValue> columnValues = new ArrayList<>(1);
|
||||||
|
columnValues.add(columnValue);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
TPut put = new TPut(wrap(("testScan" + i).getBytes()), columnValues);
|
||||||
|
if (i == 5) {
|
||||||
|
put.setCellVisibility(new TCellVisibility().setExpression(PUBLIC));
|
||||||
|
} else {
|
||||||
|
put.setCellVisibility(new TCellVisibility().setExpression("(" + SECRET
|
||||||
|
+ "|" + CONFIDENTIAL + ")" + "&" + "!" + TOPSECRET));
|
||||||
|
}
|
||||||
|
handler.put(table, put);
|
||||||
|
}
|
||||||
|
|
||||||
|
// create scan instance
|
||||||
|
TScan scan = new TScan();
|
||||||
|
List<TColumn> columns = new ArrayList<>(1);
|
||||||
|
TColumn column = new TColumn();
|
||||||
|
column.setFamily(familyAname);
|
||||||
|
column.setQualifier(qualifierAname);
|
||||||
|
columns.add(column);
|
||||||
|
scan.setColumns(columns);
|
||||||
|
scan.setStartRow("testScan".getBytes());
|
||||||
|
scan.setStopRow("testScan\uffff".getBytes());
|
||||||
|
|
||||||
|
TAuthorization tauth = new TAuthorization();
|
||||||
|
List<String> labels = new ArrayList<>(2);
|
||||||
|
labels.add(SECRET);
|
||||||
|
labels.add(PRIVATE);
|
||||||
|
tauth.setLabels(labels);
|
||||||
|
scan.setAuthorizations(tauth);
|
||||||
|
// get scanner and rows
|
||||||
|
int scanId = handler.openScanner(table, scan);
|
||||||
|
List<TResult> results = handler.getScannerRows(scanId, 10);
|
||||||
|
assertEquals(9, results.size());
|
||||||
|
Assert.assertFalse(Bytes.equals(results.get(5).getRow(),
|
||||||
|
("testScan" + 5).getBytes()));
|
||||||
|
for (int i = 0; i < 9; i++) {
|
||||||
|
if (i < 5) {
|
||||||
|
assertArrayEquals(("testScan" + i).getBytes(), results.get(i).getRow());
|
||||||
|
} else if (i == 5) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
assertArrayEquals(("testScan" + (i + 1)).getBytes(), results.get(i)
|
||||||
|
.getRow());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that we are at the end of the scan
|
||||||
|
results = handler.getScannerRows(scanId, 9);
|
||||||
|
assertEquals(0, results.size());
|
||||||
|
|
||||||
|
// close scanner and check that it was indeed closed
|
||||||
|
handler.closeScanner(scanId);
|
||||||
|
try {
|
||||||
|
handler.getScannerRows(scanId, 9);
|
||||||
|
fail("Scanner id should be invalid");
|
||||||
|
} catch (TIllegalArgument e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetScannerResultsWithAuthorizations() throws Exception {
|
||||||
|
ThriftHBaseServiceHandler handler = createHandler();
|
||||||
|
ByteBuffer table = wrap(tableAname);
|
||||||
|
|
||||||
|
// insert data
|
||||||
|
TColumnValue columnValue = new TColumnValue(wrap(familyAname),
|
||||||
|
wrap(qualifierAname), wrap(valueAname));
|
||||||
|
List<TColumnValue> columnValues = new ArrayList<>(1);
|
||||||
|
columnValues.add(columnValue);
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
TPut put = new TPut(
|
||||||
|
wrap(("testGetScannerResults" + pad(i, (byte) 2)).getBytes()),
|
||||||
|
columnValues);
|
||||||
|
if (i == 3) {
|
||||||
|
put.setCellVisibility(new TCellVisibility().setExpression(PUBLIC));
|
||||||
|
} else {
|
||||||
|
put.setCellVisibility(new TCellVisibility().setExpression("(" + SECRET
|
||||||
|
+ "|" + CONFIDENTIAL + ")" + "&" + "!" + TOPSECRET));
|
||||||
|
}
|
||||||
|
handler.put(table, put);
|
||||||
|
}
|
||||||
|
|
||||||
|
// create scan instance
|
||||||
|
TScan scan = new TScan();
|
||||||
|
List<TColumn> columns = new ArrayList<>(1);
|
||||||
|
TColumn column = new TColumn();
|
||||||
|
column.setFamily(familyAname);
|
||||||
|
column.setQualifier(qualifierAname);
|
||||||
|
columns.add(column);
|
||||||
|
scan.setColumns(columns);
|
||||||
|
scan.setStartRow("testGetScannerResults".getBytes());
|
||||||
|
|
||||||
|
// get 5 rows and check the returned results
|
||||||
|
scan.setStopRow("testGetScannerResults05".getBytes());
|
||||||
|
TAuthorization tauth = new TAuthorization();
|
||||||
|
List<String> labels = new ArrayList<>(2);
|
||||||
|
labels.add(SECRET);
|
||||||
|
labels.add(PRIVATE);
|
||||||
|
tauth.setLabels(labels);
|
||||||
|
scan.setAuthorizations(tauth);
|
||||||
|
List<TResult> results = handler.getScannerResults(table, scan, 5);
|
||||||
|
assertEquals(4, results.size());
|
||||||
|
for (int i = 0; i < 4; i++) {
|
||||||
|
if (i < 3) {
|
||||||
|
assertArrayEquals(
|
||||||
|
("testGetScannerResults" + pad(i, (byte) 2)).getBytes(),
|
||||||
|
results.get(i).getRow());
|
||||||
|
} else if (i == 3) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
assertArrayEquals(
|
||||||
|
("testGetScannerResults" + pad(i + 1, (byte) 2)).getBytes(), results
|
||||||
|
.get(i).getRow());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetsWithLabels() throws Exception {
|
||||||
|
ThriftHBaseServiceHandler handler = createHandler();
|
||||||
|
byte[] rowName = "testPutGet".getBytes();
|
||||||
|
ByteBuffer table = wrap(tableAname);
|
||||||
|
|
||||||
|
List<TColumnValue> columnValues = new ArrayList<>(2);
|
||||||
|
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
||||||
|
wrap(valueAname)));
|
||||||
|
columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname),
|
||||||
|
wrap(valueBname)));
|
||||||
|
TPut put = new TPut(wrap(rowName), columnValues);
|
||||||
|
|
||||||
|
put.setColumnValues(columnValues);
|
||||||
|
put.setCellVisibility(new TCellVisibility().setExpression("(" + SECRET + "|"
|
||||||
|
+ CONFIDENTIAL + ")" + "&" + "!" + TOPSECRET));
|
||||||
handler.put(table, put);
|
handler.put(table, put);
|
||||||
|
TGet get = new TGet(wrap(rowName));
|
||||||
|
TAuthorization tauth = new TAuthorization();
|
||||||
|
List<String> labels = new ArrayList<>(2);
|
||||||
|
labels.add(SECRET);
|
||||||
|
labels.add(PRIVATE);
|
||||||
|
tauth.setLabels(labels);
|
||||||
|
get.setAuthorizations(tauth);
|
||||||
|
TResult result = handler.get(table, get);
|
||||||
|
assertArrayEquals(rowName, result.getRow());
|
||||||
|
List<TColumnValue> returnedColumnValues = result.getColumnValues();
|
||||||
|
assertTColumnValuesEqual(columnValues, returnedColumnValues);
|
||||||
}
|
}
|
||||||
|
|
||||||
// create scan instance
|
@Test
|
||||||
TScan scan = new TScan();
|
public void testIncrementWithTags() throws Exception {
|
||||||
List<TColumn> columns = new ArrayList<>(1);
|
ThriftHBaseServiceHandler handler = createHandler();
|
||||||
TColumn column = new TColumn();
|
byte[] rowName = "testIncrementWithTags".getBytes();
|
||||||
column.setFamily(familyAname);
|
ByteBuffer table = wrap(tableAname);
|
||||||
column.setQualifier(qualifierAname);
|
|
||||||
columns.add(column);
|
|
||||||
scan.setColumns(columns);
|
|
||||||
scan.setStartRow("testScan".getBytes());
|
|
||||||
scan.setStopRow("testScan\uffff".getBytes());
|
|
||||||
|
|
||||||
TAuthorization tauth = new TAuthorization();
|
List<TColumnValue> columnValues = new ArrayList<>(1);
|
||||||
List<String> labels = new ArrayList<>(2);
|
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
||||||
labels.add(SECRET);
|
wrap(Bytes.toBytes(1L))));
|
||||||
labels.add(PRIVATE);
|
TPut put = new TPut(wrap(rowName), columnValues);
|
||||||
tauth.setLabels(labels);
|
put.setColumnValues(columnValues);
|
||||||
scan.setAuthorizations(tauth);
|
put.setCellVisibility(new TCellVisibility().setExpression(PRIVATE));
|
||||||
// get scanner and rows
|
|
||||||
int scanId = handler.openScanner(table, scan);
|
|
||||||
List<TResult> results = handler.getScannerRows(scanId, 10);
|
|
||||||
assertEquals(9, results.size());
|
|
||||||
Assert.assertFalse(Bytes.equals(results.get(5).getRow(),
|
|
||||||
("testScan" + 5).getBytes()));
|
|
||||||
for (int i = 0; i < 9; i++) {
|
|
||||||
if (i < 5) {
|
|
||||||
assertArrayEquals(("testScan" + i).getBytes(), results.get(i).getRow());
|
|
||||||
} else if (i == 5) {
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
assertArrayEquals(("testScan" + (i + 1)).getBytes(), results.get(i)
|
|
||||||
.getRow());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check that we are at the end of the scan
|
|
||||||
results = handler.getScannerRows(scanId, 9);
|
|
||||||
assertEquals(0, results.size());
|
|
||||||
|
|
||||||
// close scanner and check that it was indeed closed
|
|
||||||
handler.closeScanner(scanId);
|
|
||||||
try {
|
|
||||||
handler.getScannerRows(scanId, 9);
|
|
||||||
fail("Scanner id should be invalid");
|
|
||||||
} catch (TIllegalArgument e) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetScannerResultsWithAuthorizations() throws Exception {
|
|
||||||
ThriftHBaseServiceHandler handler = createHandler();
|
|
||||||
ByteBuffer table = wrap(tableAname);
|
|
||||||
|
|
||||||
// insert data
|
|
||||||
TColumnValue columnValue = new TColumnValue(wrap(familyAname),
|
|
||||||
wrap(qualifierAname), wrap(valueAname));
|
|
||||||
List<TColumnValue> columnValues = new ArrayList<>(1);
|
|
||||||
columnValues.add(columnValue);
|
|
||||||
for (int i = 0; i < 20; i++) {
|
|
||||||
TPut put = new TPut(
|
|
||||||
wrap(("testGetScannerResults" + pad(i, (byte) 2)).getBytes()),
|
|
||||||
columnValues);
|
|
||||||
if (i == 3) {
|
|
||||||
put.setCellVisibility(new TCellVisibility().setExpression(PUBLIC));
|
|
||||||
} else {
|
|
||||||
put.setCellVisibility(new TCellVisibility().setExpression("(" + SECRET
|
|
||||||
+ "|" + CONFIDENTIAL + ")" + "&" + "!" + TOPSECRET));
|
|
||||||
}
|
|
||||||
handler.put(table, put);
|
handler.put(table, put);
|
||||||
|
|
||||||
|
List<TColumnIncrement> incrementColumns = new ArrayList<>(1);
|
||||||
|
incrementColumns.add(new TColumnIncrement(wrap(familyAname),
|
||||||
|
wrap(qualifierAname)));
|
||||||
|
TIncrement increment = new TIncrement(wrap(rowName), incrementColumns);
|
||||||
|
increment.setCellVisibility(new TCellVisibility().setExpression(SECRET));
|
||||||
|
handler.increment(table, increment);
|
||||||
|
|
||||||
|
TGet get = new TGet(wrap(rowName));
|
||||||
|
TAuthorization tauth = new TAuthorization();
|
||||||
|
List<String> labels = new ArrayList<>(1);
|
||||||
|
labels.add(SECRET);
|
||||||
|
tauth.setLabels(labels);
|
||||||
|
get.setAuthorizations(tauth);
|
||||||
|
TResult result = handler.get(table, get);
|
||||||
|
|
||||||
|
assertArrayEquals(rowName, result.getRow());
|
||||||
|
assertEquals(1, result.getColumnValuesSize());
|
||||||
|
TColumnValue columnValue = result.getColumnValues().get(0);
|
||||||
|
assertArrayEquals(Bytes.toBytes(2L), columnValue.getValue());
|
||||||
}
|
}
|
||||||
|
|
||||||
// create scan instance
|
@Test
|
||||||
TScan scan = new TScan();
|
public void testIncrementWithTagsWithNotMatchLabels() throws Exception {
|
||||||
List<TColumn> columns = new ArrayList<>(1);
|
ThriftHBaseServiceHandler handler = createHandler();
|
||||||
TColumn column = new TColumn();
|
byte[] rowName = "testIncrementWithTagsWithNotMatchLabels".getBytes();
|
||||||
column.setFamily(familyAname);
|
ByteBuffer table = wrap(tableAname);
|
||||||
column.setQualifier(qualifierAname);
|
|
||||||
columns.add(column);
|
|
||||||
scan.setColumns(columns);
|
|
||||||
scan.setStartRow("testGetScannerResults".getBytes());
|
|
||||||
|
|
||||||
// get 5 rows and check the returned results
|
List<TColumnValue> columnValues = new ArrayList<>(1);
|
||||||
scan.setStopRow("testGetScannerResults05".getBytes());
|
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
||||||
TAuthorization tauth = new TAuthorization();
|
wrap(Bytes.toBytes(1L))));
|
||||||
List<String> labels = new ArrayList<>(2);
|
TPut put = new TPut(wrap(rowName), columnValues);
|
||||||
labels.add(SECRET);
|
put.setColumnValues(columnValues);
|
||||||
labels.add(PRIVATE);
|
put.setCellVisibility(new TCellVisibility().setExpression(PRIVATE));
|
||||||
tauth.setLabels(labels);
|
handler.put(table, put);
|
||||||
scan.setAuthorizations(tauth);
|
|
||||||
List<TResult> results = handler.getScannerResults(table, scan, 5);
|
List<TColumnIncrement> incrementColumns = new ArrayList<>(1);
|
||||||
assertEquals(4, results.size());
|
incrementColumns.add(new TColumnIncrement(wrap(familyAname),
|
||||||
for (int i = 0; i < 4; i++) {
|
wrap(qualifierAname)));
|
||||||
if (i < 3) {
|
TIncrement increment = new TIncrement(wrap(rowName), incrementColumns);
|
||||||
assertArrayEquals(
|
increment.setCellVisibility(new TCellVisibility().setExpression(SECRET));
|
||||||
("testGetScannerResults" + pad(i, (byte) 2)).getBytes(),
|
handler.increment(table, increment);
|
||||||
results.get(i).getRow());
|
|
||||||
} else if (i == 3) {
|
TGet get = new TGet(wrap(rowName));
|
||||||
continue;
|
TAuthorization tauth = new TAuthorization();
|
||||||
} else {
|
List<String> labels = new ArrayList<>(1);
|
||||||
assertArrayEquals(
|
labels.add(PUBLIC);
|
||||||
("testGetScannerResults" + pad(i + 1, (byte) 2)).getBytes(), results
|
tauth.setLabels(labels);
|
||||||
.get(i).getRow());
|
get.setAuthorizations(tauth);
|
||||||
|
TResult result = handler.get(table, get);
|
||||||
|
assertNull(result.getRow());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAppend() throws Exception {
|
||||||
|
ThriftHBaseServiceHandler handler = createHandler();
|
||||||
|
byte[] rowName = "testAppend".getBytes();
|
||||||
|
ByteBuffer table = wrap(tableAname);
|
||||||
|
byte[] v1 = Bytes.toBytes(1L);
|
||||||
|
byte[] v2 = Bytes.toBytes(5L);
|
||||||
|
List<TColumnValue> columnValues = new ArrayList<>(1);
|
||||||
|
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
||||||
|
wrap(Bytes.toBytes(1L))));
|
||||||
|
TPut put = new TPut(wrap(rowName), columnValues);
|
||||||
|
put.setColumnValues(columnValues);
|
||||||
|
put.setCellVisibility(new TCellVisibility().setExpression(PRIVATE));
|
||||||
|
handler.put(table, put);
|
||||||
|
|
||||||
|
List<TColumnValue> appendColumns = new ArrayList<>(1);
|
||||||
|
appendColumns.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
||||||
|
wrap(v2)));
|
||||||
|
TAppend append = new TAppend(wrap(rowName), appendColumns);
|
||||||
|
append.setCellVisibility(new TCellVisibility().setExpression(SECRET));
|
||||||
|
handler.append(table, append);
|
||||||
|
|
||||||
|
TGet get = new TGet(wrap(rowName));
|
||||||
|
TAuthorization tauth = new TAuthorization();
|
||||||
|
List<String> labels = new ArrayList<>(1);
|
||||||
|
labels.add(SECRET);
|
||||||
|
tauth.setLabels(labels);
|
||||||
|
get.setAuthorizations(tauth);
|
||||||
|
TResult result = handler.get(table, get);
|
||||||
|
|
||||||
|
assertArrayEquals(rowName, result.getRow());
|
||||||
|
assertEquals(1, result.getColumnValuesSize());
|
||||||
|
TColumnValue columnValue = result.getColumnValues().get(0);
|
||||||
|
assertArrayEquals(Bytes.add(v1, v2), columnValue.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Padding numbers to make comparison of sort order easier in a for loop
|
||||||
|
*
|
||||||
|
* @param n
|
||||||
|
* The number to pad.
|
||||||
|
* @param pad
|
||||||
|
* The length to pad up to.
|
||||||
|
* @return The padded number as a string.
|
||||||
|
*/
|
||||||
|
private String pad(int n, byte pad) {
|
||||||
|
String res = Integer.toString(n);
|
||||||
|
|
||||||
|
while (res.length() < pad) {
|
||||||
|
res = "0" + res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testGetsWithLabels() throws Exception {
|
|
||||||
ThriftHBaseServiceHandler handler = createHandler();
|
|
||||||
byte[] rowName = "testPutGet".getBytes();
|
|
||||||
ByteBuffer table = wrap(tableAname);
|
|
||||||
|
|
||||||
List<TColumnValue> columnValues = new ArrayList<>(2);
|
|
||||||
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
|
||||||
wrap(valueAname)));
|
|
||||||
columnValues.add(new TColumnValue(wrap(familyBname), wrap(qualifierBname),
|
|
||||||
wrap(valueBname)));
|
|
||||||
TPut put = new TPut(wrap(rowName), columnValues);
|
|
||||||
|
|
||||||
put.setColumnValues(columnValues);
|
|
||||||
put.setCellVisibility(new TCellVisibility().setExpression("(" + SECRET + "|"
|
|
||||||
+ CONFIDENTIAL + ")" + "&" + "!" + TOPSECRET));
|
|
||||||
handler.put(table, put);
|
|
||||||
TGet get = new TGet(wrap(rowName));
|
|
||||||
TAuthorization tauth = new TAuthorization();
|
|
||||||
List<String> labels = new ArrayList<>(2);
|
|
||||||
labels.add(SECRET);
|
|
||||||
labels.add(PRIVATE);
|
|
||||||
tauth.setLabels(labels);
|
|
||||||
get.setAuthorizations(tauth);
|
|
||||||
TResult result = handler.get(table, get);
|
|
||||||
assertArrayEquals(rowName, result.getRow());
|
|
||||||
List<TColumnValue> returnedColumnValues = result.getColumnValues();
|
|
||||||
assertTColumnValuesEqual(columnValues, returnedColumnValues);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testIncrementWithTags() throws Exception {
|
|
||||||
ThriftHBaseServiceHandler handler = createHandler();
|
|
||||||
byte[] rowName = "testIncrementWithTags".getBytes();
|
|
||||||
ByteBuffer table = wrap(tableAname);
|
|
||||||
|
|
||||||
List<TColumnValue> columnValues = new ArrayList<>(1);
|
|
||||||
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
|
||||||
wrap(Bytes.toBytes(1L))));
|
|
||||||
TPut put = new TPut(wrap(rowName), columnValues);
|
|
||||||
put.setColumnValues(columnValues);
|
|
||||||
put.setCellVisibility(new TCellVisibility().setExpression(PRIVATE));
|
|
||||||
handler.put(table, put);
|
|
||||||
|
|
||||||
List<TColumnIncrement> incrementColumns = new ArrayList<>(1);
|
|
||||||
incrementColumns.add(new TColumnIncrement(wrap(familyAname),
|
|
||||||
wrap(qualifierAname)));
|
|
||||||
TIncrement increment = new TIncrement(wrap(rowName), incrementColumns);
|
|
||||||
increment.setCellVisibility(new TCellVisibility().setExpression(SECRET));
|
|
||||||
handler.increment(table, increment);
|
|
||||||
|
|
||||||
TGet get = new TGet(wrap(rowName));
|
|
||||||
TAuthorization tauth = new TAuthorization();
|
|
||||||
List<String> labels = new ArrayList<>(1);
|
|
||||||
labels.add(SECRET);
|
|
||||||
tauth.setLabels(labels);
|
|
||||||
get.setAuthorizations(tauth);
|
|
||||||
TResult result = handler.get(table, get);
|
|
||||||
|
|
||||||
assertArrayEquals(rowName, result.getRow());
|
|
||||||
assertEquals(1, result.getColumnValuesSize());
|
|
||||||
TColumnValue columnValue = result.getColumnValues().get(0);
|
|
||||||
assertArrayEquals(Bytes.toBytes(2L), columnValue.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testIncrementWithTagsWithNotMatchLabels() throws Exception {
|
|
||||||
ThriftHBaseServiceHandler handler = createHandler();
|
|
||||||
byte[] rowName = "testIncrementWithTagsWithNotMatchLabels".getBytes();
|
|
||||||
ByteBuffer table = wrap(tableAname);
|
|
||||||
|
|
||||||
List<TColumnValue> columnValues = new ArrayList<>(1);
|
|
||||||
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
|
||||||
wrap(Bytes.toBytes(1L))));
|
|
||||||
TPut put = new TPut(wrap(rowName), columnValues);
|
|
||||||
put.setColumnValues(columnValues);
|
|
||||||
put.setCellVisibility(new TCellVisibility().setExpression(PRIVATE));
|
|
||||||
handler.put(table, put);
|
|
||||||
|
|
||||||
List<TColumnIncrement> incrementColumns = new ArrayList<>(1);
|
|
||||||
incrementColumns.add(new TColumnIncrement(wrap(familyAname),
|
|
||||||
wrap(qualifierAname)));
|
|
||||||
TIncrement increment = new TIncrement(wrap(rowName), incrementColumns);
|
|
||||||
increment.setCellVisibility(new TCellVisibility().setExpression(SECRET));
|
|
||||||
handler.increment(table, increment);
|
|
||||||
|
|
||||||
TGet get = new TGet(wrap(rowName));
|
|
||||||
TAuthorization tauth = new TAuthorization();
|
|
||||||
List<String> labels = new ArrayList<>(1);
|
|
||||||
labels.add(PUBLIC);
|
|
||||||
tauth.setLabels(labels);
|
|
||||||
get.setAuthorizations(tauth);
|
|
||||||
TResult result = handler.get(table, get);
|
|
||||||
assertNull(result.getRow());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testAppend() throws Exception {
|
|
||||||
ThriftHBaseServiceHandler handler = createHandler();
|
|
||||||
byte[] rowName = "testAppend".getBytes();
|
|
||||||
ByteBuffer table = wrap(tableAname);
|
|
||||||
byte[] v1 = Bytes.toBytes(1L);
|
|
||||||
byte[] v2 = Bytes.toBytes(5L);
|
|
||||||
List<TColumnValue> columnValues = new ArrayList<>(1);
|
|
||||||
columnValues.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
|
||||||
wrap(Bytes.toBytes(1L))));
|
|
||||||
TPut put = new TPut(wrap(rowName), columnValues);
|
|
||||||
put.setColumnValues(columnValues);
|
|
||||||
put.setCellVisibility(new TCellVisibility().setExpression(PRIVATE));
|
|
||||||
handler.put(table, put);
|
|
||||||
|
|
||||||
List<TColumnValue> appendColumns = new ArrayList<>(1);
|
|
||||||
appendColumns.add(new TColumnValue(wrap(familyAname), wrap(qualifierAname),
|
|
||||||
wrap(v2)));
|
|
||||||
TAppend append = new TAppend(wrap(rowName), appendColumns);
|
|
||||||
append.setCellVisibility(new TCellVisibility().setExpression(SECRET));
|
|
||||||
handler.append(table, append);
|
|
||||||
|
|
||||||
TGet get = new TGet(wrap(rowName));
|
|
||||||
TAuthorization tauth = new TAuthorization();
|
|
||||||
List<String> labels = new ArrayList<>(1);
|
|
||||||
labels.add(SECRET);
|
|
||||||
tauth.setLabels(labels);
|
|
||||||
get.setAuthorizations(tauth);
|
|
||||||
TResult result = handler.get(table, get);
|
|
||||||
|
|
||||||
assertArrayEquals(rowName, result.getRow());
|
|
||||||
assertEquals(1, result.getColumnValuesSize());
|
|
||||||
TColumnValue columnValue = result.getColumnValues().get(0);
|
|
||||||
assertArrayEquals(Bytes.add(v1, v2), columnValue.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Padding numbers to make comparison of sort order easier in a for loop
|
|
||||||
*
|
|
||||||
* @param n
|
|
||||||
* The number to pad.
|
|
||||||
* @param pad
|
|
||||||
* The length to pad up to.
|
|
||||||
* @return The padded number as a string.
|
|
||||||
*/
|
|
||||||
private String pad(int n, byte pad) {
|
|
||||||
String res = Integer.toString(n);
|
|
||||||
while (res.length() < pad)
|
|
||||||
res = "0" + res;
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue