HBASE-23627 Resolved remaining Checkstyle violations in hbase-thrift
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
c080923d77
commit
21f2eddd20
|
@ -15,7 +15,6 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.thrift;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -26,10 +25,8 @@ import java.util.concurrent.Callable;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -41,7 +38,6 @@ import org.apache.hadoop.hbase.thrift.generated.TIncrement;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.MBeanUtil;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.thrift.TException;
|
||||
|
||||
/**
|
||||
* This class will coalesce increments from a thift server if
|
||||
|
@ -53,7 +49,6 @@ import org.apache.thrift.TException;
|
|||
*
|
||||
*/
|
||||
public class IncrementCoalescer implements IncrementCoalescerMBean {
|
||||
|
||||
/**
|
||||
* Used to identify a cell that will be incremented.
|
||||
*
|
||||
|
@ -84,10 +79,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
return rowKey;
|
||||
}
|
||||
|
||||
public void setRowKey(byte[] rowKey) {
|
||||
this.rowKey = rowKey;
|
||||
}
|
||||
|
||||
public byte[] getFamily() {
|
||||
return family;
|
||||
}
|
||||
|
@ -117,37 +108,29 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) return true;
|
||||
if (obj == null) return false;
|
||||
if (getClass() != obj.getClass()) return false;
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null) {
|
||||
return false;
|
||||
}
|
||||
if (getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
FullyQualifiedRow other = (FullyQualifiedRow) obj;
|
||||
if (!Arrays.equals(family, other.family)) return false;
|
||||
if (!Arrays.equals(qualifier, other.qualifier)) return false;
|
||||
if (!Arrays.equals(rowKey, other.rowKey)) return false;
|
||||
if (!Arrays.equals(table, other.table)) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
if (!Arrays.equals(family, other.family)) {
|
||||
return false;
|
||||
}
|
||||
if (!Arrays.equals(qualifier, other.qualifier)) {
|
||||
return false;
|
||||
}
|
||||
if (!Arrays.equals(rowKey, other.rowKey)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
static class DaemonThreadFactory implements ThreadFactory {
|
||||
static final AtomicInteger poolNumber = new AtomicInteger(1);
|
||||
final ThreadGroup group;
|
||||
final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
final String namePrefix;
|
||||
|
||||
DaemonThreadFactory() {
|
||||
SecurityManager s = System.getSecurityManager();
|
||||
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
|
||||
namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-";
|
||||
}
|
||||
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
|
||||
if (!t.isDaemon()) t.setDaemon(true);
|
||||
if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
|
||||
return t;
|
||||
return Arrays.equals(table, other.table);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -155,7 +138,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
private final AtomicLong successfulCoalescings = new AtomicLong();
|
||||
private final AtomicLong totalIncrements = new AtomicLong();
|
||||
private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
|
||||
new ConcurrentHashMap<FullyQualifiedRow, Long>(100000, 0.75f, 1500);
|
||||
new ConcurrentHashMap<>(100000, 0.75f, 1500);
|
||||
private final ThreadPoolExecutor pool;
|
||||
private final HBaseHandler handler;
|
||||
|
||||
|
@ -167,14 +150,13 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
public IncrementCoalescer(HBaseHandler hand) {
|
||||
this.handler = hand;
|
||||
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
|
||||
pool =
|
||||
new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
|
||||
pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
|
||||
Threads.newDaemonThreadFactory("IncrementCoalescer"));
|
||||
|
||||
MBeanUtil.registerMBean("thrift", "Thrift", this);
|
||||
}
|
||||
|
||||
public boolean queueIncrement(TIncrement inc) throws TException {
|
||||
public boolean queueIncrement(TIncrement inc) {
|
||||
if (!canQueue()) {
|
||||
failedIncrements.incrementAndGet();
|
||||
return false;
|
||||
|
@ -182,7 +164,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
return internalQueueTincrement(inc);
|
||||
}
|
||||
|
||||
public boolean queueIncrements(List<TIncrement> incs) throws TException {
|
||||
public boolean queueIncrements(List<TIncrement> incs) {
|
||||
if (!canQueue()) {
|
||||
failedIncrements.incrementAndGet();
|
||||
return false;
|
||||
|
@ -191,23 +173,24 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
for (TIncrement tinc : incs) {
|
||||
internalQueueTincrement(tinc);
|
||||
}
|
||||
return true;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean internalQueueTincrement(TIncrement inc) throws TException {
|
||||
private boolean internalQueueTincrement(TIncrement inc) {
|
||||
byte[][] famAndQf = KeyValue.parseColumn(inc.getColumn());
|
||||
if (famAndQf.length != 2) return false;
|
||||
if (famAndQf.length != 2) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return internalQueueIncrement(inc.getTable(), inc.getRow(), famAndQf[0], famAndQf[1],
|
||||
inc.getAmmount());
|
||||
}
|
||||
|
||||
private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
|
||||
byte[] qual, long ammount) throws TException {
|
||||
byte[] qual, long ammount) {
|
||||
int countersMapSize = countersMap.size();
|
||||
|
||||
|
||||
//Make sure that the number of threads is scaled.
|
||||
dynamicallySetCoreSize(countersMapSize);
|
||||
|
||||
|
@ -221,7 +204,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
Long value = countersMap.remove(key);
|
||||
if (value == null) {
|
||||
// There was nothing there, create a new value
|
||||
value = Long.valueOf(currentAmount);
|
||||
value = currentAmount;
|
||||
} else {
|
||||
value += currentAmount;
|
||||
successfulCoalescings.incrementAndGet();
|
||||
|
@ -293,7 +276,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
/**
|
||||
* This method samples the incoming requests and, if selected, will check if
|
||||
* the corePoolSize should be changed.
|
||||
* @param countersMapSize
|
||||
* @param countersMapSize the size of the counters map
|
||||
*/
|
||||
private void dynamicallySetCoreSize(int countersMapSize) {
|
||||
// Here we are using countersMapSize as a random number, meaning this
|
||||
|
@ -302,9 +285,10 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
return;
|
||||
}
|
||||
double currentRatio = (double) countersMapSize / (double) maxQueueSize;
|
||||
int newValue = 1;
|
||||
int newValue;
|
||||
|
||||
if (currentRatio < 0.1) {
|
||||
// it's 1
|
||||
newValue = 1;
|
||||
} else if (currentRatio < 0.3) {
|
||||
newValue = 2;
|
||||
} else if (currentRatio < 0.5) {
|
||||
|
@ -316,6 +300,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
} else {
|
||||
newValue = 22;
|
||||
}
|
||||
|
||||
if (pool.getCorePoolSize() != newValue) {
|
||||
pool.setCorePoolSize(newValue);
|
||||
}
|
||||
|
@ -391,5 +376,4 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
|
|||
public long getCountersMapSize() {
|
||||
return countersMap.size();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue