HBASE-18772 [JDK8] Replace AtomicLong with LongAdder

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Yechao Chen 2017-09-13 04:46:14 +08:00 committed by Chia-Ping Tsai
parent a66bd04815
commit eb5e43673c
21 changed files with 268 additions and 262 deletions

View File

@ -18,7 +18,7 @@ package org.apache.hadoop.hbase;
* limitations under the License. * limitations under the License.
*/ */
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -28,69 +28,69 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class SplitLogCounters { public class SplitLogCounters {
//SplitLogManager counters //Spnager counters
public final static AtomicLong tot_mgr_log_split_batch_start = new AtomicLong(0); public final static LongAdder tot_mgr_log_split_batch_start = new LongAdder();
public final static AtomicLong tot_mgr_log_split_batch_success = new AtomicLong(0); public final static LongAdder tot_mgr_log_split_batch_success = new LongAdder();
public final static AtomicLong tot_mgr_log_split_batch_err = new AtomicLong(0); public final static LongAdder tot_mgr_log_split_batch_err = new LongAdder();
public final static AtomicLong tot_mgr_new_unexpected_wals = new AtomicLong(0); public final static LongAdder tot_mgr_new_unexpected_wals = new LongAdder();
public final static AtomicLong tot_mgr_log_split_start = new AtomicLong(0); public final static LongAdder tot_mgr_log_split_start = new LongAdder();
public final static AtomicLong tot_mgr_log_split_success = new AtomicLong(0); public final static LongAdder tot_mgr_log_split_success = new LongAdder();
public final static AtomicLong tot_mgr_log_split_err = new AtomicLong(0); public final static LongAdder tot_mgr_log_split_err = new LongAdder();
public final static AtomicLong tot_mgr_node_create_queued = new AtomicLong(0); public final static LongAdder tot_mgr_node_create_queued = new LongAdder();
public final static AtomicLong tot_mgr_node_create_result = new AtomicLong(0); public final static LongAdder tot_mgr_node_create_result = new LongAdder();
public final static AtomicLong tot_mgr_node_already_exists = new AtomicLong(0); public final static LongAdder tot_mgr_node_already_exists = new LongAdder();
public final static AtomicLong tot_mgr_node_create_err = new AtomicLong(0); public final static LongAdder tot_mgr_node_create_err = new LongAdder();
public final static AtomicLong tot_mgr_node_create_retry = new AtomicLong(0); public final static LongAdder tot_mgr_node_create_retry = new LongAdder();
public final static AtomicLong tot_mgr_get_data_queued = new AtomicLong(0); public final static LongAdder tot_mgr_get_data_queued = new LongAdder();
public final static AtomicLong tot_mgr_get_data_result = new AtomicLong(0); public final static LongAdder tot_mgr_get_data_result = new LongAdder();
public final static AtomicLong tot_mgr_get_data_nonode = new AtomicLong(0); public final static LongAdder tot_mgr_get_data_nonode = new LongAdder();
public final static AtomicLong tot_mgr_get_data_err = new AtomicLong(0); public final static LongAdder tot_mgr_get_data_err = new LongAdder();
public final static AtomicLong tot_mgr_get_data_retry = new AtomicLong(0); public final static LongAdder tot_mgr_get_data_retry = new LongAdder();
public final static AtomicLong tot_mgr_node_delete_queued = new AtomicLong(0); public final static LongAdder tot_mgr_node_delete_queued = new LongAdder();
public final static AtomicLong tot_mgr_node_delete_result = new AtomicLong(0); public final static LongAdder tot_mgr_node_delete_result = new LongAdder();
public final static AtomicLong tot_mgr_node_delete_err = new AtomicLong(0); public final static LongAdder tot_mgr_node_delete_err = new LongAdder();
public final static AtomicLong tot_mgr_resubmit = new AtomicLong(0); public final static LongAdder tot_mgr_resubmit = new LongAdder();
public final static AtomicLong tot_mgr_resubmit_failed = new AtomicLong(0); public final static LongAdder tot_mgr_resubmit_failed = new LongAdder();
public final static AtomicLong tot_mgr_null_data = new AtomicLong(0); public final static LongAdder tot_mgr_null_data = new LongAdder();
public final static AtomicLong tot_mgr_orphan_task_acquired = new AtomicLong(0); public final static LongAdder tot_mgr_orphan_task_acquired = new LongAdder();
public final static AtomicLong tot_mgr_wait_for_zk_delete = new AtomicLong(0); public final static LongAdder tot_mgr_wait_for_zk_delete = new LongAdder();
public final static AtomicLong tot_mgr_unacquired_orphan_done = new AtomicLong(0); public final static LongAdder tot_mgr_unacquired_orphan_done = new LongAdder();
public final static AtomicLong tot_mgr_resubmit_threshold_reached = new AtomicLong(0); public final static LongAdder tot_mgr_resubmit_threshold_reached = new LongAdder();
public final static AtomicLong tot_mgr_missing_state_in_delete = new AtomicLong(0); public final static LongAdder tot_mgr_missing_state_in_delete = new LongAdder();
public final static AtomicLong tot_mgr_heartbeat = new AtomicLong(0); public final static LongAdder tot_mgr_heartbeat = new LongAdder();
public final static AtomicLong tot_mgr_rescan = new AtomicLong(0); public final static LongAdder tot_mgr_rescan = new LongAdder();
public final static AtomicLong tot_mgr_rescan_deleted = new AtomicLong(0); public final static LongAdder tot_mgr_rescan_deleted = new LongAdder();
public final static AtomicLong tot_mgr_task_deleted = new AtomicLong(0); public final static LongAdder tot_mgr_task_deleted = new LongAdder();
public final static AtomicLong tot_mgr_resubmit_unassigned = new AtomicLong(0); public final static LongAdder tot_mgr_resubmit_unassigned = new LongAdder();
public final static AtomicLong tot_mgr_relist_logdir = new AtomicLong(0); public final static LongAdder tot_mgr_relist_logdir = new LongAdder();
public final static AtomicLong tot_mgr_resubmit_dead_server_task = new AtomicLong(0); public final static LongAdder tot_mgr_resubmit_dead_server_task = new LongAdder();
public final static AtomicLong tot_mgr_resubmit_force = new AtomicLong(0); public final static LongAdder tot_mgr_resubmit_force = new LongAdder();
// SplitLogWorker counters // SplitLogWorker counters
public final static AtomicLong tot_wkr_failed_to_grab_task_no_data = new AtomicLong(0); public final static LongAdder tot_wkr_failed_to_grab_task_no_data = new LongAdder();
public final static AtomicLong tot_wkr_failed_to_grab_task_exception = new AtomicLong(0); public final static LongAdder tot_wkr_failed_to_grab_task_exception = new LongAdder();
public final static AtomicLong tot_wkr_failed_to_grab_task_owned = new AtomicLong(0); public final static LongAdder tot_wkr_failed_to_grab_task_owned = new LongAdder();
public final static AtomicLong tot_wkr_failed_to_grab_task_lost_race = new AtomicLong(0); public final static LongAdder tot_wkr_failed_to_grab_task_lost_race = new LongAdder();
public final static AtomicLong tot_wkr_task_acquired = new AtomicLong(0); public final static LongAdder tot_wkr_task_acquired = new LongAdder();
public final static AtomicLong tot_wkr_task_resigned = new AtomicLong(0); public final static LongAdder tot_wkr_task_resigned = new LongAdder();
public final static AtomicLong tot_wkr_task_done = new AtomicLong(0); public final static LongAdder tot_wkr_task_done = new LongAdder();
public final static AtomicLong tot_wkr_task_err = new AtomicLong(0); public final static LongAdder tot_wkr_task_err = new LongAdder();
public final static AtomicLong tot_wkr_task_heartbeat = new AtomicLong(0); public final static LongAdder tot_wkr_task_heartbeat = new LongAdder();
public final static AtomicLong tot_wkr_task_acquired_rescan = new AtomicLong(0); public final static LongAdder tot_wkr_task_acquired_rescan = new LongAdder();
public final static AtomicLong tot_wkr_get_data_queued = new AtomicLong(0); public final static LongAdder tot_wkr_get_data_queued = new LongAdder();
public final static AtomicLong tot_wkr_get_data_result = new AtomicLong(0); public final static LongAdder tot_wkr_get_data_result = new LongAdder();
public final static AtomicLong tot_wkr_get_data_retry = new AtomicLong(0); public final static LongAdder tot_wkr_get_data_retry = new LongAdder();
public final static AtomicLong tot_wkr_preempt_task = new AtomicLong(0); public final static LongAdder tot_wkr_preempt_task = new LongAdder();
public final static AtomicLong tot_wkr_task_heartbeat_failed = new AtomicLong(0); public final static LongAdder tot_wkr_task_heartbeat_failed = new LongAdder();
public final static AtomicLong tot_wkr_final_transition_failed = new AtomicLong(0); public final static LongAdder tot_wkr_final_transition_failed = new LongAdder();
public final static AtomicLong tot_wkr_task_grabing = new AtomicLong(0); public final static LongAdder tot_wkr_task_grabing = new LongAdder();
public static void resetCounters() throws Exception { public static void resetCounters() throws Exception {
Class<?> cl = SplitLogCounters.class; Class<?> cl = SplitLogCounters.class;
for (Field fld : cl.getDeclaredFields()) { for (Field fld : cl.getDeclaredFields()) {
/* Guard against source instrumentation. */ /* Guard against source instrumentation. */
if ((!fld.isSynthetic()) && (AtomicLong.class.isAssignableFrom(fld.getType()))) { if ((!fld.isSynthetic()) && (LongAdder.class.isAssignableFrom(fld.getType()))) {
((AtomicLong)fld.get(null)).set(0); ((LongAdder)fld.get(null)).reset();
} }
} }
} }

View File

@ -18,7 +18,7 @@
*/ */
package org.apache.hadoop.hbase.coordination; package org.apache.hadoop.hbase.coordination;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
* {@link #isStop()} a flag indicates whether worker should finish <BR> * {@link #isStop()} a flag indicates whether worker should finish <BR>
* {@link #registerListener()} called from {@link SplitLogWorker#run()} and could register listener * {@link #registerListener()} called from {@link SplitLogWorker#run()} and could register listener
* for external changes in coordination (if required) <BR> * for external changes in coordination (if required) <BR>
* {@link #endTask(SplitLogTask, AtomicLong, SplitTaskDetails)} notify coordination engine that * {@link #endTask(SplitLogTask, LongAdder, SplitTaskDetails)} notify coordination engine that
* <p> * <p>
* Important methods for WALSplitterHandler: <BR> * Important methods for WALSplitterHandler: <BR>
* splitting task has completed. * splitting task has completed.
@ -121,7 +121,7 @@ public interface SplitLogWorkerCoordination {
* @param splitTaskDetails details about log split task (specific to coordination engine being * @param splitTaskDetails details about log split task (specific to coordination engine being
* used). * used).
*/ */
void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails splitTaskDetails); void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails splitTaskDetails);
/** /**
* Interface for log-split tasks Used to carry implementation details in encapsulated way through * Interface for log-split tasks Used to carry implementation details in encapsulated way through

View File

@ -206,7 +206,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
if (task.unforcedResubmits.get() >= resubmitThreshold) { if (task.unforcedResubmits.get() >= resubmitThreshold) {
if (!task.resubmitThresholdReached) { if (!task.resubmitThresholdReached) {
task.resubmitThresholdReached = true; task.resubmitThresholdReached = true;
SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet(); SplitLogCounters.tot_mgr_resubmit_threshold_reached.increment();
LOG.info("Skipping resubmissions of task " + path + " because threshold " LOG.info("Skipping resubmissions of task " + path + " because threshold "
+ resubmitThreshold + " reached"); + resubmitThreshold + " reached");
} }
@ -215,7 +215,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
// race with heartbeat() that might be changing last_version // race with heartbeat() that might be changing last_version
version = task.last_version; version = task.last_version;
} else { } else {
SplitLogCounters.tot_mgr_resubmit_force.incrementAndGet(); SplitLogCounters.tot_mgr_resubmit_force.increment();
version = -1; version = -1;
} }
LOG.info("resubmitting task " + path); LOG.info("resubmitting task " + path);
@ -231,7 +231,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
} }
task.setUnassigned(); task.setUnassigned();
rescan(Long.MAX_VALUE); rescan(Long.MAX_VALUE);
SplitLogCounters.tot_mgr_resubmit.incrementAndGet(); SplitLogCounters.tot_mgr_resubmit.increment();
return true; return true;
} }
@ -273,7 +273,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
.getZooKeeper() .getZooKeeper()
.getData(path, this.watcher, new GetDataAsyncCallback(), .getData(path, this.watcher, new GetDataAsyncCallback(),
Long.valueOf(-1) /* retry count */); Long.valueOf(-1) /* retry count */);
SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); SplitLogCounters.tot_mgr_get_data_queued.increment();
} }
/** /**
@ -354,7 +354,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
} }
private void deleteNode(String path, Long retries) { private void deleteNode(String path, Long retries) {
SplitLogCounters.tot_mgr_node_delete_queued.incrementAndGet(); SplitLogCounters.tot_mgr_node_delete_queued.increment();
// Once a task znode is ready for delete, that is it is in the TASK_DONE // Once a task znode is ready for delete, that is it is in the TASK_DONE
// state, then no one should be writing to it anymore. That is no one // state, then no one should be writing to it anymore. That is no one
// will be updating the znode version any more. // will be updating the znode version any more.
@ -370,9 +370,9 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
task = details.getTasks().remove(path); task = details.getTasks().remove(path);
if (task == null) { if (task == null) {
if (ZKSplitLog.isRescanNode(watcher, path)) { if (ZKSplitLog.isRescanNode(watcher, path)) {
SplitLogCounters.tot_mgr_rescan_deleted.incrementAndGet(); SplitLogCounters.tot_mgr_rescan_deleted.increment();
} }
SplitLogCounters.tot_mgr_missing_state_in_delete.incrementAndGet(); SplitLogCounters.tot_mgr_missing_state_in_delete.increment();
LOG.debug("deleted task without in memory state " + path); LOG.debug("deleted task without in memory state " + path);
return; return;
} }
@ -380,7 +380,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
task.status = DELETED; task.status = DELETED;
task.notify(); task.notify();
} }
SplitLogCounters.tot_mgr_task_deleted.incrementAndGet(); SplitLogCounters.tot_mgr_task_deleted.increment();
} }
private void deleteNodeFailure(String path) { private void deleteNodeFailure(String path) {
@ -389,7 +389,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
} }
private void createRescanSuccess(String path) { private void createRescanSuccess(String path) {
SplitLogCounters.tot_mgr_rescan.incrementAndGet(); SplitLogCounters.tot_mgr_rescan.increment();
getDataSetWatch(path, zkretries); getDataSetWatch(path, zkretries);
} }
@ -416,7 +416,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode()); SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(), ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
retry_count); retry_count);
SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet(); SplitLogCounters.tot_mgr_node_create_queued.increment();
return; return;
} }
@ -434,7 +434,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
private void getDataSetWatch(String path, Long retry_count) { private void getDataSetWatch(String path, Long retry_count) {
this.watcher.getRecoverableZooKeeper().getZooKeeper() this.watcher.getRecoverableZooKeeper().getZooKeeper()
.getData(path, this.watcher, new GetDataAsyncCallback(), retry_count); .getData(path, this.watcher, new GetDataAsyncCallback(), retry_count);
SplitLogCounters.tot_mgr_get_data_queued.incrementAndGet(); SplitLogCounters.tot_mgr_get_data_queued.increment();
} }
@ -446,7 +446,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
setDone(path, SUCCESS); setDone(path, SUCCESS);
return; return;
} }
SplitLogCounters.tot_mgr_null_data.incrementAndGet(); SplitLogCounters.tot_mgr_null_data.increment();
LOG.fatal("logic error - got null data " + path); LOG.fatal("logic error - got null data " + path);
setDone(path, FAILURE); setDone(path, FAILURE);
return; return;
@ -497,17 +497,17 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
Task task = details.getTasks().get(path); Task task = details.getTasks().get(path);
if (task == null) { if (task == null) {
if (!ZKSplitLog.isRescanNode(watcher, path)) { if (!ZKSplitLog.isRescanNode(watcher, path)) {
SplitLogCounters.tot_mgr_unacquired_orphan_done.incrementAndGet(); SplitLogCounters.tot_mgr_unacquired_orphan_done.increment();
LOG.debug("unacquired orphan task is done " + path); LOG.debug("unacquired orphan task is done " + path);
} }
} else { } else {
synchronized (task) { synchronized (task) {
if (task.status == IN_PROGRESS) { if (task.status == IN_PROGRESS) {
if (status == SUCCESS) { if (status == SUCCESS) {
SplitLogCounters.tot_mgr_log_split_success.incrementAndGet(); SplitLogCounters.tot_mgr_log_split_success.increment();
LOG.info("Done splitting " + path); LOG.info("Done splitting " + path);
} else { } else {
SplitLogCounters.tot_mgr_log_split_err.incrementAndGet(); SplitLogCounters.tot_mgr_log_split_err.increment();
LOG.warn("Error splitting " + path); LOG.warn("Error splitting " + path);
} }
task.status = status; task.status = status;
@ -536,7 +536,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
private Task findOrCreateOrphanTask(String path) { private Task findOrCreateOrphanTask(String path) {
return computeIfAbsent(details.getTasks(), path, Task::new, () -> { return computeIfAbsent(details.getTasks(), path, Task::new, () -> {
LOG.info("creating orphan task " + path); LOG.info("creating orphan task " + path);
SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
}); });
} }
@ -547,7 +547,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
LOG.info("task " + path + " acquired by " + workerName); LOG.info("task " + path + " acquired by " + workerName);
} }
task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName); task.heartbeat(EnvironmentEdgeManager.currentTime(), new_version, workerName);
SplitLogCounters.tot_mgr_heartbeat.incrementAndGet(); SplitLogCounters.tot_mgr_heartbeat.increment();
} else { } else {
// duplicate heartbeats - heartbeats w/o zk node version // duplicate heartbeats - heartbeats w/o zk node version
// changing - are possible. The timeout thread does // changing - are possible. The timeout thread does
@ -898,7 +898,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
LOG.debug("failed to resubmit task " + path + " version changed"); LOG.debug("failed to resubmit task " + path + " version changed");
return false; return false;
} catch (KeeperException e) { } catch (KeeperException e) {
SplitLogCounters.tot_mgr_resubmit_failed.incrementAndGet(); SplitLogCounters.tot_mgr_resubmit_failed.increment();
LOG.warn("failed to resubmit " + path, e); LOG.warn("failed to resubmit " + path, e);
return false; return false;
} }
@ -947,7 +947,7 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
@Override @Override
public void processResult(int rc, String path, Object ctx, String name) { public void processResult(int rc, String path, Object ctx, String name) {
SplitLogCounters.tot_mgr_node_create_result.incrementAndGet(); SplitLogCounters.tot_mgr_node_create_result.increment();
if (rc != 0) { if (rc != 0) {
if (needAbandonRetries(rc, "Create znode " + path)) { if (needAbandonRetries(rc, "Create znode " + path)) {
createNodeFailure(path); createNodeFailure(path);
@ -961,16 +961,16 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
// And all code pieces correctly handle the case of suddenly // And all code pieces correctly handle the case of suddenly
// disappearing task-znode. // disappearing task-znode.
LOG.debug("found pre-existing znode " + path); LOG.debug("found pre-existing znode " + path);
SplitLogCounters.tot_mgr_node_already_exists.incrementAndGet(); SplitLogCounters.tot_mgr_node_already_exists.increment();
} else { } else {
Long retry_count = (Long) ctx; Long retry_count = (Long) ctx;
LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path LOG.warn("create rc =" + KeeperException.Code.get(rc) + " for " + path
+ " remaining retries=" + retry_count); + " remaining retries=" + retry_count);
if (retry_count == 0) { if (retry_count == 0) {
SplitLogCounters.tot_mgr_node_create_err.incrementAndGet(); SplitLogCounters.tot_mgr_node_create_err.increment();
createNodeFailure(path); createNodeFailure(path);
} else { } else {
SplitLogCounters.tot_mgr_node_create_retry.incrementAndGet(); SplitLogCounters.tot_mgr_node_create_retry.increment();
createNode(path, retry_count - 1); createNode(path, retry_count - 1);
} }
return; return;
@ -988,13 +988,13 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
@Override @Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
SplitLogCounters.tot_mgr_get_data_result.incrementAndGet(); SplitLogCounters.tot_mgr_get_data_result.increment();
if (rc != 0) { if (rc != 0) {
if (needAbandonRetries(rc, "GetData from znode " + path)) { if (needAbandonRetries(rc, "GetData from znode " + path)) {
return; return;
} }
if (rc == KeeperException.Code.NONODE.intValue()) { if (rc == KeeperException.Code.NONODE.intValue()) {
SplitLogCounters.tot_mgr_get_data_nonode.incrementAndGet(); SplitLogCounters.tot_mgr_get_data_nonode.increment();
LOG.warn("task znode " + path + " vanished or not created yet."); LOG.warn("task znode " + path + " vanished or not created yet.");
// ignore since we should not end up in a case where there is in-memory task, // ignore since we should not end up in a case where there is in-memory task,
// but no znode. The only case is between the time task is created in-memory // but no znode. The only case is between the time task is created in-memory
@ -1011,10 +1011,10 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path
+ " remaining retries=" + retry_count); + " remaining retries=" + retry_count);
if (retry_count == 0) { if (retry_count == 0) {
SplitLogCounters.tot_mgr_get_data_err.incrementAndGet(); SplitLogCounters.tot_mgr_get_data_err.increment();
getDataSetWatchFailure(path); getDataSetWatchFailure(path);
} else { } else {
SplitLogCounters.tot_mgr_get_data_retry.incrementAndGet(); SplitLogCounters.tot_mgr_get_data_retry.increment();
getDataSetWatch(path, retry_count - 1); getDataSetWatch(path, retry_count - 1);
} }
return; return;
@ -1036,14 +1036,14 @@ public class ZKSplitLogManagerCoordination extends ZooKeeperListener implements
@Override @Override
public void processResult(int rc, String path, Object ctx) { public void processResult(int rc, String path, Object ctx) {
SplitLogCounters.tot_mgr_node_delete_result.incrementAndGet(); SplitLogCounters.tot_mgr_node_delete_result.increment();
if (rc != 0) { if (rc != 0) {
if (needAbandonRetries(rc, "Delete znode " + path)) { if (needAbandonRetries(rc, "Delete znode " + path)) {
details.getFailedDeletions().add(path); details.getFailedDeletions().add(path);
return; return;
} }
if (rc != KeeperException.Code.NONODE.intValue()) { if (rc != KeeperException.Code.NONODE.intValue()) {
SplitLogCounters.tot_mgr_node_delete_err.incrementAndGet(); SplitLogCounters.tot_mgr_node_delete_err.increment();
Long retry_count = (Long) ctx; Long retry_count = (Long) ctx;
LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path LOG.warn("delete rc=" + KeeperException.Code.get(rc) + " for " + path
+ " remaining retries=" + retry_count); + " remaining retries=" + retry_count);

View File

@ -24,7 +24,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.mutable.MutableInt;
@ -156,7 +156,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
String taskpath = currentTask; String taskpath = currentTask;
if (taskpath != null && taskpath.equals(path)) { if (taskpath != null && taskpath.equals(path)) {
LOG.info("retrying data watch on " + path); LOG.info("retrying data watch on " + path);
SplitLogCounters.tot_wkr_get_data_retry.incrementAndGet(); SplitLogCounters.tot_wkr_get_data_retry.increment();
getDataSetWatchAsync(); getDataSetWatchAsync();
} else { } else {
// no point setting a watch on the task which this worker is not // no point setting a watch on the task which this worker is not
@ -169,7 +169,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
public void getDataSetWatchAsync() { public void getDataSetWatchAsync() {
watcher.getRecoverableZooKeeper().getZooKeeper() watcher.getRecoverableZooKeeper().getZooKeeper()
.getData(currentTask, watcher, new GetDataAsyncCallback(), null); .getData(currentTask, watcher, new GetDataAsyncCallback(), null);
SplitLogCounters.tot_wkr_get_data_queued.incrementAndGet(); SplitLogCounters.tot_wkr_get_data_queued.increment();
} }
void getDataSetWatchSuccess(String path, byte[] data) { void getDataSetWatchSuccess(String path, byte[] data) {
@ -221,12 +221,12 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
try { try {
try { try {
if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
return; return;
} }
} catch (KeeperException e) { } catch (KeeperException e) {
LOG.warn("Failed to get data for znode " + path, e); LOG.warn("Failed to get data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
return; return;
} }
SplitLogTask slt; SplitLogTask slt;
@ -234,11 +234,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
slt = SplitLogTask.parseFrom(data); slt = SplitLogTask.parseFrom(data);
} catch (DeserializationException e) { } catch (DeserializationException e) {
LOG.warn("Failed parse data for znode " + path, e); LOG.warn("Failed parse data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
return; return;
} }
if (!slt.isUnassigned()) { if (!slt.isUnassigned()) {
SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
return; return;
} }
@ -246,7 +246,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
attemptToOwnTask(true, watcher, server.getServerName(), path, attemptToOwnTask(true, watcher, server.getServerName(), path,
slt.getMode(), stat.getVersion()); slt.getMode(), stat.getVersion());
if (currentVersion < 0) { if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
return; return;
} }
@ -262,7 +262,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
} }
LOG.info("worker " + server.getServerName() + " acquired task " + path); LOG.info("worker " + server.getServerName() + " acquired task " + path);
SplitLogCounters.tot_wkr_task_acquired.incrementAndGet(); SplitLogCounters.tot_wkr_task_acquired.increment();
getDataSetWatchAsync(); getDataSetWatchAsync();
submitTask(path, slt.getMode(), currentVersion, reportPeriod); submitTask(path, slt.getMode(), currentVersion, reportPeriod);
@ -371,11 +371,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion); Stat stat = zkw.getRecoverableZooKeeper().setData(task, slt.toByteArray(), taskZKVersion);
if (stat == null) { if (stat == null) {
LOG.warn("zk.setData() returned null for path " + task); LOG.warn("zk.setData() returned null for path " + task);
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
return FAILED_TO_OWN_TASK; return FAILED_TO_OWN_TASK;
} }
latestZKVersion = stat.getVersion(); latestZKVersion = stat.getVersion();
SplitLogCounters.tot_wkr_task_heartbeat.incrementAndGet(); SplitLogCounters.tot_wkr_task_heartbeat.increment();
return latestZKVersion; return latestZKVersion;
} catch (KeeperException e) { } catch (KeeperException e) {
if (!isFirstTime) { if (!isFirstTime) {
@ -392,7 +392,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
+ StringUtils.stringifyException(e1)); + StringUtils.stringifyException(e1));
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
SplitLogCounters.tot_wkr_task_heartbeat_failed.incrementAndGet(); SplitLogCounters.tot_wkr_task_heartbeat_failed.increment();
return FAILED_TO_OWN_TASK; return FAILED_TO_OWN_TASK;
} }
@ -440,7 +440,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
return; return;
} }
} }
SplitLogCounters.tot_wkr_task_grabing.incrementAndGet(); SplitLogCounters.tot_wkr_task_grabing.increment();
synchronized (taskReadyLock) { synchronized (taskReadyLock) {
while (seq_start == taskReadySeq.get()) { while (seq_start == taskReadySeq.get()) {
taskReadyLock.wait(checkInterval); taskReadyLock.wait(checkInterval);
@ -567,7 +567,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
@Override @Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
SplitLogCounters.tot_wkr_get_data_result.incrementAndGet(); SplitLogCounters.tot_wkr_get_data_result.increment();
if (rc != 0) { if (rc != 0) {
LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path); LOG.warn("getdata rc = " + KeeperException.Code.get(rc) + " " + path);
getDataSetWatchFailure(path); getDataSetWatchFailure(path);
@ -588,14 +588,14 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
* @param ctr * @param ctr
*/ */
@Override @Override
public void endTask(SplitLogTask slt, AtomicLong ctr, SplitTaskDetails details) { public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails details) {
ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details; ZkSplitTaskDetails zkDetails = (ZkSplitTaskDetails) details;
String task = zkDetails.getTaskNode(); String task = zkDetails.getTaskNode();
int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue(); int taskZKVersion = zkDetails.getCurTaskZKVersion().intValue();
try { try {
if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) { if (ZKUtil.setData(watcher, task, slt.toByteArray(), taskZKVersion)) {
LOG.info("successfully transitioned task " + task + " to final state " + slt); LOG.info("successfully transitioned task " + task + " to final state " + slt);
ctr.incrementAndGet(); ctr.increment();
return; return;
} }
LOG.warn("failed to transistion task " + task + " to end state " + slt LOG.warn("failed to transistion task " + task + " to end state " + slt
@ -609,7 +609,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
} catch (KeeperException e) { } catch (KeeperException e) {
LOG.warn("failed to end task, " + task + " " + slt, e); LOG.warn("failed to end task, " + task + " " + slt, e);
} }
SplitLogCounters.tot_wkr_final_transition_failed.incrementAndGet(); SplitLogCounters.tot_wkr_final_transition_failed.increment();
} }
/** /**

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.io.hfile; package org.apache.hadoop.hbase.io.hfile;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -32,6 +31,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -178,13 +178,13 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
private final AtomicLong size; private final AtomicLong size;
/** Current size of data blocks */ /** Current size of data blocks */
private final AtomicLong dataBlockSize; private final LongAdder dataBlockSize;
/** Current number of cached elements */ /** Current number of cached elements */
private final AtomicLong elements; private final AtomicLong elements;
/** Current number of cached data block elements */ /** Current number of cached data block elements */
private final AtomicLong dataBlockElements; private final LongAdder dataBlockElements;
/** Cache access count (sequential ID) */ /** Cache access count (sequential ID) */
private final AtomicLong count; private final AtomicLong count;
@ -321,8 +321,8 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
this.stats = new CacheStats(this.getClass().getSimpleName()); this.stats = new CacheStats(this.getClass().getSimpleName());
this.count = new AtomicLong(0); this.count = new AtomicLong(0);
this.elements = new AtomicLong(0); this.elements = new AtomicLong(0);
this.dataBlockElements = new AtomicLong(0); this.dataBlockElements = new LongAdder();
this.dataBlockSize = new AtomicLong(0); this.dataBlockSize = new LongAdder();
this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel); this.overhead = calculateOverhead(maxSize, blockSize, mapConcurrencyLevel);
this.size = new AtomicLong(this.overhead); this.size = new AtomicLong(this.overhead);
this.hardCapacityLimitFactor = hardLimitFactor; this.hardCapacityLimitFactor = hardLimitFactor;
@ -409,7 +409,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
map.put(cacheKey, cb); map.put(cacheKey, cb);
long val = elements.incrementAndGet(); long val = elements.incrementAndGet();
if (buf.getBlockType().isData()) { if (buf.getBlockType().isData()) {
dataBlockElements.incrementAndGet(); dataBlockElements.increment();
} }
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
long size = map.size(); long size = map.size();
@ -462,7 +462,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
heapsize *= -1; heapsize *= -1;
} }
if (bt != null && bt.isData()) { if (bt != null && bt.isData()) {
dataBlockSize.addAndGet(heapsize); dataBlockSize.add(heapsize);
} }
return size.addAndGet(heapsize); return size.addAndGet(heapsize);
} }
@ -569,7 +569,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
assertCounterSanity(size, val); assertCounterSanity(size, val);
} }
if (block.getBuffer().getBlockType().isData()) { if (block.getBuffer().getBlockType().isData()) {
dataBlockElements.decrementAndGet(); dataBlockElements.decrement();
} }
if (evictedByEvictionProcess) { if (evictedByEvictionProcess) {
// When the eviction of the block happened because of invalidation of HFiles, no need to // When the eviction of the block happened because of invalidation of HFiles, no need to
@ -844,7 +844,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
@Override @Override
public long getCurrentDataSize() { public long getCurrentDataSize() {
return this.dataBlockSize.get(); return this.dataBlockSize.sum();
} }
@Override @Override
@ -864,7 +864,7 @@ public class LruBlockCache implements ResizableBlockCache, HeapSize {
@Override @Override
public long getDataBlockCount() { public long getDataBlockCount() {
return this.dataBlockElements.get(); return this.dataBlockElements.sum();
} }
EvictionThread getEvictionThread() { EvictionThread getEvictionThread() {

View File

@ -27,7 +27,7 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.MinMaxPriorityQueue; import org.apache.hadoop.hbase.shaded.com.google.common.collect.MinMaxPriorityQueue;
import org.apache.commons.collections4.map.LinkedMap; import org.apache.commons.collections4.map.LinkedMap;
@ -347,7 +347,7 @@ public final class BucketAllocator {
* @throws BucketAllocatorException * @throws BucketAllocatorException
*/ */
BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map, BucketAllocator(long availableSpace, int[] bucketSizes, Map<BlockCacheKey, BucketEntry> map,
AtomicLong realCacheSize) throws BucketAllocatorException { LongAdder realCacheSize) throws BucketAllocatorException {
this(availableSpace, bucketSizes); this(availableSpace, bucketSizes);
// each bucket has an offset, sizeindex. probably the buckets are too big // each bucket has an offset, sizeindex. probably the buckets are too big
@ -398,7 +398,7 @@ public final class BucketAllocator {
bsi.instantiateBucket(b); bsi.instantiateBucket(b);
reconfigured[bucketNo] = true; reconfigured[bucketNo] = true;
} }
realCacheSize.addAndGet(foundLen); realCacheSize.add(foundLen);
buckets[bucketNo].addAllocation(foundOffset); buckets[bucketNo].addAllocation(foundOffset);
usedSize += buckets[bucketNo].getItemAllocationSize(); usedSize += buckets[bucketNo].getItemAllocationSize();
bucketSizeInfos[bucketSizeIndex].blockAllocated(b); bucketSizeInfos[bucketSizeIndex].blockAllocated(b);

View File

@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -165,13 +166,13 @@ public class BucketCache implements BlockCache, HeapSize {
private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>(); private UniqueIndexMap<Integer> deserialiserMap = new UniqueIndexMap<>();
private final AtomicLong realCacheSize = new AtomicLong(0); private final LongAdder realCacheSize = new LongAdder();
private final AtomicLong heapSize = new AtomicLong(0); private final LongAdder heapSize = new LongAdder();
/** Current number of cached elements */ /** Current number of cached elements */
private final AtomicLong blockNumber = new AtomicLong(0); private final LongAdder blockNumber = new LongAdder();
/** Cache access count (sequential ID) */ /** Cache access count (sequential ID) */
private final AtomicLong accessCount = new AtomicLong(0); private final AtomicLong accessCount = new AtomicLong();
private static final int DEFAULT_CACHE_WAIT_TIME = 50; private static final int DEFAULT_CACHE_WAIT_TIME = 50;
// Used in test now. If the flag is false and the cache speed is very fast, // Used in test now. If the flag is false and the cache speed is very fast,
@ -469,8 +470,8 @@ public class BucketCache implements BlockCache, HeapSize {
ramCache.remove(cacheKey); ramCache.remove(cacheKey);
cacheStats.failInsert(); cacheStats.failInsert();
} else { } else {
this.blockNumber.incrementAndGet(); this.blockNumber.increment();
this.heapSize.addAndGet(cachedItem.heapSize()); this.heapSize.add(cachedItem.heapSize());
blocksByHFile.add(cacheKey); blocksByHFile.add(cacheKey);
} }
} }
@ -545,10 +546,10 @@ public class BucketCache implements BlockCache, HeapSize {
@VisibleForTesting @VisibleForTesting
void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) { void blockEvicted(BlockCacheKey cacheKey, BucketEntry bucketEntry, boolean decrementBlockNumber) {
bucketAllocator.freeBlock(bucketEntry.offset()); bucketAllocator.freeBlock(bucketEntry.offset());
realCacheSize.addAndGet(-1 * bucketEntry.getLength()); realCacheSize.add(-1 * bucketEntry.getLength());
blocksByHFile.remove(cacheKey); blocksByHFile.remove(cacheKey);
if (decrementBlockNumber) { if (decrementBlockNumber) {
this.blockNumber.decrementAndGet(); this.blockNumber.decrement();
} }
} }
@ -591,8 +592,8 @@ public class BucketCache implements BlockCache, HeapSize {
private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) { private RAMQueueEntry checkRamCache(BlockCacheKey cacheKey) {
RAMQueueEntry removedBlock = ramCache.remove(cacheKey); RAMQueueEntry removedBlock = ramCache.remove(cacheKey);
if (removedBlock != null) { if (removedBlock != null) {
this.blockNumber.decrementAndGet(); this.blockNumber.decrement();
this.heapSize.addAndGet(-1 * removedBlock.getData().heapSize()); this.heapSize.add(-1 * removedBlock.getData().heapSize());
} }
return removedBlock; return removedBlock;
} }
@ -689,7 +690,7 @@ public class BucketCache implements BlockCache, HeapSize {
} }
public long getRealCacheSize() { public long getRealCacheSize() {
return this.realCacheSize.get(); return this.realCacheSize.sum();
} }
private long acceptableSize() { private long acceptableSize() {
@ -791,7 +792,7 @@ public class BucketCache implements BlockCache, HeapSize {
if (LOG.isDebugEnabled() && msgBuffer != null) { if (LOG.isDebugEnabled() && msgBuffer != null) {
LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() + LOG.debug("Free started because \"" + why + "\"; " + msgBuffer.toString() +
" of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" + " of current used=" + StringUtils.byteDesc(currentSize) + ", actual cacheSize=" +
StringUtils.byteDesc(realCacheSize.get()) + ", total=" + StringUtils.byteDesc(totalSize)); StringUtils.byteDesc(realCacheSize.sum()) + ", total=" + StringUtils.byteDesc(totalSize));
} }
long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra long bytesToFreeWithExtra = (long) Math.floor(bytesToFreeWithoutExtra
@ -1016,7 +1017,7 @@ public class BucketCache implements BlockCache, HeapSize {
// Always remove from ramCache even if we failed adding it to the block cache above. // Always remove from ramCache even if we failed adding it to the block cache above.
RAMQueueEntry ramCacheEntry = ramCache.remove(key); RAMQueueEntry ramCacheEntry = ramCache.remove(key);
if (ramCacheEntry != null) { if (ramCacheEntry != null) {
heapSize.addAndGet(-1 * entries.get(i).getData().heapSize()); heapSize.add(-1 * entries.get(i).getData().heapSize());
} else if (bucketEntries[i] != null){ } else if (bucketEntries[i] != null){
// Block should have already been evicted. Remove it and free space. // Block should have already been evicted. Remove it and free space.
ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset()); ReentrantReadWriteLock lock = offsetLock.getLock(bucketEntries[i].offset());
@ -1195,12 +1196,12 @@ public class BucketCache implements BlockCache, HeapSize {
@Override @Override
public long heapSize() { public long heapSize() {
return this.heapSize.get(); return this.heapSize.sum();
} }
@Override @Override
public long size() { public long size() {
return this.realCacheSize.get(); return this.realCacheSize.sum();
} }
@Override @Override
@ -1215,7 +1216,7 @@ public class BucketCache implements BlockCache, HeapSize {
@Override @Override
public long getBlockCount() { public long getBlockCount() {
return this.blockNumber.get(); return this.blockNumber.sum();
} }
@Override @Override
@ -1438,7 +1439,7 @@ public class BucketCache implements BlockCache, HeapSize {
public BucketEntry writeToCache(final IOEngine ioEngine, public BucketEntry writeToCache(final IOEngine ioEngine,
final BucketAllocator bucketAllocator, final BucketAllocator bucketAllocator,
final UniqueIndexMap<Integer> deserialiserMap, final UniqueIndexMap<Integer> deserialiserMap,
final AtomicLong realCacheSize) throws CacheFullException, IOException, final LongAdder realCacheSize) throws CacheFullException, IOException,
BucketAllocatorException { BucketAllocatorException {
int len = data.getSerializedLength(); int len = data.getSerializedLength();
// This cacheable thing can't be serialized // This cacheable thing can't be serialized
@ -1468,7 +1469,7 @@ public class BucketCache implements BlockCache, HeapSize {
throw ioe; throw ioe;
} }
realCacheSize.addAndGet(len); realCacheSize.add(len);
return bucketEntry; return bucketEntry;
} }
} }

View File

@ -24,7 +24,7 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -52,8 +52,8 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private int maxCapacity; private int maxCapacity;
// metrics (shared across all queues) // metrics (shared across all queues)
private AtomicLong numGeneralCallsDropped; private LongAdder numGeneralCallsDropped;
private AtomicLong numLifoModeSwitches; private LongAdder numLifoModeSwitches;
// Both are in milliseconds // Both are in milliseconds
private volatile int codelTargetDelay; private volatile int codelTargetDelay;
@ -76,7 +76,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
private AtomicBoolean isOverloaded = new AtomicBoolean(false); private AtomicBoolean isOverloaded = new AtomicBoolean(false);
public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) { double lifoThreshold, LongAdder numGeneralCallsDropped, LongAdder numLifoModeSwitches) {
this.maxCapacity = capacity; this.maxCapacity = capacity;
this.queue = new LinkedBlockingDeque<>(capacity); this.queue = new LinkedBlockingDeque<>(capacity);
this.codelTargetDelay = targetDelay; this.codelTargetDelay = targetDelay;
@ -112,13 +112,13 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
CallRunner cr; CallRunner cr;
while(true) { while(true) {
if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
numLifoModeSwitches.incrementAndGet(); numLifoModeSwitches.increment();
cr = queue.takeLast(); cr = queue.takeLast();
} else { } else {
cr = queue.takeFirst(); cr = queue.takeFirst();
} }
if (needToDrop(cr)) { if (needToDrop(cr)) {
numGeneralCallsDropped.incrementAndGet(); numGeneralCallsDropped.increment();
cr.drop(); cr.drop();
} else { } else {
return cr; return cr;
@ -135,7 +135,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
// Only count once per switch. // Only count once per switch.
if (!switched) { if (!switched) {
switched = true; switched = true;
numLifoModeSwitches.incrementAndGet(); numLifoModeSwitches.increment();
} }
cr = queue.pollLast(); cr = queue.pollLast();
} else { } else {
@ -146,7 +146,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
return cr; return cr;
} }
if (needToDrop(cr)) { if (needToDrop(cr)) {
numGeneralCallsDropped.incrementAndGet(); numGeneralCallsDropped.increment();
cr.drop(); cr.drop();
} else { } else {
return cr; return cr;

View File

@ -248,7 +248,7 @@ public class SplitLogManager {
logDirs + " for serverName=" + serverNames); logDirs + " for serverName=" + serverNames);
FileStatus[] logfiles = getFileList(logDirs, filter); FileStatus[] logfiles = getFileList(logDirs, filter);
status.setStatus("Checking directory contents..."); status.setStatus("Checking directory contents...");
SplitLogCounters.tot_mgr_log_split_batch_start.incrementAndGet(); SplitLogCounters.tot_mgr_log_split_batch_start.increment();
LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs + LOG.info("Started splitting " + logfiles.length + " logs in " + logDirs +
" for " + serverNames); " for " + serverNames);
long t = EnvironmentEdgeManager.currentTime(); long t = EnvironmentEdgeManager.currentTime();
@ -278,7 +278,7 @@ public class SplitLogManager {
if (batch.done != batch.installed) { if (batch.done != batch.installed) {
batch.isDead = true; batch.isDead = true;
SplitLogCounters.tot_mgr_log_split_batch_err.incrementAndGet(); SplitLogCounters.tot_mgr_log_split_batch_err.increment();
LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed LOG.warn("error while splitting logs in " + logDirs + " installed = " + batch.installed
+ " but only " + batch.done + " done"); + " but only " + batch.done + " done");
String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch; String msg = "error or interrupted while splitting logs in " + logDirs + " Task = " + batch;
@ -302,7 +302,7 @@ public class SplitLogManager {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe); LOG.warn("Unable to delete log src dir. Ignoring. " + logDir, ioe);
} }
} }
SplitLogCounters.tot_mgr_log_split_batch_success.incrementAndGet(); SplitLogCounters.tot_mgr_log_split_batch_success.increment();
} }
String msg = String msg =
"finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed "finished splitting (more than or equal to) " + totalSize + " bytes in " + batch.installed
@ -474,7 +474,7 @@ public class SplitLogManager {
} }
while (oldtask.status == FAILURE) { while (oldtask.status == FAILURE) {
LOG.debug("wait for status of task " + path + " to change to DELETED"); LOG.debug("wait for status of task " + path + " to change to DELETED");
SplitLogCounters.tot_mgr_wait_for_zk_delete.incrementAndGet(); SplitLogCounters.tot_mgr_wait_for_zk_delete.increment();
try { try {
oldtask.wait(); oldtask.wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -694,7 +694,7 @@ public class SplitLogManager {
} }
found_assigned_task = true; found_assigned_task = true;
if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) { if (localDeadWorkers != null && localDeadWorkers.contains(cur_worker)) {
SplitLogCounters.tot_mgr_resubmit_dead_server_task.incrementAndGet(); SplitLogCounters.tot_mgr_resubmit_dead_server_task.increment();
if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) { if (getSplitLogManagerCoordination().resubmitTask(path, task, FORCE)) {
resubmitted++; resubmitted++;
} else { } else {
@ -741,7 +741,7 @@ public class SplitLogManager {
} }
} }
getSplitLogManagerCoordination().checkTasks(); getSplitLogManagerCoordination().checkTasks();
SplitLogCounters.tot_mgr_resubmit_unassigned.incrementAndGet(); SplitLogCounters.tot_mgr_resubmit_unassigned.increment();
LOG.debug("resubmitting unassigned task(s) after timeout"); LOG.debug("resubmitting unassigned task(s) after timeout");
} }
Set<String> failedDeletions = Set<String> failedDeletions =

View File

@ -28,6 +28,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -76,9 +77,9 @@ public class MobFileCache {
// caches access count // caches access count
private final AtomicLong count = new AtomicLong(0); private final AtomicLong count = new AtomicLong(0);
private long lastAccess = 0; private long lastAccess = 0;
private final AtomicLong miss = new AtomicLong(0); private final LongAdder miss = new LongAdder();
private long lastMiss = 0; private long lastMiss = 0;
private final AtomicLong evictedFileCount = new AtomicLong(0); private final LongAdder evictedFileCount = new LongAdder();
private long lastEvictedFileCount = 0; private long lastEvictedFileCount = 0;
// a lock to sync the evict to guarantee the eviction occurs in sequence. // a lock to sync the evict to guarantee the eviction occurs in sequence.
@ -163,7 +164,7 @@ public class MobFileCache {
for (CachedMobFile evictedFile : evictedFiles) { for (CachedMobFile evictedFile : evictedFiles) {
closeFile(evictedFile); closeFile(evictedFile);
} }
evictedFileCount.addAndGet(evictedFiles.size()); evictedFileCount.add(evictedFiles.size());
} }
} }
@ -180,7 +181,7 @@ public class MobFileCache {
CachedMobFile evictedFile = map.remove(fileName); CachedMobFile evictedFile = map.remove(fileName);
if (evictedFile != null) { if (evictedFile != null) {
evictedFile.close(); evictedFile.close();
evictedFileCount.incrementAndGet(); evictedFileCount.increment();
} }
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to evict the file " + fileName, e); LOG.error("Failed to evict the file " + fileName, e);
@ -219,7 +220,7 @@ public class MobFileCache {
cached = CachedMobFile.create(fs, path, conf, cacheConf); cached = CachedMobFile.create(fs, path, conf, cacheConf);
cached.open(); cached.open();
map.put(fileName, cached); map.put(fileName, cached);
miss.incrementAndGet(); miss.increment();
} }
} }
cached.open(); cached.open();
@ -294,7 +295,7 @@ public class MobFileCache {
* @return The count of misses to the mob file cache. * @return The count of misses to the mob file cache.
*/ */
public long getMissCount() { public long getMissCount() {
return miss.get(); return miss.sum();
} }
/** /**
@ -302,7 +303,7 @@ public class MobFileCache {
* @return The number of items evicted from the mob file cache. * @return The number of items evicted from the mob file cache.
*/ */
public long getEvictedFileCount() { public long getEvictedFileCount() {
return evictedFileCount.get(); return evictedFileCount.sum();
} }
/** /**
@ -310,7 +311,7 @@ public class MobFileCache {
* @return The hit ratio to the mob file cache. * @return The hit ratio to the mob file cache.
*/ */
public double getHitRatio() { public double getHitRatio() {
return count.get() == 0 ? 0 : ((float) (count.get() - miss.get())) / (float) count.get(); return count.get() == 0 ? 0 : ((float) (count.get() - miss.sum())) / (float) count.get();
} }
/** /**
@ -318,8 +319,8 @@ public class MobFileCache {
*/ */
public void printStatistics() { public void printStatistics() {
long access = count.get() - lastAccess; long access = count.get() - lastAccess;
long missed = miss.get() - lastMiss; long missed = miss.sum() - lastMiss;
long evicted = evictedFileCount.get() - lastEvictedFileCount; long evicted = evictedFileCount.sum() - lastEvictedFileCount;
int hitRatio = access == 0 ? 0 : (int) (((float) (access - missed)) / (float) access * 100); int hitRatio = access == 0 ? 0 : (int) (((float) (access - missed)) / (float) access * 100);
LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: " LOG.info("MobFileCache Statistics, access: " + access + ", miss: " + missed + ", hit: "
+ (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted); + (access - missed) + ", hit ratio: " + hitRatio + "%, evicted files: " + evicted);

View File

@ -18,7 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -29,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -220,7 +220,7 @@ public class ChunkCreator {
/** Statistics thread */ /** Statistics thread */
private static final int statThreadPeriod = 60 * 5; private static final int statThreadPeriod = 60 * 5;
private final AtomicLong chunkCount = new AtomicLong(); private final AtomicLong chunkCount = new AtomicLong();
private final AtomicLong reusedChunkCount = new AtomicLong(); private final LongAdder reusedChunkCount = new LongAdder();
MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) {
this.maxCount = maxCount; this.maxCount = maxCount;
@ -254,7 +254,7 @@ public class ChunkCreator {
Chunk chunk = reclaimedChunks.poll(); Chunk chunk = reclaimedChunks.poll();
if (chunk != null) { if (chunk != null) {
chunk.reset(); chunk.reset();
reusedChunkCount.incrementAndGet(); reusedChunkCount.increment();
} else { } else {
// Make a chunk iff we have not yet created the maxCount chunks // Make a chunk iff we have not yet created the maxCount chunks
while (true) { while (true) {
@ -303,7 +303,7 @@ public class ChunkCreator {
private void logStats() { private void logStats() {
if (!LOG.isDebugEnabled()) return; if (!LOG.isDebugEnabled()) return;
long created = chunkCount.get(); long created = chunkCount.get();
long reused = reusedChunkCount.get(); long reused = reusedChunkCount.sum();
long total = created + reused; long total = created + reused;
LOG.debug("Stats: current pool size=" + reclaimedChunks.size() LOG.debug("Stats: current pool size=" + reclaimedChunks.size()
+ ",created chunk count=" + created + ",created chunk count=" + created

View File

@ -281,12 +281,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final LongAdder blockedRequestsCount = new LongAdder(); private final LongAdder blockedRequestsCount = new LongAdder();
// Compaction LongAdders // Compaction LongAdders
final AtomicLong compactionsFinished = new AtomicLong(0L); final LongAdder compactionsFinished = new LongAdder();
final AtomicLong compactionsFailed = new AtomicLong(0L); final LongAdder compactionsFailed = new LongAdder();
final AtomicLong compactionNumFilesCompacted = new AtomicLong(0L); final LongAdder compactionNumFilesCompacted = new LongAdder();
final AtomicLong compactionNumBytesCompacted = new AtomicLong(0L); final LongAdder compactionNumBytesCompacted = new LongAdder();
final AtomicLong compactionsQueued = new AtomicLong(0L); final LongAdder compactionsQueued = new LongAdder();
final AtomicLong flushesQueued = new AtomicLong(0L); final LongAdder flushesQueued = new LongAdder();
private final WAL wal; private final WAL wal;
private final HRegionFileSystem fs; private final HRegionFileSystem fs;
@ -2272,7 +2272,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
if(fs.isFlushSucceeded()) { if(fs.isFlushSucceeded()) {
flushesQueued.set(0L); flushesQueued.reset();
} }
status.markComplete("Flush successful"); status.markComplete("Flush successful");
@ -8100,27 +8100,27 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet(); int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
// metrics // metrics
compactionsFinished.incrementAndGet(); compactionsFinished.increment();
compactionNumFilesCompacted.addAndGet(numFiles); compactionNumFilesCompacted.add(numFiles);
compactionNumBytesCompacted.addAndGet(filesSizeCompacted); compactionNumBytesCompacted.add(filesSizeCompacted);
assert newValue >= 0; assert newValue >= 0;
} }
public void reportCompactionRequestFailure() { public void reportCompactionRequestFailure() {
compactionsFailed.incrementAndGet(); compactionsFailed.increment();
} }
public void incrementCompactionsQueuedCount() { public void incrementCompactionsQueuedCount() {
compactionsQueued.incrementAndGet(); compactionsQueued.increment();
} }
public void decrementCompactionsQueuedCount() { public void decrementCompactionsQueuedCount() {
compactionsQueued.decrementAndGet(); compactionsQueued.decrement();
} }
public void incrementFlushesQueuedCount() { public void incrementFlushesQueuedCount() {
flushesQueued.incrementAndGet(); flushesQueued.increment();
} }
@VisibleForTesting @VisibleForTesting

View File

@ -134,17 +134,17 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
@Override @Override
public long getNumFilesCompacted() { public long getNumFilesCompacted() {
return this.region.compactionNumFilesCompacted.get(); return this.region.compactionNumFilesCompacted.sum();
} }
@Override @Override
public long getNumBytesCompacted() { public long getNumBytesCompacted() {
return this.region.compactionNumBytesCompacted.get(); return this.region.compactionNumBytesCompacted.sum();
} }
@Override @Override
public long getNumCompactionsCompleted() { public long getNumCompactionsCompleted() {
return this.region.compactionsFinished.get(); return this.region.compactionsFinished.sum();
} }
@Override @Override
@ -161,17 +161,17 @@ public class MetricsRegionWrapperImpl implements MetricsRegionWrapper, Closeable
@Override @Override
public long getNumCompactionsFailed() { public long getNumCompactionsFailed() {
return this.region.compactionsFailed.get(); return this.region.compactionsFailed.sum();
} }
@Override @Override
public long getNumCompactionsQueued() { public long getNumCompactionsQueued() {
return this.region.compactionsQueued.get(); return this.region.compactionsQueued.sum();
} }
@Override @Override
public long getNumFlushesQueued() { public long getNumFlushesQueued() {
return this.region.flushesQueued.get(); return this.region.flushesQueued.sum();
} }
@Override @Override

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.lang.management.MemoryType; import java.lang.management.MemoryType;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -38,11 +38,11 @@ import org.apache.hadoop.hbase.util.Pair;
public class RegionServerAccounting { public class RegionServerAccounting {
// memstore data size // memstore data size
private final AtomicLong globalMemstoreDataSize = new AtomicLong(0); private final LongAdder globalMemstoreDataSize = new LongAdder();
// memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell // memstore heap size. When off heap MSLAB in place, this will be only heap overhead of the Cell
// POJOs and entry overhead of them onto memstore. When on heap MSLAB, this will be include heap // POJOs and entry overhead of them onto memstore. When on heap MSLAB, this will be include heap
// overhead as well as the cell data size. Ya cell data is in on heap area only then. // overhead as well as the cell data size. Ya cell data is in on heap area only then.
private final AtomicLong globalMemstoreHeapSize = new AtomicLong(0); private final LongAdder globalMemstoreHeapSize = new LongAdder();
// Store the edits size during replaying WAL. Use this to roll back the // Store the edits size during replaying WAL. Use this to roll back the
// global memstore size once a region opening failed. // global memstore size once a region opening failed.
@ -115,14 +115,14 @@ public class RegionServerAccounting {
* @return the global Memstore data size in the RegionServer * @return the global Memstore data size in the RegionServer
*/ */
public long getGlobalMemstoreDataSize() { public long getGlobalMemstoreDataSize() {
return globalMemstoreDataSize.get(); return globalMemstoreDataSize.sum();
} }
/** /**
* @return the global memstore heap size in the RegionServer * @return the global memstore heap size in the RegionServer
*/ */
public long getGlobalMemstoreHeapSize() { public long getGlobalMemstoreHeapSize() {
return this.globalMemstoreHeapSize.get(); return this.globalMemstoreHeapSize.sum();
} }
/** /**
@ -130,13 +130,13 @@ public class RegionServerAccounting {
* the global Memstore size * the global Memstore size
*/ */
public void incGlobalMemstoreSize(MemstoreSize memStoreSize) { public void incGlobalMemstoreSize(MemstoreSize memStoreSize) {
globalMemstoreDataSize.addAndGet(memStoreSize.getDataSize()); globalMemstoreDataSize.add(memStoreSize.getDataSize());
globalMemstoreHeapSize.addAndGet(memStoreSize.getHeapSize()); globalMemstoreHeapSize.add(memStoreSize.getHeapSize());
} }
public void decGlobalMemstoreSize(MemstoreSize memStoreSize) { public void decGlobalMemstoreSize(MemstoreSize memStoreSize) {
globalMemstoreDataSize.addAndGet(-memStoreSize.getDataSize()); globalMemstoreDataSize.add(-memStoreSize.getDataSize());
globalMemstoreHeapSize.addAndGet(-memStoreSize.getHeapSize()); globalMemstoreHeapSize.add(-memStoreSize.getHeapSize());
} }
/** /**

View File

@ -76,7 +76,7 @@ public class WALSplitterHandler extends EventHandler {
SplitLogCounters.tot_wkr_task_done, splitTaskDetails); SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
break; break;
case PREEMPTED: case PREEMPTED:
SplitLogCounters.tot_wkr_preempt_task.incrementAndGet(); SplitLogCounters.tot_wkr_preempt_task.increment();
LOG.warn("task execution preempted " + splitTaskDetails.getWALFile()); LOG.warn("task execution preempted " + splitTaskDetails.getWALFile());
break; break;
case ERR: case ERR:

View File

@ -45,6 +45,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -250,24 +251,24 @@ public final class Canary implements Tool {
public static class RegionStdOutSink extends StdOutSink { public static class RegionStdOutSink extends StdOutSink {
private Map<String, AtomicLong> perTableReadLatency = new HashMap<>(); private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
private AtomicLong writeLatency = new AtomicLong(); private LongAdder writeLatency = new LongAdder();
public Map<String, AtomicLong> getReadLatencyMap() { public Map<String, LongAdder> getReadLatencyMap() {
return this.perTableReadLatency; return this.perTableReadLatency;
} }
public AtomicLong initializeAndGetReadLatencyForTable(String tableName) { public LongAdder initializeAndGetReadLatencyForTable(String tableName) {
AtomicLong initLatency = new AtomicLong(0L); LongAdder initLatency = new LongAdder();
this.perTableReadLatency.put(tableName, initLatency); this.perTableReadLatency.put(tableName, initLatency);
return initLatency; return initLatency;
} }
public void initializeWriteLatency() { public void initializeWriteLatency() {
this.writeLatency.set(0L); this.writeLatency.reset();
} }
public AtomicLong getWriteLatency() { public LongAdder getWriteLatency() {
return this.writeLatency; return this.writeLatency;
} }
} }
@ -323,10 +324,10 @@ public final class Canary implements Tool {
private TaskType taskType; private TaskType taskType;
private boolean rawScanEnabled; private boolean rawScanEnabled;
private ServerName serverName; private ServerName serverName;
private AtomicLong readWriteLatency; private LongAdder readWriteLatency;
RegionTask(Connection connection, HRegionInfo region, ServerName serverName, RegionStdOutSink sink, RegionTask(Connection connection, HRegionInfo region, ServerName serverName, RegionStdOutSink sink,
TaskType taskType, boolean rawScanEnabled, AtomicLong rwLatency) { TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) {
this.connection = connection; this.connection = connection;
this.region = region; this.region = region;
this.serverName = serverName; this.serverName = serverName;
@ -414,7 +415,7 @@ public final class Canary implements Tool {
rs.next(); rs.next();
} }
stopWatch.stop(); stopWatch.stop();
this.readWriteLatency.addAndGet(stopWatch.getTime()); this.readWriteLatency.add(stopWatch.getTime());
sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
} catch (Exception e) { } catch (Exception e) {
sink.publishReadFailure(serverName, region, column, e); sink.publishReadFailure(serverName, region, column, e);
@ -466,7 +467,7 @@ public final class Canary implements Tool {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
table.put(put); table.put(put);
long time = System.currentTimeMillis() - startTime; long time = System.currentTimeMillis() - startTime;
this.readWriteLatency.addAndGet(time); this.readWriteLatency.add(time);
sink.publishWriteTiming(serverName, region, column, time); sink.publishWriteTiming(serverName, region, column, time);
} catch (Exception e) { } catch (Exception e) {
sink.publishWriteFailure(serverName, region, column, e); sink.publishWriteFailure(serverName, region, column, e);
@ -1049,7 +1050,7 @@ public final class Canary implements Tool {
} }
this.initialized = true; this.initialized = true;
for (String table : tables) { for (String table : tables) {
AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table); LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ, taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ,
this.rawScanEnabled, readLatency)); this.rawScanEnabled, readLatency));
} }
@ -1068,7 +1069,7 @@ public final class Canary implements Tool {
} }
// sniff canary table with write operation // sniff canary table with write operation
regionSink.initializeWriteLatency(); regionSink.initializeWriteLatency();
AtomicLong writeTableLatency = regionSink.getWriteLatency(); LongAdder writeTableLatency = regionSink.getWriteLatency();
taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getTableDescriptor(writeTableName), taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getTableDescriptor(writeTableName),
executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency)); executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency));
} }
@ -1080,7 +1081,7 @@ public final class Canary implements Tool {
LOG.error("Sniff region failed!", e); LOG.error("Sniff region failed!", e);
} }
} }
Map<String, AtomicLong> actualReadTableLatency = regionSink.getReadLatencyMap(); Map<String, LongAdder> actualReadTableLatency = regionSink.getReadLatencyMap();
for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) { for (Map.Entry<String, Long> entry : configuredReadTableTimeouts.entrySet()) {
String tableName = entry.getKey(); String tableName = entry.getKey();
if (actualReadTableLatency.containsKey(tableName)) { if (actualReadTableLatency.containsKey(tableName)) {
@ -1167,7 +1168,7 @@ public final class Canary implements Tool {
for (HTableDescriptor table : admin.listTables()) { for (HTableDescriptor table : admin.listTables()) {
if (admin.isTableEnabled(table.getTableName()) if (admin.isTableEnabled(table.getTableName())
&& (!table.getTableName().equals(writeTableName))) { && (!table.getTableName().equals(writeTableName))) {
AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString()); LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString());
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled, readLatency)); taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled, readLatency));
} }
} }
@ -1235,7 +1236,7 @@ public final class Canary implements Tool {
* @throws Exception * @throws Exception
*/ */
private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName, private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
ExecutorService executor, TaskType taskType, boolean rawScanEnabled, AtomicLong readLatency) throws Exception { ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency) throws Exception {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s", LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
tableName)); tableName));
@ -1254,7 +1255,7 @@ public final class Canary implements Tool {
*/ */
private static List<Future<Void>> sniff(final Admin admin, final Sink sink, private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType, HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
boolean rawScanEnabled, AtomicLong rwLatency) throws Exception { boolean rawScanEnabled, LongAdder rwLatency) throws Exception {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName())); LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));

View File

@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -139,7 +140,7 @@ public class TestBucketWriterThread {
RAMQueueEntry spiedRqe = Mockito.spy(rqe); RAMQueueEntry spiedRqe = Mockito.spy(rqe);
Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe). Mockito.doThrow(new IOException("Mocked!")).when(spiedRqe).
writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
(UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any()); (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
this.q.add(spiedRqe); this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q); doDrainOfOneEntry(bc, wt, q);
// Cache disabled when ioes w/o ever healing. // Cache disabled when ioes w/o ever healing.
@ -162,7 +163,7 @@ public class TestBucketWriterThread {
Mockito.doThrow(cfe). Mockito.doThrow(cfe).
doReturn(mockedBucketEntry). doReturn(mockedBucketEntry).
when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(), when(spiedRqe).writeToCache((IOEngine)Mockito.any(), (BucketAllocator)Mockito.any(),
(UniqueIndexMap<Integer>)Mockito.any(), (AtomicLong)Mockito.any()); (UniqueIndexMap<Integer>)Mockito.any(), (LongAdder) Mockito.any());
this.q.add(spiedRqe); this.q.add(spiedRqe);
doDrainOfOneEntry(bc, wt, q); doDrainOfOneEntry(bc, wt, q);
} }

View File

@ -45,6 +45,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -1049,15 +1050,15 @@ public class TestDistributedLogSplitting {
long waitTime = 80000; long waitTime = 80000;
long endt = curt + waitTime; long endt = curt + waitTime;
while (curt < endt) { while (curt < endt) {
if ((tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + if ((tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() + tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
tot_wkr_preempt_task.get()) == 0) { tot_wkr_preempt_task.sum()) == 0) {
Thread.yield(); Thread.yield();
curt = System.currentTimeMillis(); curt = System.currentTimeMillis();
} else { } else {
assertTrue(1 <= (tot_wkr_task_resigned.get() + tot_wkr_task_err.get() + assertTrue(1 <= (tot_wkr_task_resigned.sum() + tot_wkr_task_err.sum() +
tot_wkr_final_transition_failed.get() + tot_wkr_task_done.get() + tot_wkr_final_transition_failed.sum() + tot_wkr_task_done.sum() +
tot_wkr_preempt_task.get())); tot_wkr_preempt_task.sum()));
return; return;
} }
} }
@ -1717,16 +1718,16 @@ public class TestDistributedLogSplitting {
} }
} }
private void waitForCounter(AtomicLong ctr, long oldval, long newval, private void waitForCounter(LongAdder ctr, long oldval, long newval,
long timems) { long timems) {
long curt = System.currentTimeMillis(); long curt = System.currentTimeMillis();
long endt = curt + timems; long endt = curt + timems;
while (curt < endt) { while (curt < endt) {
if (ctr.get() == oldval) { if (ctr.sum() == oldval) {
Thread.yield(); Thread.yield();
curt = System.currentTimeMillis(); curt = System.currentTimeMillis();
} else { } else {
assertEquals(newval, ctr.get()); assertEquals(newval, ctr.sum());
return; return;
} }
} }

View File

@ -40,6 +40,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -171,12 +172,12 @@ public class TestSplitLogManager {
long eval(); long eval();
} }
private void waitForCounter(final AtomicLong ctr, long oldval, long newval, long timems) private void waitForCounter(final LongAdder ctr, long oldval, long newval, long timems)
throws Exception { throws Exception {
Expr e = new Expr() { Expr e = new Expr() {
@Override @Override
public long eval() { public long eval() {
return ctr.get(); return ctr.sum();
} }
}; };
waitForCounter(e, oldval, newval, timems); waitForCounter(e, oldval, newval, timems);
@ -199,7 +200,7 @@ public class TestSplitLogManager {
private Task findOrCreateOrphanTask(String path) { private Task findOrCreateOrphanTask(String path) {
return slm.tasks.computeIfAbsent(path, k -> { return slm.tasks.computeIfAbsent(path, k -> {
LOG.info("creating orphan task " + k); LOG.info("creating orphan task " + k);
SplitLogCounters.tot_mgr_orphan_task_acquired.incrementAndGet(); SplitLogCounters.tot_mgr_orphan_task_acquired.increment();
return new Task(); return new Task();
}); });
} }
@ -214,7 +215,7 @@ public class TestSplitLogManager {
slm.enqueueSplitTask(name, batch); slm.enqueueSplitTask(name, batch);
assertEquals(1, batch.installed); assertEquals(1, batch.installed);
assertTrue(findOrCreateOrphanTask(tasknode).batch == batch); assertTrue(findOrCreateOrphanTask(tasknode).batch == batch);
assertEquals(1L, tot_mgr_node_create_queued.get()); assertEquals(1L, tot_mgr_node_create_queued.sum());
LOG.debug("waiting for task node creation"); LOG.debug("waiting for task node creation");
listener.waitForCreation(); listener.waitForCreation();
@ -286,7 +287,7 @@ public class TestSplitLogManager {
Task task2 = findOrCreateOrphanTask(tasknode); Task task2 = findOrCreateOrphanTask(tasknode);
assertTrue(task == task2); assertTrue(task == task2);
LOG.debug("task = " + task); LOG.debug("task = " + task);
assertEquals(1L, tot_mgr_resubmit.get()); assertEquals(1L, tot_mgr_resubmit.sum());
assertEquals(1, task.incarnation.get()); assertEquals(1, task.incarnation.get());
assertEquals(0, task.unforcedResubmits.get()); assertEquals(0, task.unforcedResubmits.get());
assertTrue(task.isOrphan()); assertTrue(task.isOrphan());
@ -323,7 +324,7 @@ public class TestSplitLogManager {
waitForCounter(tot_mgr_heartbeat, 2, 3, to/2); waitForCounter(tot_mgr_heartbeat, 2, 3, to/2);
waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2); waitForCounter(tot_mgr_resubmit_threshold_reached, 0, 1, to + to/2);
Thread.sleep(to + to/2); Thread.sleep(to + to/2);
assertEquals(2L, tot_mgr_resubmit.get() - tot_mgr_resubmit_force.get()); assertEquals(2L, tot_mgr_resubmit.sum() - tot_mgr_resubmit_force.sum());
} }
@Test (timeout=180000) @Test (timeout=180000)
@ -342,10 +343,10 @@ public class TestSplitLogManager {
waitForCounter(new Expr() { waitForCounter(new Expr() {
@Override @Override
public long eval() { public long eval() {
return (tot_mgr_resubmit.get() + tot_mgr_resubmit_failed.get()); return (tot_mgr_resubmit.sum() + tot_mgr_resubmit_failed.sum());
} }
}, 0, 1, 5*60000); // wait long enough }, 0, 1, 5*60000); // wait long enough
Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.get()); Assert.assertEquals("Could not run test. Lost ZK connection?", 0, tot_mgr_resubmit_failed.sum());
int version1 = ZKUtil.checkExists(zkw, tasknode); int version1 = ZKUtil.checkExists(zkw, tasknode);
assertTrue(version1 > version); assertTrue(version1 > version);
byte[] taskstate = ZKUtil.getData(zkw, tasknode); byte[] taskstate = ZKUtil.getData(zkw, tasknode);
@ -400,23 +401,23 @@ public class TestSplitLogManager {
@Test (timeout=180000) @Test (timeout=180000)
public void testTaskResigned() throws Exception { public void testTaskResigned() throws Exception {
LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state"); LOG.info("TestTaskResigned - resubmit task node once in RESIGNED state");
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.sum(), 0);
slm = new SplitLogManager(master, conf); slm = new SplitLogManager(master, conf);
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.sum(), 0);
TaskBatch batch = new TaskBatch(); TaskBatch batch = new TaskBatch();
String tasknode = submitTaskAndWait(batch, "foo/1"); String tasknode = submitTaskAndWait(batch, "foo/1");
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.sum(), 0);
final ServerName worker1 = ServerName.valueOf("worker1,1,1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1");
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.sum(), 0);
SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode); SplitLogTask slt = new SplitLogTask.Resigned(worker1, this.mode);
assertEquals(tot_mgr_resubmit.get(), 0); assertEquals(tot_mgr_resubmit.sum(), 0);
ZKUtil.setData(zkw, tasknode, slt.toByteArray()); ZKUtil.setData(zkw, tasknode, slt.toByteArray());
ZKUtil.checkExists(zkw, tasknode); ZKUtil.checkExists(zkw, tasknode);
// Could be small race here. // Could be small race here.
if (tot_mgr_resubmit.get() == 0) { if (tot_mgr_resubmit.sum() == 0) {
waitForCounter(tot_mgr_resubmit, 0, 1, to/2); waitForCounter(tot_mgr_resubmit, 0, 1, to/2);
} }
assertEquals(tot_mgr_resubmit.get(), 1); assertEquals(tot_mgr_resubmit.sum(), 1);
byte[] taskstate = ZKUtil.getData(zkw, tasknode); byte[] taskstate = ZKUtil.getData(zkw, tasknode);
slt = SplitLogTask.parseFrom(taskstate); slt = SplitLogTask.parseFrom(taskstate);
@ -472,10 +473,10 @@ public class TestSplitLogManager {
final ServerName worker1 = ServerName.valueOf("worker1,1,1"); final ServerName worker1 = ServerName.valueOf("worker1,1,1");
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray()); ZKUtil.setData(zkw, tasknode, slt.toByteArray());
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
slm.handleDeadWorker(worker1); slm.handleDeadWorker(worker1);
if (tot_mgr_resubmit.get() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2); if (tot_mgr_resubmit.sum() == 0) waitForCounter(tot_mgr_resubmit, 0, 1, to+to/2);
if (tot_mgr_resubmit_dead_server_task.get() == 0) { if (tot_mgr_resubmit_dead_server_task.sum() == 0) {
waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2); waitForCounter(tot_mgr_resubmit_dead_server_task, 0, 1, to + to/2);
} }
@ -497,10 +498,10 @@ public class TestSplitLogManager {
SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode); SplitLogTask slt = new SplitLogTask.Owned(worker1, this.mode);
ZKUtil.setData(zkw, tasknode, slt.toByteArray()); ZKUtil.setData(zkw, tasknode, slt.toByteArray());
if (tot_mgr_heartbeat.get() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2); if (tot_mgr_heartbeat.sum() == 0) waitForCounter(tot_mgr_heartbeat, 0, 1, to/2);
// Not yet resubmitted. // Not yet resubmitted.
Assert.assertEquals(0, tot_mgr_resubmit.get()); Assert.assertEquals(0, tot_mgr_resubmit.sum());
// This server becomes dead // This server becomes dead
Mockito.when(sm.isServerOnline(worker1)).thenReturn(false); Mockito.when(sm.isServerOnline(worker1)).thenReturn(false);
@ -508,7 +509,7 @@ public class TestSplitLogManager {
Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded). Thread.sleep(1300); // The timeout checker is done every 1000 ms (hardcoded).
// It has been resubmitted // It has been resubmitted
Assert.assertEquals(1, tot_mgr_resubmit.get()); Assert.assertEquals(1, tot_mgr_resubmit.sum());
} }
@Test (timeout=180000) @Test (timeout=180000)

View File

@ -27,7 +27,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -151,32 +151,32 @@ public class TestSplitLogWorker {
} }
} }
private void waitForCounter(AtomicLong ctr, long oldval, long newval, long timems) private void waitForCounter(LongAdder ctr, long oldval, long newval, long timems)
throws Exception { throws Exception {
assertTrue("ctr=" + ctr.get() + ", oldval=" + oldval + ", newval=" + newval, assertTrue("ctr=" + ctr.sum() + ", oldval=" + oldval + ", newval=" + newval,
waitForCounterBoolean(ctr, oldval, newval, timems)); waitForCounterBoolean(ctr, oldval, newval, timems));
} }
private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, long newval, private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, long newval,
long timems) throws Exception { long timems) throws Exception {
return waitForCounterBoolean(ctr, oldval, newval, timems, true); return waitForCounterBoolean(ctr, oldval, newval, timems, true);
} }
private boolean waitForCounterBoolean(final AtomicLong ctr, final long oldval, final long newval, private boolean waitForCounterBoolean(final LongAdder ctr, final long oldval, final long newval,
long timems, boolean failIfTimeout) throws Exception { long timems, boolean failIfTimeout) throws Exception {
long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout, long timeWaited = TEST_UTIL.waitFor(timems, 10, failIfTimeout,
new Waiter.Predicate<Exception>() { new Waiter.Predicate<Exception>() {
@Override @Override
public boolean evaluate() throws Exception { public boolean evaluate() throws Exception {
return (ctr.get() >= newval); return (ctr.sum() >= newval);
} }
}); });
if( timeWaited > 0) { if( timeWaited > 0) {
// when not timed out // when not timed out
assertEquals(newval, ctr.get()); assertEquals(newval, ctr.sum());
} }
return true; return true;
} }
@ -293,7 +293,7 @@ public class TestSplitLogWorker {
// not it, that we fell through to the next counter in line and it was set. // not it, that we fell through to the next counter in line and it was set.
assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1, assertTrue(waitForCounterBoolean(SplitLogCounters.tot_wkr_failed_to_grab_task_owned, 0, 1,
WAIT_TIME, false) || WAIT_TIME, false) ||
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.get() == 1); SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.sum() == 1);
byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT)); byte [] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TRFT));
SplitLogTask slt = SplitLogTask.parseFrom(bytes); SplitLogTask slt = SplitLogTask.parseFrom(bytes);
assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2)); assertTrue(slt.isOwned(SVR1) || slt.isOwned(SVR2));

