HBASE-17936 Refine sum endpoint example in ref guide
Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
75d1e0361a
commit
d15f75b3cf
|
@ -610,7 +610,7 @@ The effect is that the duplicate coprocessor is effectively ignored.
|
||||||
+
|
+
|
||||||
[source, java]
|
[source, java]
|
||||||
----
|
----
|
||||||
public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService {
|
public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {
|
||||||
|
|
||||||
private RegionCoprocessorEnvironment env;
|
private RegionCoprocessorEnvironment env;
|
||||||
|
|
||||||
|
@ -630,31 +630,33 @@ public class SumEndPoint extends SumService implements Coprocessor, CoprocessorS
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stop(CoprocessorEnvironment env) throws IOException {
|
public void stop(CoprocessorEnvironment env) throws IOException {
|
||||||
// do mothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void getSum(RpcController controller, SumRequest request, RpcCallback done) {
|
public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) {
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
scan.addFamily(Bytes.toBytes(request.getFamily()));
|
scan.addFamily(Bytes.toBytes(request.getFamily()));
|
||||||
scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
|
scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
|
||||||
SumResponse response = null;
|
|
||||||
|
Sum.SumResponse response = null;
|
||||||
InternalScanner scanner = null;
|
InternalScanner scanner = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
scanner = env.getRegion().getScanner(scan);
|
scanner = env.getRegion().getScanner(scan);
|
||||||
List results = new ArrayList();
|
List<Cell> results = new ArrayList<>();
|
||||||
boolean hasMore = false;
|
boolean hasMore = false;
|
||||||
long sum = 0L;
|
long sum = 0L;
|
||||||
do {
|
|
||||||
hasMore = scanner.next(results);
|
|
||||||
for (Cell cell : results) {
|
|
||||||
sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
|
|
||||||
}
|
|
||||||
results.clear();
|
|
||||||
} while (hasMore);
|
|
||||||
|
|
||||||
response = SumResponse.newBuilder().setSum(sum).build();
|
do {
|
||||||
|
hasMore = scanner.next(results);
|
||||||
|
for (Cell cell : results) {
|
||||||
|
sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
|
||||||
|
}
|
||||||
|
results.clear();
|
||||||
|
} while (hasMore);
|
||||||
|
|
||||||
|
response = Sum.SumResponse.newBuilder().setSum(sum).build();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
ResponseConverter.setControllerException(controller, ioe);
|
ResponseConverter.setControllerException(controller, ioe);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -664,6 +666,7 @@ public class SumEndPoint extends SumService implements Coprocessor, CoprocessorS
|
||||||
} catch (IOException ignored) {}
|
} catch (IOException ignored) {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
done.run(response);
|
done.run(response);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -681,24 +684,29 @@ Table table = connection.getTable(tableName);
|
||||||
//HConnection connection = HConnectionManager.createConnection(conf);
|
//HConnection connection = HConnectionManager.createConnection(conf);
|
||||||
//HTableInterface table = connection.getTable("users");
|
//HTableInterface table = connection.getTable("users");
|
||||||
|
|
||||||
final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross")
|
final Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();
|
||||||
.build();
|
|
||||||
try {
|
try {
|
||||||
Map<byte[], Long> results = table.CoprocessorService (SumService.class, null, null,
|
Map<byte[], Long> results = table.coprocessorService(
|
||||||
new Batch.Call<SumService, Long>() {
|
Sum.SumService.class,
|
||||||
@Override
|
null, /* start key */
|
||||||
public Long call(SumService aggregate) throws IOException {
|
null, /* end key */
|
||||||
BlockingRpcCallback rpcCallback = new BlockingRpcCallback();
|
new Batch.Call<Sum.SumService, Long>() {
|
||||||
aggregate.getSum(null, request, rpcCallback);
|
@Override
|
||||||
SumResponse response = rpcCallback.get();
|
public Long call(Sum.SumService aggregate) throws IOException {
|
||||||
return response.hasSum() ? response.getSum() : 0L;
|
BlockingRpcCallback<Sum.SumResponse> rpcCallback = new BlockingRpcCallback<>();
|
||||||
|
aggregate.getSum(null, request, rpcCallback);
|
||||||
|
Sum.SumResponse response = rpcCallback.get();
|
||||||
|
|
||||||
|
return response.hasSum() ? response.getSum() : 0L;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
);
|
||||||
|
|
||||||
for (Long sum : results.values()) {
|
for (Long sum : results.values()) {
|
||||||
System.out.println("Sum = " + sum);
|
System.out.println("Sum = " + sum);
|
||||||
}
|
}
|
||||||
} catch (ServiceException e) {
|
} catch (ServiceException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue