From 21f2eddd209a47f970e1006c3ab0b6ed8c19efff Mon Sep 17 00:00:00 2001 From: Jan Hentschel Date: Fri, 24 Jan 2020 13:24:58 +0100 Subject: [PATCH] HBASE-23627 Resolved remaining Checkstyle violations in hbase-thrift Signed-off-by: stack --- .../hbase/thrift/IncrementCoalescer.java | 88 ++++++++----------- 1 file changed, 36 insertions(+), 52 deletions(-) diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java index c8ef2476282..aec7cc91c13 100644 --- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java +++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.thrift; import java.io.IOException; @@ -26,10 +25,8 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -41,7 +38,6 @@ import org.apache.hadoop.hbase.thrift.generated.TIncrement; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.MBeanUtil; import org.apache.hadoop.hbase.util.Threads; -import org.apache.thrift.TException; /** * This class will coalesce increments from a thift server if @@ -53,7 +49,6 @@ import org.apache.thrift.TException; * */ public class IncrementCoalescer implements IncrementCoalescerMBean { - /** * Used to identify a cell that will be incremented. * @@ -84,10 +79,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { return rowKey; } - public void setRowKey(byte[] rowKey) { - this.rowKey = rowKey; - } - public byte[] getFamily() { return family; } @@ -117,37 +108,29 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { @Override public boolean equals(Object obj) { - if (this == obj) return true; - if (obj == null) return false; - if (getClass() != obj.getClass()) return false; + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + FullyQualifiedRow other = (FullyQualifiedRow) obj; - if (!Arrays.equals(family, other.family)) return false; - if (!Arrays.equals(qualifier, other.qualifier)) return false; - if (!Arrays.equals(rowKey, other.rowKey)) return false; - if (!Arrays.equals(table, other.table)) return false; - return true; - } - } + if (!Arrays.equals(family, other.family)) { + return false; + } + if (!Arrays.equals(qualifier, other.qualifier)) { + return false; + } + if (!Arrays.equals(rowKey, other.rowKey)) { + return false; + } - static class DaemonThreadFactory implements ThreadFactory { - static final AtomicInteger poolNumber = new AtomicInteger(1); - final ThreadGroup group; - final AtomicInteger threadNumber = new AtomicInteger(1); - final String namePrefix; - - DaemonThreadFactory() { - SecurityManager s = System.getSecurityManager(); - group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); - namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-"; - } - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); - if (!t.isDaemon()) t.setDaemon(true); - if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); - return t; + return Arrays.equals(table, other.table); } } @@ -155,7 +138,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { private final AtomicLong successfulCoalescings = new AtomicLong(); private final AtomicLong totalIncrements = new AtomicLong(); private final ConcurrentMap countersMap = - new ConcurrentHashMap(100000, 0.75f, 1500); + new ConcurrentHashMap<>(100000, 0.75f, 1500); private final ThreadPoolExecutor pool; private final HBaseHandler handler; @@ -167,14 +150,13 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { public IncrementCoalescer(HBaseHandler hand) { this.handler = hand; LinkedBlockingQueue queue = new LinkedBlockingQueue(); - pool = - new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, + pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue, Threads.newDaemonThreadFactory("IncrementCoalescer")); MBeanUtil.registerMBean("thrift", "Thrift", this); } - public boolean queueIncrement(TIncrement inc) throws TException { + public boolean queueIncrement(TIncrement inc) { if (!canQueue()) { failedIncrements.incrementAndGet(); return false; @@ -182,7 +164,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { return internalQueueTincrement(inc); } - public boolean queueIncrements(List incs) throws TException { + public boolean queueIncrements(List incs) { if (!canQueue()) { failedIncrements.incrementAndGet(); return false; @@ -191,23 +173,24 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { for (TIncrement tinc : incs) { internalQueueTincrement(tinc); } - return true; + return true; } - private boolean internalQueueTincrement(TIncrement inc) throws TException { + private boolean internalQueueTincrement(TIncrement inc) { byte[][] famAndQf = KeyValue.parseColumn(inc.getColumn()); - if (famAndQf.length != 2) return false; + if (famAndQf.length != 2) { + return false; + } return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1], inc.getAmmount()); } private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam, - byte[] qual, long ammount) throws TException { + byte[] qual, long ammount) { int countersMapSize = countersMap.size(); - //Make sure that the number of threads is scaled. dynamicallySetCoreSize(countersMapSize); @@ -221,7 +204,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { Long value = countersMap.remove(key); if (value == null) { // There was nothing there, create a new value - value = Long.valueOf(currentAmount); + value = currentAmount; } else { value += currentAmount; successfulCoalescings.incrementAndGet(); @@ -293,7 +276,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { /** * This method samples the incoming requests and, if selected, will check if * the corePoolSize should be changed. - * @param countersMapSize + * @param countersMapSize the size of the counters map */ private void dynamicallySetCoreSize(int countersMapSize) { // Here we are using countersMapSize as a random number, meaning this @@ -302,9 +285,10 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { return; } double currentRatio = (double) countersMapSize / (double) maxQueueSize; - int newValue = 1; + int newValue; + if (currentRatio < 0.1) { - // it's 1 + newValue = 1; } else if (currentRatio < 0.3) { newValue = 2; } else if (currentRatio < 0.5) { @@ -316,6 +300,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { } else { newValue = 22; } + if (pool.getCorePoolSize() != newValue) { pool.setCorePoolSize(newValue); } @@ -391,5 +376,4 @@ public class IncrementCoalescer implements IncrementCoalescerMBean { public long getCountersMapSize() { return countersMap.size(); } - }