diff --git a/hbase-checkstyle/src/main/resources/hbase/checkstyle-suppressions.xml b/hbase-checkstyle/src/main/resources/hbase/checkstyle-suppressions.xml
index 07261bf35c0..b4173e02536 100644
--- a/hbase-checkstyle/src/main/resources/hbase/checkstyle-suppressions.xml
+++ b/hbase-checkstyle/src/main/resources/hbase/checkstyle-suppressions.xml
@@ -36,4 +36,5 @@
+
diff --git a/hbase-thrift/pom.xml b/hbase-thrift/pom.xml
index 15a7a0cf0cd..a74d6b39fe9 100644
--- a/hbase-thrift/pom.xml
+++ b/hbase-thrift/pom.xml
@@ -139,6 +139,22 @@
org.apache.maven.plugins
maven-source-plugin
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ checkstyle
+ validate
+
+ check
+
+
+ true
+
+
+
+
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
index 46e394394ae..5a6e436467d 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
* time of each call to ThriftMetrics.
*/
@InterfaceAudience.Private
-public class HbaseHandlerMetricsProxy implements InvocationHandler {
+public final class HbaseHandlerMetricsProxy implements InvocationHandler {
private static final Logger LOG = LoggerFactory.getLogger(
HbaseHandlerMetricsProxy.class);
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index 60a8b7faac4..0dacf8b2ce4 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -117,14 +117,30 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
@Override
public boolean equals(Object obj) {
- if (this == obj) return true;
- if (obj == null) return false;
- if (getClass() != obj.getClass()) return false;
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+
FullyQualifiedRow other = (FullyQualifiedRow) obj;
- if (!Arrays.equals(family, other.family)) return false;
- if (!Arrays.equals(qualifier, other.qualifier)) return false;
- if (!Arrays.equals(rowKey, other.rowKey)) return false;
- if (!Arrays.equals(table, other.table)) return false;
+
+ if (!Arrays.equals(family, other.family)) {
+ return false;
+ }
+ if (!Arrays.equals(qualifier, other.qualifier)) {
+ return false;
+ }
+ if (!Arrays.equals(rowKey, other.rowKey)) {
+ return false;
+ }
+ if (!Arrays.equals(table, other.table)) {
+ return false;
+ }
return true;
}
@@ -144,8 +160,14 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
- if (!t.isDaemon()) t.setDaemon(true);
- if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
+
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+
return t;
}
}
@@ -191,13 +213,16 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
for (TIncrement tinc : incs) {
internalQueueTincrement(tinc);
}
- return true;
+ return true;
}
private boolean internalQueueTincrement(TIncrement inc) throws TException {
byte[][] famAndQf = CellUtil.parseColumn(inc.getColumn());
- if (famAndQf.length != 2) return false;
+
+ if (famAndQf.length != 2) {
+ return false;
+ }
return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
inc.getAmmount());
@@ -207,7 +232,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
byte[] qual, long ammount) throws TException {
int countersMapSize = countersMap.size();
-
//Make sure that the number of threads is scaled.
dynamicallySetCoreSize(countersMapSize);
@@ -293,7 +317,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
/**
* This method samples the incoming requests and, if selected, will check if
* the corePoolSize should be changed.
- * @param countersMapSize
+ * @param countersMapSize the size of the counters map
*/
private void dynamicallySetCoreSize(int countersMapSize) {
// Here we are using countersMapSize as a random number, meaning this
@@ -302,9 +326,10 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
return;
}
double currentRatio = (double) countersMapSize / (double) maxQueueSize;
- int newValue = 1;
+ int newValue;
+
if (currentRatio < 0.1) {
- // it's 1
+ newValue = 1;
} else if (currentRatio < 0.3) {
newValue = 2;
} else if (currentRatio < 0.5) {
@@ -316,6 +341,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
} else {
newValue = 22;
}
+
if (pool.getCorePoolSize() != newValue) {
pool.setCorePoolSize(newValue);
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
index 732e2821db0..4926c8b5ca0 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/TBoundedThreadPoolServer.java
@@ -258,7 +258,7 @@ public class TBoundedThreadPoolServer extends TServer {
serverTransport_.interrupt();
}
- private class ClientConnnection implements Runnable {
+ private final class ClientConnnection implements Runnable {
private TTransport client;
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
index 10092106f5c..f612eebfbad 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftMetrics.java
@@ -64,12 +64,14 @@ public class ThriftMetrics {
public ThriftMetrics(Configuration conf, ThriftServerType t) {
- slowResponseTime = conf.getLong( SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
+ slowResponseTime = conf.getLong(SLOW_RESPONSE_NANO_SEC, DEFAULT_SLOW_RESPONSE_NANO_SEC);
if (t == ThriftServerType.ONE) {
- source = CompatibilitySingletonFactory.getInstance(MetricsThriftServerSourceFactory.class).createThriftOneSource();
+ source = CompatibilitySingletonFactory.getInstance(MetricsThriftServerSourceFactory.class)
+ .createThriftOneSource();
} else if (t == ThriftServerType.TWO) {
- source = CompatibilitySingletonFactory.getInstance(MetricsThriftServerSourceFactory.class).createThriftTwoSource();
+ source = CompatibilitySingletonFactory.getInstance(MetricsThriftServerSourceFactory.class)
+ .createThriftTwoSource();
}
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
index cd1993de6dc..b6051d8921c 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
@@ -87,23 +87,24 @@ public class ThriftServer {
/**
* Start up or shuts down the Thrift server, depending on the arguments.
- * @param args
+ * @param args the arguments to pass in when starting the Thrift server
*/
- void doMain(final String[] args) throws Exception {
- processOptions(args);
+ void doMain(final String[] args) throws Exception {
+ processOptions(args);
+ serverRunner = new ThriftServerRunner(conf);
- serverRunner = new ThriftServerRunner(conf);
+ // Put up info server.
+ int port = conf.getInt("hbase.thrift.info.port", 9095);
- // Put up info server.
- int port = conf.getInt("hbase.thrift.info.port", 9095);
- if (port >= 0) {
- conf.setLong("startcode", System.currentTimeMillis());
- String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
- infoServer = new InfoServer("thrift", a, port, false, conf);
- infoServer.setAttribute("hbase.conf", conf);
- infoServer.start();
- }
- serverRunner.run();
+ if (port >= 0) {
+ conf.setLong("startcode", System.currentTimeMillis());
+ String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
+ infoServer = new InfoServer("thrift", a, port, false, conf);
+ infoServer.setAttribute("hbase.conf", conf);
+ infoServer.start();
+ }
+
+ serverRunner.run();
}
/**
@@ -230,10 +231,6 @@ public class ThriftServer {
}
}
- /**
- * @param args
- * @throws Exception
- */
public static void main(String [] args) throws Exception {
LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
VersionInfo.logVersion();
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
index 00601812939..583a9e9bb86 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
@@ -161,7 +161,8 @@ public class ThriftServerRunner implements Runnable {
static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
- static final String MAX_FRAME_SIZE_CONF_KEY = "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
+ static final String MAX_FRAME_SIZE_CONF_KEY =
+ "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
@@ -347,7 +348,8 @@ public class ThriftServerRunner implements Runnable {
doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER, false);
if (doAsEnabled) {
if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
- LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not configured ");
+ LOG.warn("Fail to enable the doAs feature. hbase.regionserver.thrift.http is not " +
+ "configured ");
}
}
if (qop != null) {
@@ -433,7 +435,8 @@ public class ThriftServerRunner implements Runnable {
httpServer = new Server(threadPool);
// Context handler
- ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/", ServletContextHandler.SESSIONS);
+ ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
+ ServletContextHandler.SESSIONS);
ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
// set up Jetty and run the embedded server
@@ -508,14 +511,7 @@ public class ThriftServerRunner implements Runnable {
*/
private void setupServer() throws Exception {
// Construct correct ProtocolFactory
- TProtocolFactory protocolFactory;
- if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
- LOG.debug("Using compact protocol");
- protocolFactory = new TCompactProtocol.Factory();
- } else {
- LOG.debug("Using binary protocol");
- protocolFactory = new TBinaryProtocol.Factory();
- }
+ TProtocolFactory protocolFactory = getProtocolFactory();
final TProcessor p = new Hbase.Processor<>(handler);
ImplType implType = ImplType.getServerImpl(conf);
@@ -614,10 +610,8 @@ public class ThriftServerRunner implements Runnable {
CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
ExecutorService executorService = createExecutor(
callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads());
- serverArgs.executorService(executorService)
- .processor(processor)
- .transportFactory(transportFactory)
- .protocolFactory(protocolFactory);
+ serverArgs.executorService(executorService).processor(processor)
+ .transportFactory(transportFactory).protocolFactory(protocolFactory);
tserver = new THsHaServer(serverArgs);
} else { // THREADED_SELECTOR
TThreadedSelectorServer.Args serverArgs =
@@ -625,10 +619,8 @@ public class ThriftServerRunner implements Runnable {
CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
ExecutorService executorService = createExecutor(
callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
- serverArgs.executorService(executorService)
- .processor(processor)
- .transportFactory(transportFactory)
- .protocolFactory(protocolFactory);
+ serverArgs.executorService(executorService).processor(processor)
+ .transportFactory(transportFactory).protocolFactory(protocolFactory);
tserver = new TThreadedSelectorServer(serverArgs);
}
LOG.info("starting HBase " + implType.simpleClassName() +
@@ -640,21 +632,17 @@ public class ThriftServerRunner implements Runnable {
THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
TServerTransport serverTransport = new TServerSocket(
new TServerSocket.ServerSocketTransportArgs().
- bindAddr(new InetSocketAddress(listenAddress, listenPort)).
- backlog(backlog).
+ bindAddr(new InetSocketAddress(listenAddress, listenPort)).backlog(backlog).
clientTimeout(readTimeout));
TBoundedThreadPoolServer.Args serverArgs =
new TBoundedThreadPoolServer.Args(serverTransport, conf);
- serverArgs.processor(processor)
- .transportFactory(transportFactory)
- .protocolFactory(protocolFactory);
+ serverArgs.processor(processor).transportFactory(transportFactory)
+ .protocolFactory(protocolFactory);
LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+ listenAddress + ":" + Integer.toString(listenPort)
+ " with readTimeout " + readTimeout + "ms; " + serverArgs);
- TBoundedThreadPoolServer tserver =
- new TBoundedThreadPoolServer(serverArgs, metrics);
- this.tserver = tserver;
+ this.tserver = new TBoundedThreadPoolServer(serverArgs, metrics);
} else {
throw new AssertionError("Unsupported Thrift server implementation: " +
implType.simpleClassName());
@@ -672,6 +660,20 @@ public class ThriftServerRunner implements Runnable {
registerFilters(conf);
}
+ private TProtocolFactory getProtocolFactory() {
+ TProtocolFactory protocolFactory;
+
+ if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
+ LOG.debug("Using compact protocol");
+ protocolFactory = new TCompactProtocol.Factory();
+ } else {
+ LOG.debug("Using binary protocol");
+ protocolFactory = new TBinaryProtocol.Factory();
+ }
+
+ return protocolFactory;
+ }
+
ExecutorService createExecutor(BlockingQueue callQueue,
int minWorkers, int maxWorkers) {
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
@@ -697,7 +699,7 @@ public class ThriftServerRunner implements Runnable {
boolean sortResultColumns) {
scanner = resultScanner;
sortColumns = sortResultColumns;
- }
+ }
public ResultScanner getScanner() {
return scanner;
@@ -749,10 +751,9 @@ public class ThriftServerRunner implements Runnable {
* @param tableName
* name of table
* @return Table object
- * @throws IOException
+ * @throws IOException if getting the table fails
*/
- public Table getTable(final byte[] tableName) throws
- IOException {
+ public Table getTable(final byte[] tableName) throws IOException {
String table = Bytes.toString(tableName);
return connectionCache.getTable(table);
}
@@ -765,10 +766,10 @@ public class ThriftServerRunner implements Runnable {
* Assigns a unique ID to the scanner and adds the mapping to an internal
* hash-map.
*
- * @param scanner
+ * @param scanner the {@link ResultScanner} to add
* @return integer scanner id
*/
- protected synchronized int addScanner(ResultScanner scanner,boolean sortColumns) {
+ protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
int id = nextScannerId++;
ResultScannerWrapper resultScannerWrapper = new ResultScannerWrapper(scanner, sortColumns);
scannerMap.put(id, resultScannerWrapper);
@@ -778,7 +779,7 @@ public class ThriftServerRunner implements Runnable {
/**
* Returns the scanner associated with the specified ID.
*
- * @param id
+ * @param id the ID of the scanner to get
* @return a Scanner, or null if ID was invalid.
*/
protected synchronized ResultScannerWrapper getScanner(int id) {
@@ -789,7 +790,7 @@ public class ThriftServerRunner implements Runnable {
* Removes the scanner associated with the specified ID from the internal
* id->scanner hash-map.
*
- * @param id
+ * @param id the ID of the scanner to remove
* @return a Scanner, or null if ID was invalid.
*/
protected synchronized ResultScannerWrapper removeScanner(int id) {
@@ -1116,9 +1117,9 @@ public class ThriftServerRunner implements Runnable {
for(ByteBuffer column : columns) {
byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
if (famAndQf.length == 1) {
- get.addFamily(famAndQf[0]);
+ get.addFamily(famAndQf[0]);
} else {
- get.addColumn(famAndQf[0], famAndQf[1]);
+ get.addColumn(famAndQf[0], famAndQf[1]);
}
}
get.setTimeRange(0, timestamp);
@@ -1361,10 +1362,12 @@ public class ThriftServerRunner implements Runnable {
put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
}
}
- if (!delete.isEmpty())
+ if (!delete.isEmpty()) {
table.delete(delete);
- if (!put.isEmpty())
+ }
+ if (!put.isEmpty()) {
table.put(put);
+ }
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw getIOError(e);
@@ -1434,19 +1437,23 @@ public class ThriftServerRunner implements Runnable {
put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
}
}
- if (!delete.isEmpty())
+ if (!delete.isEmpty()) {
deletes.add(delete);
- if (!put.isEmpty())
+ }
+ if (!put.isEmpty()) {
puts.add(put);
+ }
}
Table table = null;
try {
table = getTable(tableName);
- if (!puts.isEmpty())
+ if (!puts.isEmpty()) {
table.put(puts);
- if (!deletes.isEmpty())
+ }
+ if (!deletes.isEmpty()) {
table.delete(deletes);
+ }
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
@@ -1762,8 +1769,7 @@ public class ThriftServerRunner implements Runnable {
}
}
- private void closeTable(Table table) throws IOError
- {
+ private void closeTable(Table table) throws IOError {
try{
if(table != null){
table.close();
@@ -1885,7 +1891,7 @@ public class ThriftServerRunner implements Runnable {
LOG.warn(e.getMessage(), e);
throw getIOError(e);
} finally{
- closeTable(table);
+ closeTable(table);
}
}
@@ -1932,7 +1938,7 @@ public class ThriftServerRunner implements Runnable {
LOG.warn(e.getMessage(), e);
throw new IllegalArgument(Throwables.getStackTraceAsString(e));
} finally {
- closeTable(table);
+ closeTable(table);
}
}
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
index 4865ac354e1..90f11adc35e 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftUtilities.java
@@ -46,16 +46,17 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
-public class ThriftUtilities {
+public final class ThriftUtilities {
+ private ThriftUtilities() {
+ }
/**
* This utility method creates a new Hbase HColumnDescriptor object based on a
* Thrift ColumnDescriptor "struct".
*
- * @param in
- * Thrift ColumnDescriptor object
+ * @param in Thrift ColumnDescriptor object
* @return HColumnDescriptor
- * @throws IllegalArgument
+ * @throws IllegalArgument if the column name is empty
*/
static public HColumnDescriptor colDescFromThrift(ColumnDescriptor in)
throws IllegalArgument {
@@ -150,31 +151,35 @@ public class ThriftUtilities {
*/
static public List rowResultFromHBase(Result[] in, boolean sortColumns) {
List results = new ArrayList<>(in.length);
- for ( Result result_ : in) {
- if(result_ == null || result_.isEmpty()) {
- continue;
+ for (Result result_ : in) {
+ if(result_ == null || result_.isEmpty()) {
+ continue;
+ }
+
+ TRowResult result = new TRowResult();
+ result.row = ByteBuffer.wrap(result_.getRow());
+
+ if (sortColumns) {
+ result.sortedColumns = new ArrayList<>();
+ for (Cell kv : result_.rawCells()) {
+ result.sortedColumns.add(new TColumn(
+ ByteBuffer.wrap(CellUtil.makeColumn(CellUtil.cloneFamily(kv),
+ CellUtil.cloneQualifier(kv))),
+ new TCell(ByteBuffer.wrap(CellUtil.cloneValue(kv)), kv.getTimestamp())));
}
- TRowResult result = new TRowResult();
- result.row = ByteBuffer.wrap(result_.getRow());
- if (sortColumns) {
- result.sortedColumns = new ArrayList<>();
- for (Cell kv : result_.rawCells()) {
- result.sortedColumns.add(new TColumn(
- ByteBuffer.wrap(CellUtil.makeColumn(CellUtil.cloneFamily(kv),
- CellUtil.cloneQualifier(kv))),
- new TCell(ByteBuffer.wrap(CellUtil.cloneValue(kv)), kv.getTimestamp())));
- }
- } else {
- result.columns = new TreeMap<>();
- for (Cell kv : result_.rawCells()) {
- result.columns.put(
- ByteBuffer.wrap(CellUtil.makeColumn(CellUtil.cloneFamily(kv),
- CellUtil.cloneQualifier(kv))),
- new TCell(ByteBuffer.wrap(CellUtil.cloneValue(kv)), kv.getTimestamp()));
- }
+ } else {
+ result.columns = new TreeMap<>();
+ for (Cell kv : result_.rawCells()) {
+ result.columns.put(
+ ByteBuffer.wrap(CellUtil.makeColumn(CellUtil.cloneFamily(kv),
+ CellUtil.cloneQualifier(kv))),
+ new TCell(ByteBuffer.wrap(CellUtil.cloneValue(kv)), kv.getTimestamp()));
}
+ }
+
results.add(result);
}
+
return results;
}
@@ -204,7 +209,11 @@ public class ThriftUtilities {
public static Increment incrementFromThrift(TIncrement tincrement) {
Increment inc = new Increment(tincrement.getRow());
byte[][] famAndQf = CellUtil.parseColumn(tincrement.getColumn());
- if (famAndQf.length != 2) return null;
+
+ if (famAndQf.length != 2) {
+ return null;
+ }
+
inc.addColumn(famAndQf[0], famAndQf[1], tincrement.getAmmount());
return inc;
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index b81c6f494b9..d98cc50f922 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -89,8 +89,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
- * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in the
- * HbaseClient.thrift IDL file.
+ * ThriftServer - this class starts up a Thrift server which implements the HBase API specified in
+ * the HbaseClient.thrift IDL file.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -155,11 +155,14 @@ public class ThriftServer extends Configured implements Tool {
options.addOption("ro", "readonly", false,
"Respond only to read method requests [default: false]");
OptionGroup servers = new OptionGroup();
- servers.addOption(
- new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
- servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
- servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport."));
- servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
+ servers.addOption(new Option("nonblocking", false,
+ "Use the TNonblockingServer. This implies the framed transport."));
+ servers.addOption(new Option("hsha", false,
+ "Use the THsHaServer. This implies the framed transport."));
+ servers.addOption(new Option("selector", false,
+ "Use the TThreadedSelectorServer. This implies the framed transport."));
+ servers.addOption(new Option("threadpool", false,
+ "Use the TThreadPoolServer. This is the default."));
options.addOptionGroup(servers);
return options;
}
@@ -243,8 +246,9 @@ public class ThriftServer extends Configured implements Tool {
}
}
- private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory, TProcessor processor,
- TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException {
+ private static TServer getTNonBlockingServer(TProtocolFactory protocolFactory,
+ TProcessor processor, TTransportFactory transportFactory, InetSocketAddress inetSocketAddress)
+ throws TTransportException {
TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
log.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
@@ -284,10 +288,10 @@ public class ThriftServer extends Configured implements Tool {
log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
if (workerThreads > 0) {
- serverArgs.workerThreads(workerThreads);
+ serverArgs.workerThreads(workerThreads);
}
if (selectorThreads > 0) {
- serverArgs.selectorThreads(selectorThreads);
+ serverArgs.selectorThreads(selectorThreads);
}
ExecutorService executorService = createExecutor(
@@ -378,31 +382,18 @@ public class ThriftServer extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
final Configuration conf = getConf();
- TServer server = null;
Options options = getOptions();
CommandLine cmd = parseArguments(conf, options, args);
int workerThreads = 0;
int selectorThreads = 0;
int maxCallQueueSize = -1; // use unbounded queue by default
- /**
- * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
- * should print the help if no argument is provided
- */
- List> argList = cmd.getArgList();
- if (cmd.hasOption("help") || !argList.contains("start") || argList.contains("stop")) {
- printUsage();
+ if (checkArguments(cmd)) {
return 1;
}
// Get address to bind
- String bindAddress;
- if (cmd.hasOption("bind")) {
- bindAddress = cmd.getOptionValue("bind");
- conf.set("hbase.thrift.info.bindAddress", bindAddress);
- } else {
- bindAddress = conf.get("hbase.thrift.info.bindAddress");
- }
+ String bindAddress = getBindAddress(conf, cmd);
// check if server should only process read requests, if so override the conf
if (cmd.hasOption("readonly")) {
@@ -413,35 +404,13 @@ public class ThriftServer extends Configured implements Tool {
}
// Get read timeout
- int readTimeout = THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
- if (cmd.hasOption(READ_TIMEOUT_OPTION)) {
- try {
- readTimeout = Integer.parseInt(cmd.getOptionValue(READ_TIMEOUT_OPTION));
- } catch (NumberFormatException e) {
- throw new RuntimeException("Could not parse the value provided for the timeout option", e);
- }
- } else {
- readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
- THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
- }
-
+ int readTimeout = getReadTimeout(conf, cmd);
// Get port to bind to
- int listenPort = 0;
- try {
- if (cmd.hasOption("port")) {
- listenPort = Integer.parseInt(cmd.getOptionValue("port"));
- } else {
- listenPort = conf.getInt("hbase.regionserver.thrift.port", DEFAULT_LISTEN_PORT);
- }
- } catch (NumberFormatException e) {
- throw new RuntimeException("Could not parse the value provided for the port option", e);
- }
-
+ int listenPort = getListenPort(conf, cmd);
// Thrift's implementation uses '0' as a placeholder for 'use the default.'
int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
- // Local hostname and user name,
- // used only if QOP is configured.
+ // Local hostname and user name, used only if QOP is configured.
String host = null;
String name = null;
@@ -453,8 +422,7 @@ public class ThriftServer extends Configured implements Tool {
host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
conf.get("hbase.thrift.dns.interface", "default"),
conf.get("hbase.thrift.dns.nameserver", "default")));
- userProvider.login("hbase.thrift.keytab.file",
- "hbase.thrift.kerberos.principal", host);
+ userProvider.login("hbase.thrift.keytab.file", "hbase.thrift.kerberos.principal", host);
}
UserGroupInformation realUser = userProvider.getCurrent().getUGI();
@@ -463,12 +431,10 @@ public class ThriftServer extends Configured implements Tool {
if (stringQop != null) {
qop = SaslUtil.getQop(stringQop);
if (!securityEnabled) {
- throw new IOException("Thrift server must"
- + " run in secure mode to support authentication");
+ throw new IOException("Thrift server must run in secure mode to support authentication");
}
// Extract the name from the principal
- name = SecurityUtil.getUserFromPrincipal(
- conf.get("hbase.thrift.kerberos.principal"));
+ name = SecurityUtil.getUserFromPrincipal(conf.get("hbase.thrift.kerberos.principal"));
}
boolean nonblocking = cmd.hasOption("nonblocking");
@@ -478,14 +444,7 @@ public class ThriftServer extends Configured implements Tool {
ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
final JvmPauseMonitor pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
- String implType = "threadpool";
- if (nonblocking) {
- implType = "nonblocking";
- } else if (hsha) {
- implType = "hsha";
- } else if (selector) {
- implType = "selector";
- }
+ String implType = getImplType(nonblocking, hsha, selector);
conf.set("hbase.regionserver.thrift.server.type", implType);
conf.setInt("hbase.regionserver.thrift.port", listenPort);
@@ -549,49 +508,12 @@ public class ThriftServer extends Configured implements Tool {
}
// Put up info server.
- int port = conf.getInt("hbase.thrift.info.port", 9095);
- if (port >= 0) {
- conf.setLong("startcode", System.currentTimeMillis());
- String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
- InfoServer infoServer = new InfoServer("thrift", a, port, false, conf);
- infoServer.setAttribute("hbase.conf", conf);
- infoServer.start();
- }
+ startInfoServer(conf);
- if (nonblocking) {
- server = getTNonBlockingServer(protocolFactory,
- processor,
- transportFactory,
- inetSocketAddress);
- } else if (hsha) {
- server = getTHsHaServer(protocolFactory,
- processor,
- transportFactory,
- workerThreads,
- maxCallQueueSize,
- inetSocketAddress,
- metrics);
- } else if (selector) {
- server = getTThreadedSelectorServer(protocolFactory,
- processor,
- transportFactory,
- workerThreads,
- selectorThreads,
- maxCallQueueSize,
- inetSocketAddress,
- metrics);
- } else {
- server = getTThreadPoolServer(protocolFactory,
- processor,
- transportFactory,
- workerThreads,
- inetSocketAddress,
- backlog,
- readTimeout,
- metrics);
- }
+ final TServer tserver = getServer(workerThreads, selectorThreads, maxCallQueueSize, readTimeout,
+ backlog, nonblocking, hsha, selector, metrics, protocolFactory, processor,
+ transportFactory, inetSocketAddress);
- final TServer tserver = server;
realUser.doAs(
new PrivilegedAction