HBASE-9598 Non thread safe increment of task.unforcedResubmits in SplitLogManager#resubmit()

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1528568 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-10-02 17:21:47 +00:00
parent 3d0c0cb394
commit 03adb3aae4
2 changed files with 6 additions and 5 deletions

View File

@ -33,6 +33,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
@ -856,7 +857,7 @@ public class SplitLogManager extends ZooKeeperListener {
" while the timeout is " + timeout);
return false;
}
if (task.unforcedResubmits >= resubmit_threshold) {
if (task.unforcedResubmits.get() >= resubmit_threshold) {
if (!task.resubmitThresholdReached) {
task.resubmitThresholdReached = true;
SplitLogCounters.tot_mgr_resubmit_threshold_reached.incrementAndGet();
@ -904,7 +905,7 @@ public class SplitLogManager extends ZooKeeperListener {
}
// don't count forced resubmits
if (directive != FORCE) {
task.unforcedResubmits++;
task.unforcedResubmits.incrementAndGet();
}
task.setUnassigned();
createRescanNode(Long.MAX_VALUE);
@ -1281,7 +1282,7 @@ public class SplitLogManager extends ZooKeeperListener {
volatile TaskBatch batch;
volatile TerminationStatus status;
volatile int incarnation;
volatile int unforcedResubmits;
volatile AtomicInteger unforcedResubmits = new AtomicInteger();
volatile boolean resubmitThresholdReached;
@Override
@ -1291,7 +1292,7 @@ public class SplitLogManager extends ZooKeeperListener {
" cur_worker_name = " + cur_worker_name +
" status = " + status +
" incarnation = " + incarnation +
" resubmits = " + unforcedResubmits +
" resubmits = " + unforcedResubmits.get() +
" batch = " + batch);
}

View File

@ -255,7 +255,7 @@ public class TestSplitLogManager {
LOG.debug("task = " + task);
assertEquals(1L, tot_mgr_resubmit.get());
assertEquals(1, task.incarnation);
assertEquals(0, task.unforcedResubmits);
assertEquals(0, task.unforcedResubmits.get());
assertTrue(task.isOrphan());
assertTrue(task.isUnassigned());
assertTrue(ZKUtil.checkExists(zkw, tasknode) > version);