HBASE-3862 Race conditions in aggregate calculation
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1100043 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d60cceaff3
commit
3450c962ad
|
@ -99,6 +99,7 @@ Release 0.91.0 - Unreleased
|
|||
HBASE-3777 Redefine Identity Of HBase Configuration (Karthick Sankarachary)
|
||||
HBASE-3849 Fix master ui; hbase-1502 broke requests/second
|
||||
HBASE-3853 Fix TestInfoServers to pass after HBASE-3835 (todd)
|
||||
HBASE-3862 Race conditions in aggregate calculation (John Heitmann)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||
|
|
|
@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.client.coprocessor;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -92,7 +93,7 @@ public class AggregationClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, R result) {
|
||||
public synchronized void update(byte[] region, byte[] row, R result) {
|
||||
max = ci.compare(max, result) < 0 ? result : max;
|
||||
}
|
||||
}
|
||||
|
@ -141,7 +142,7 @@ public class AggregationClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, R result) {
|
||||
public synchronized void update(byte[] region, byte[] row, R result) {
|
||||
min = (min == null || ci.compare(result, min) < 0) ? result : min;
|
||||
}
|
||||
}
|
||||
|
@ -176,15 +177,15 @@ public class AggregationClient {
|
|||
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
|
||||
validateParameters(scan);
|
||||
class RowNumCallback implements Batch.Callback<Long> {
|
||||
private long rowCountL = 0l;
|
||||
private final AtomicLong rowCountL = new AtomicLong(0);
|
||||
|
||||
public long getRowNumCount() {
|
||||
return rowCountL;
|
||||
return rowCountL.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, Long result) {
|
||||
rowCountL += result.longValue();
|
||||
rowCountL.addAndGet(result.longValue());
|
||||
}
|
||||
}
|
||||
RowNumCallback rowNum = new RowNumCallback();
|
||||
|
@ -219,7 +220,7 @@ public class AggregationClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, S result) {
|
||||
public synchronized void update(byte[] region, byte[] row, S result) {
|
||||
sumVal = ci.add(sumVal, result);
|
||||
}
|
||||
}
|
||||
|
@ -255,7 +256,7 @@ public class AggregationClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, Pair<S, Long> result) {
|
||||
public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) {
|
||||
sum = ci.add(sum, result.getFirst());
|
||||
rowCount += result.getSecond();
|
||||
}
|
||||
|
@ -317,7 +318,7 @@ public class AggregationClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
|
||||
public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) {
|
||||
sumVal = ci.add(sumVal, result.getFirst().get(0));
|
||||
sumSqVal = ci.add(sumSqVal, result.getFirst().get(1));
|
||||
rowCountVal += result.getSecond();
|
||||
|
|
Loading…
Reference in New Issue