View File

@ -30,7 +30,7 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -150,9 +150,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
} }
} }
private final AtomicLong failedIncrements = new AtomicLong(); private final LongAdder failedIncrements = new LongAdder();
private final AtomicLong successfulCoalescings = new AtomicLong(); private final LongAdder successfulCoalescings = new LongAdder();
private final AtomicLong totalIncrements = new AtomicLong(); private final LongAdder totalIncrements = new LongAdder();
private final ConcurrentMap<FullyQualifiedRow, Long> countersMap = private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
new ConcurrentHashMap<>(100000, 0.75f, 1500); new ConcurrentHashMap<>(100000, 0.75f, 1500);
private final ThreadPoolExecutor pool; private final ThreadPoolExecutor pool;
@ -176,7 +176,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
public boolean queueIncrement(TIncrement inc) throws TException { public boolean queueIncrement(TIncrement inc) throws TException {
if (!canQueue()) { if (!canQueue()) {
failedIncrements.incrementAndGet(); failedIncrements.increment();
return false; return false;
} }
return internalQueueTincrement(inc); return internalQueueTincrement(inc);
@ -184,7 +184,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
public boolean queueIncrements(List<TIncrement> incs) throws TException { public boolean queueIncrements(List<TIncrement> incs) throws TException {
if (!canQueue()) { if (!canQueue()) {
failedIncrements.incrementAndGet(); failedIncrements.increment();
return false; return false;
} }
@ -211,7 +211,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
//Make sure that the number of threads is scaled. //Make sure that the number of threads is scaled.
dynamicallySetCoreSize(countersMapSize); dynamicallySetCoreSize(countersMapSize);
totalIncrements.incrementAndGet(); totalIncrements.increment();
FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual); FullyQualifiedRow key = new FullyQualifiedRow(tableName, rowKey, fam, qual);
@ -224,7 +224,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
value = Long.valueOf(currentAmount); value = Long.valueOf(currentAmount);
} else { } else {
value += currentAmount; value += currentAmount;
successfulCoalescings.incrementAndGet(); successfulCoalescings.increment();
} }
// Try to put the value, only if there was none // Try to put the value, only if there was none
Long oldValue = countersMap.putIfAbsent(key, value); Long oldValue = countersMap.putIfAbsent(key, value);
@ -354,15 +354,15 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
pool.setMaximumPoolSize(newMaxSize); pool.setMaximumPoolSize(newMaxSize);
} }
public long getFailedIncrements() { public long getFailedIncrements() {
return failedIncrements.get(); return failedIncrements.sum();
} }
public long getSuccessfulCoalescings() { public long getSuccessfulCoalescings() {
return successfulCoalescings.get(); return successfulCoalescings.sum();
} }
public long getTotalIncrements() { public long getTotalIncrements() {
return totalIncrements.get(); return totalIncrements.sum();
} }
public long getCountersMapSize() { public long getCountersMapSize() {