HBASE-5139 Compute (weighted) median using AggregateProtocol

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1230200 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-01-11 19:24:48 +00:00
parent 39a1e07ce4
commit 5ad571ab6c
4 changed files with 211 additions and 1 deletions

View File

@ -23,13 +23,20 @@ package org.apache.hadoop.hbase.client.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.AggregateProtocol;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
@ -362,4 +369,133 @@ public class AggregationClient {
return res;
}
/**
* It helps locate the region with median for a given column whose weight
* is specified in an optional column.
* From individual regions, it obtains sum of values and sum of weights.
* @param tableName
* @param ci
* @param scan
* @return pair whose first element is a map between start row of the region
* and (sum of values, sum of weights) for the region, the second element is
* (sum of values, sum of weights) for all the regions chosen
* @throws Throwable
*/
private <R, S> Pair<NavigableMap<byte[], List<S>>, List<S>>
getMedianArgs(final byte[] tableName,
final ColumnInterpreter<R, S> ci, final Scan scan) throws Throwable {
validateParameters(scan);
final NavigableMap<byte[], List<S>> map =
new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR);
class StdCallback implements Batch.Callback<List<S>> {
S sumVal = null, sumWeights = null;
public Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() {
List<S> l = new ArrayList<S>();
l.add(sumVal);
l.add(sumWeights);
Pair<NavigableMap<byte[], List<S>>, List<S>> p =
new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l);
return p;
}
@Override
public synchronized void update(byte[] region, byte[] row, List<S> result) {
map.put(row, result);
sumVal = ci.add(sumVal, result.get(0));
sumWeights = ci.add(sumWeights, result.get(1));
}
}
StdCallback stdCallback = new StdCallback();
HTable table = new HTable(conf, tableName);
table.coprocessorExec(AggregateProtocol.class, scan.getStartRow(), scan
.getStopRow(),
new Batch.Call<AggregateProtocol, List<S>>() {
@Override
public List<S> call(AggregateProtocol instance)
throws IOException {
return instance.getMedian(ci, scan);
}
}, stdCallback);
return stdCallback.getMedianParams();
}
/**
* This is the client side interface/handler for calling the median method for a
* given cf-cq combination. This method collects the necessary parameters
* to compute the median and returns the median.
* @param tableName
* @param ci
* @param scan
* @return R the median
* @throws Throwable
*/
public <R, S> R median(final byte[] tableName, ColumnInterpreter<R, S> ci,
Scan scan) throws Throwable {
Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(tableName, ci, scan);
byte[] startRow = null;
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
NavigableMap<byte[], List<S>> map = p.getFirst();
S sumVal = p.getSecond().get(0);
S sumWeights = p.getSecond().get(1);
double halfSumVal = ci.divideForAvg(sumVal, 2L);
double movingSumVal = 0;
boolean weighted = false;
if (quals.size() > 1) {
weighted = true;
halfSumVal = ci.divideForAvg(sumWeights, 2L);
}
for (Map.Entry<byte[], List<S>> entry : map.entrySet()) {
S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0);
double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
if (newSumVal > halfSumVal) break; // we found the region with the median
movingSumVal = newSumVal;
startRow = entry.getKey();
}
// scan the region with median and find it
Scan scan2 = new Scan(scan);
// inherit stop row from method parameter
scan2.setStartRow(startRow);
HTable table = new HTable(conf, tableName);
int cacheSize = scan2.getCaching();
if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) {
scan2.setCacheBlocks(true);
cacheSize = 5;
scan2.setCaching(cacheSize);
}
ResultScanner scanner = table.getScanner(scan2);
Result[] results = null;
byte[] qualifier = quals.pollFirst();
// qualifier for the weight column
byte[] weightQualifier = weighted ? quals.pollLast() : qualifier;
R value = null;
try {
do {
results = scanner.next(cacheSize);
if (results != null && results.length > 0) {
for (int i = 0; i < results.length; i++) {
Result r = results[i];
// retrieve weight
KeyValue kv = r.getColumnLatest(colFamily, weightQualifier);
R newValue = ci.getValue(colFamily, weightQualifier, kv);
S s = ci.castToReturnType(newValue);
double newSumVal = movingSumVal + ci.divideForAvg(s, 1L);
// see if we have moved past the median
if (newSumVal > halfSumVal) {
return value;
}
movingSumVal = newSumVal;
kv = r.getColumnLatest(colFamily, qualifier);
value = ci.getValue(colFamily, qualifier, kv);
}
}
} while (results != null && results.length > 0);
} finally {
scanner.close();
}
return null;
}
}

View File

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -232,4 +233,45 @@ public class AggregateImplementation extends BaseEndpointCoprocessor implements
return p;
}
@Override
public <T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException {
S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())
.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily);
byte[] valQualifier = quals.pollFirst();
// if weighted median is requested, get qualifier for the weight column
byte[] weightQualifier = quals.size() > 1 ? quals.pollLast() : null;
List<KeyValue> results = new ArrayList<KeyValue>();
boolean hasMoreRows = false;
try {
do {
tempVal = null;
tempWeight = null;
hasMoreRows = scanner.next(results);
for (KeyValue kv : results) {
tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
valQualifier, kv)));
if (weightQualifier != null) {
tempWeight = ci.add(tempWeight,
ci.castToReturnType(ci.getValue(colFamily, weightQualifier, kv)));
}
}
results.clear();
sumVal = ci.add(sumVal, tempVal);
sumWeights = ci.add(sumWeights, tempWeight);
} while (hasMoreRows);
} finally {
scanner.close();
}
List<S> l = new ArrayList<S>();
l.add(sumVal);
l.add(sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : sumWeights);
return l;
}
}

View File

@ -126,4 +126,19 @@ public interface AggregateProtocol extends CoprocessorProtocol {
<T, S> Pair<List<S>, Long> getStd(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
}
/**
* Gives a List containing sum of values and sum of weights.
* It is computed for the combination of column
* family and column qualifier(s) in the given row range as defined in the
* Scan object. In its current implementation, it takes one column family and
* two column qualifiers. The first qualifier is for values column and
* the second qualifier (optional) is for weight column.
* @param ci
* @param scan
* @return Pair
* @throws IOException
*/
<T, S> List<S> getMedian(ColumnInterpreter<T, S> ci, Scan scan)
throws IOException;
}

View File

@ -123,6 +123,23 @@ public class TestAggregateProtocol {
return ret;
}
/**
* ****************** Test cases for Median **********************
*/
/**
* @throws Throwable
*/
@Test
public void testMedianWithValidRange() throws Throwable {
AggregationClient aClient = new AggregationClient(conf);
Scan scan = new Scan();
scan.addColumn(TEST_FAMILY,TEST_QUALIFIER);
final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
long median = aClient.median(TEST_TABLE, ci,
scan);
assertEquals(8L, median);
}
/**
* **************************** ROW COUNT Test cases *******************
*/