HBASE-4439 Move ClientScanner out of HTable (Lars H)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1222551 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6313293e51
commit
8b4c4f094e
|
@ -0,0 +1,72 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class for custom client scanners.
|
||||||
|
*/
|
||||||
|
public abstract class AbstractClientScanner implements ResultScanner {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<Result> iterator() {
|
||||||
|
return new Iterator<Result>() {
|
||||||
|
// The next RowResult, possibly pre-read
|
||||||
|
Result next = null;
|
||||||
|
|
||||||
|
// return true if there is another item pending, false if there isn't.
|
||||||
|
// this method is where the actual advancing takes place, but you need
|
||||||
|
// to call next() to consume it. hasNext() will only advance if there
|
||||||
|
// isn't a pending next().
|
||||||
|
public boolean hasNext() {
|
||||||
|
if (next == null) {
|
||||||
|
try {
|
||||||
|
next = AbstractClientScanner.this.next();
|
||||||
|
return next != null;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// get the pending next item and advance the iterator. returns null if
|
||||||
|
// there is no next item.
|
||||||
|
public Result next() {
|
||||||
|
// since hasNext() does the real advancing, we call this to determine
|
||||||
|
// if there is a next before proceeding.
|
||||||
|
if (!hasNext()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if we get to here, then hasNext() has given us an item to return.
|
||||||
|
// we want to return the item and then null out the next pointer, so
|
||||||
|
// we use a temporary variable.
|
||||||
|
Result temp = next;
|
||||||
|
next = null;
|
||||||
|
return temp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,378 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
|
import org.apache.hadoop.hbase.UnknownScannerException;
|
||||||
|
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implements the scanner interface for the HBase client.
|
||||||
|
* If there are multiple regions in a table, this scanner will iterate
|
||||||
|
* through them all.
|
||||||
|
*/
|
||||||
|
public class ClientScanner extends AbstractClientScanner {
|
||||||
|
private final Log LOG = LogFactory.getLog(this.getClass());
|
||||||
|
private Scan scan;
|
||||||
|
private boolean closed = false;
|
||||||
|
// Current region scanner is against. Gets cleared if current region goes
|
||||||
|
// wonky: e.g. if it splits on us.
|
||||||
|
private HRegionInfo currentRegion = null;
|
||||||
|
private ScannerCallable callable = null;
|
||||||
|
private final LinkedList<Result> cache = new LinkedList<Result>();
|
||||||
|
private final int caching;
|
||||||
|
private long lastNext;
|
||||||
|
// Keep lastResult returned successfully in case we have to reset scanner.
|
||||||
|
private Result lastResult = null;
|
||||||
|
private ScanMetrics scanMetrics = null;
|
||||||
|
private final long maxScannerResultSize;
|
||||||
|
private final HConnection connection;
|
||||||
|
private final byte[] tableName;
|
||||||
|
private final int scannerTimeout;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new ClientScanner for the specified table. An HConnection will be
|
||||||
|
* retrieved using the passed Configuration.
|
||||||
|
* Note that the passed {@link Scan}'s start row maybe changed changed.
|
||||||
|
*
|
||||||
|
* @param conf The {@link Configuration} to use.
|
||||||
|
* @param scan {@link Scan} to use in this scanner
|
||||||
|
* @param tableName The table that we wish to scan
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public ClientScanner(final Configuration conf, final Scan scan,
|
||||||
|
final byte[] tableName) throws IOException {
|
||||||
|
this(conf, scan, tableName, HConnectionManager.getConnection(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new ClientScanner for the specified table
|
||||||
|
* Note that the passed {@link Scan}'s start row maybe changed changed.
|
||||||
|
*
|
||||||
|
* @param conf The {@link Configuration} to use.
|
||||||
|
* @param scan {@link Scan} to use in this scanner
|
||||||
|
* @param tableName The table that we wish to scan
|
||||||
|
* @param connection Connection identifying the cluster
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public ClientScanner(final Configuration conf, final Scan scan,
|
||||||
|
final byte[] tableName, HConnection connection) throws IOException {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Creating scanner over "
|
||||||
|
+ Bytes.toString(tableName)
|
||||||
|
+ " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
|
||||||
|
}
|
||||||
|
this.scan = scan;
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.lastNext = System.currentTimeMillis();
|
||||||
|
this.connection = connection;
|
||||||
|
this.maxScannerResultSize = conf.getLong(
|
||||||
|
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||||
|
this.scannerTimeout = (int) conf.getLong(
|
||||||
|
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
|
||||||
|
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
|
||||||
|
|
||||||
|
// check if application wants to collect scan metrics
|
||||||
|
byte[] enableMetrics = scan.getAttribute(
|
||||||
|
Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
|
||||||
|
if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
|
||||||
|
scanMetrics = new ScanMetrics();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use the caching from the Scan. If not set, use the default cache setting for this table.
|
||||||
|
if (this.scan.getCaching() > 0) {
|
||||||
|
this.caching = this.scan.getCaching();
|
||||||
|
} else {
|
||||||
|
this.caching = conf.getInt("hbase.client.scanner.caching", 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// initialize the scanner
|
||||||
|
nextScanner(this.caching, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected HConnection getConnection() {
|
||||||
|
return this.connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected byte[] getTableName() {
|
||||||
|
return this.tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Scan getScan() {
|
||||||
|
return scan;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected long getTimestamp() {
|
||||||
|
return lastNext;
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns true if the passed region endKey
|
||||||
|
private boolean checkScanStopRow(final byte [] endKey) {
|
||||||
|
if (this.scan.getStopRow().length > 0) {
|
||||||
|
// there is a stop row, check to see if we are past it.
|
||||||
|
byte [] stopRow = scan.getStopRow();
|
||||||
|
int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
|
||||||
|
endKey, 0, endKey.length);
|
||||||
|
if (cmp <= 0) {
|
||||||
|
// stopRow <= endKey (endKey is equals to or larger than stopRow)
|
||||||
|
// This is a stop.
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false; //unlikely.
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Gets a scanner for the next region. If this.currentRegion != null, then
|
||||||
|
* we will move to the endrow of this.currentRegion. Else we will get
|
||||||
|
* scanner at the scan.getStartRow(). We will go no further, just tidy
|
||||||
|
* up outstanding scanners, if <code>currentRegion != null</code> and
|
||||||
|
* <code>done</code> is true.
|
||||||
|
* @param nbRows
|
||||||
|
* @param done Server-side says we're done scanning.
|
||||||
|
*/
|
||||||
|
private boolean nextScanner(int nbRows, final boolean done)
|
||||||
|
throws IOException {
|
||||||
|
// Close the previous scanner if it's open
|
||||||
|
if (this.callable != null) {
|
||||||
|
this.callable.setClose();
|
||||||
|
getConnection().getRegionServerWithRetries(callable);
|
||||||
|
this.callable = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Where to start the next scanner
|
||||||
|
byte [] localStartKey;
|
||||||
|
|
||||||
|
// if we're at end of table, close and return false to stop iterating
|
||||||
|
if (this.currentRegion != null) {
|
||||||
|
byte [] endKey = this.currentRegion.getEndKey();
|
||||||
|
if (endKey == null ||
|
||||||
|
Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
|
||||||
|
checkScanStopRow(endKey) ||
|
||||||
|
done) {
|
||||||
|
close();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Finished with scanning at " + this.currentRegion);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
localStartKey = endKey;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Finished with region " + this.currentRegion);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
localStartKey = this.scan.getStartRow();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Advancing internal scanner to startKey at '" +
|
||||||
|
Bytes.toStringBinary(localStartKey) + "'");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
callable = getScannerCallable(localStartKey, nbRows);
|
||||||
|
// Open a scanner on the region server starting at the
|
||||||
|
// beginning of the region
|
||||||
|
getConnection().getRegionServerWithRetries(callable);
|
||||||
|
this.currentRegion = callable.getHRegionInfo();
|
||||||
|
if (this.scanMetrics != null) {
|
||||||
|
this.scanMetrics.countOfRegions.inc();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
close();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ScannerCallable getScannerCallable(byte [] localStartKey,
|
||||||
|
int nbRows) {
|
||||||
|
scan.setStartRow(localStartKey);
|
||||||
|
ScannerCallable s = new ScannerCallable(getConnection(),
|
||||||
|
getTableName(), scan, this.scanMetrics);
|
||||||
|
s.setCaching(nbRows);
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* publish the scan metrics
|
||||||
|
* For now, we use scan.setAttribute to pass the metrics for application
|
||||||
|
* or TableInputFormat to consume
|
||||||
|
* Later, we could push it to other systems
|
||||||
|
* We don't use metrics framework because it doesn't support
|
||||||
|
* multi instances of the same metrics on the same machine; for scan/map
|
||||||
|
* reduce scenarios, we will have multiple scans running at the same time
|
||||||
|
*/
|
||||||
|
private void writeScanMetrics() throws IOException
|
||||||
|
{
|
||||||
|
// by default, scanMetrics is null
|
||||||
|
// if application wants to collect scanMetrics, it can turn it on by
|
||||||
|
// calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE,
|
||||||
|
// Bytes.toBytes(Boolean.TRUE))
|
||||||
|
if (this.scanMetrics == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
final DataOutputBuffer d = new DataOutputBuffer();
|
||||||
|
scanMetrics.write(d);
|
||||||
|
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
|
||||||
|
}
|
||||||
|
|
||||||
|
public Result next() throws IOException {
|
||||||
|
// If the scanner is closed but there is some rows left in the cache,
|
||||||
|
// it will first empty it before returning null
|
||||||
|
if (cache.size() == 0 && this.closed) {
|
||||||
|
writeScanMetrics();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (cache.size() == 0) {
|
||||||
|
Result [] values = null;
|
||||||
|
long remainingResultSize = maxScannerResultSize;
|
||||||
|
int countdown = this.caching;
|
||||||
|
// We need to reset it if it's a new callable that was created
|
||||||
|
// with a countdown in nextScanner
|
||||||
|
callable.setCaching(this.caching);
|
||||||
|
// This flag is set when we want to skip the result returned. We do
|
||||||
|
// this when we reset scanner because it split under us.
|
||||||
|
boolean skipFirst = false;
|
||||||
|
do {
|
||||||
|
try {
|
||||||
|
if (skipFirst) {
|
||||||
|
// Skip only the first row (which was the last row of the last
|
||||||
|
// already-processed batch).
|
||||||
|
callable.setCaching(1);
|
||||||
|
values = getConnection().getRegionServerWithRetries(callable);
|
||||||
|
callable.setCaching(this.caching);
|
||||||
|
skipFirst = false;
|
||||||
|
}
|
||||||
|
// Server returns a null values if scanning is to stop. Else,
|
||||||
|
// returns an empty array if scanning is to go on and we've just
|
||||||
|
// exhausted current region.
|
||||||
|
values = getConnection().getRegionServerWithRetries(callable);
|
||||||
|
} catch (DoNotRetryIOException e) {
|
||||||
|
if (e instanceof UnknownScannerException) {
|
||||||
|
long timeout = lastNext + scannerTimeout;
|
||||||
|
// If we are over the timeout, throw this exception to the client
|
||||||
|
// Else, it's because the region moved and we used the old id
|
||||||
|
// against the new region server; reset the scanner.
|
||||||
|
if (timeout < System.currentTimeMillis()) {
|
||||||
|
long elapsed = System.currentTimeMillis() - lastNext;
|
||||||
|
ScannerTimeoutException ex = new ScannerTimeoutException(
|
||||||
|
elapsed + "ms passed since the last invocation, " +
|
||||||
|
"timeout is currently set to " + scannerTimeout);
|
||||||
|
ex.initCause(e);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Throwable cause = e.getCause();
|
||||||
|
if (cause == null || (!(cause instanceof NotServingRegionException)
|
||||||
|
&& !(cause instanceof RegionServerStoppedException))) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Else, its signal from depths of ScannerCallable that we got an
|
||||||
|
// NSRE on a next and that we need to reset the scanner.
|
||||||
|
if (this.lastResult != null) {
|
||||||
|
this.scan.setStartRow(this.lastResult.getRow());
|
||||||
|
// Skip first row returned. We already let it out on previous
|
||||||
|
// invocation.
|
||||||
|
skipFirst = true;
|
||||||
|
}
|
||||||
|
// Clear region
|
||||||
|
this.currentRegion = null;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
long currentTime = System.currentTimeMillis();
|
||||||
|
if (this.scanMetrics != null ) {
|
||||||
|
this.scanMetrics.sumOfMillisSecBetweenNexts.inc(
|
||||||
|
currentTime-lastNext);
|
||||||
|
}
|
||||||
|
lastNext = currentTime;
|
||||||
|
if (values != null && values.length > 0) {
|
||||||
|
for (Result rs : values) {
|
||||||
|
cache.add(rs);
|
||||||
|
for (KeyValue kv : rs.raw()) {
|
||||||
|
remainingResultSize -= kv.heapSize();
|
||||||
|
}
|
||||||
|
countdown--;
|
||||||
|
this.lastResult = rs;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Values == null means server-side filter has determined we must STOP
|
||||||
|
} while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cache.size() > 0) {
|
||||||
|
return cache.poll();
|
||||||
|
}
|
||||||
|
writeScanMetrics();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get <param>nbRows</param> rows.
|
||||||
|
* How many RPCs are made is determined by the {@link Scan#setCaching(int)}
|
||||||
|
* setting (or hbase.client.scanner.caching in hbase-site.xml).
|
||||||
|
* @param nbRows number of rows to return
|
||||||
|
* @return Between zero and <param>nbRows</param> RowResults. Scan is done
|
||||||
|
* if returned array is of zero-length (We never return null).
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Result [] next(int nbRows) throws IOException {
|
||||||
|
// Collect values to be returned here
|
||||||
|
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
|
||||||
|
for(int i = 0; i < nbRows; i++) {
|
||||||
|
Result next = next();
|
||||||
|
if (next != null) {
|
||||||
|
resultSets.add(next);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return resultSets.toArray(new Result[resultSets.size()]);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
if (callable != null) {
|
||||||
|
callable.setClose();
|
||||||
|
try {
|
||||||
|
getConnection().getRegionServerWithRetries(callable);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// We used to catch this error, interpret, and rethrow. However, we
|
||||||
|
// have since decided that it's not nice for a scanner's close to
|
||||||
|
// throw exceptions. Chances are it was just an UnknownScanner
|
||||||
|
// exception due to lease time out.
|
||||||
|
}
|
||||||
|
callable = null;
|
||||||
|
}
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,8 +26,6 @@ import java.io.IOException;
|
||||||
import java.lang.reflect.Proxy;
|
import java.lang.reflect.Proxy;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
|
@ -42,7 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
@ -50,21 +47,16 @@ import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.HServerAddress;
|
import org.apache.hadoop.hbase.HServerAddress;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.UnknownScannerException;
|
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
||||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||||
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
|
|
||||||
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
import org.apache.hadoop.hbase.client.coprocessor.Batch;
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
|
||||||
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
|
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
|
|
||||||
import org.apache.hadoop.hbase.util.Addressing;
|
import org.apache.hadoop.hbase.util.Addressing;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>Used to communicate with a single HBase table.
|
* <p>Used to communicate with a single HBase table.
|
||||||
|
@ -111,7 +103,6 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
private static final Log LOG = LogFactory.getLog(HTable.class);
|
private static final Log LOG = LogFactory.getLog(HTable.class);
|
||||||
private HConnection connection;
|
private HConnection connection;
|
||||||
private final byte [] tableName;
|
private final byte [] tableName;
|
||||||
protected int scannerTimeout;
|
|
||||||
private volatile Configuration configuration;
|
private volatile Configuration configuration;
|
||||||
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
|
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
|
||||||
private long writeBufferSize;
|
private long writeBufferSize;
|
||||||
|
@ -121,7 +112,6 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
protected int scannerCaching;
|
protected int scannerCaching;
|
||||||
private int maxKeyValueSize;
|
private int maxKeyValueSize;
|
||||||
private ExecutorService pool; // For Multi
|
private ExecutorService pool; // For Multi
|
||||||
private long maxScannerResultSize;
|
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
private int operationTimeout;
|
private int operationTimeout;
|
||||||
private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts.
|
private static final int DOPUT_WB_CHECK = 10; // i.e., doPut checks the writebuffer every X Puts.
|
||||||
|
@ -158,7 +148,6 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
this.cleanupOnClose = true;
|
this.cleanupOnClose = true;
|
||||||
if (conf == null) {
|
if (conf == null) {
|
||||||
this.scannerTimeout = 0;
|
|
||||||
this.connection = null;
|
this.connection = null;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -218,9 +207,6 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
*/
|
*/
|
||||||
private void finishSetup() throws IOException {
|
private void finishSetup() throws IOException {
|
||||||
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
|
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
|
||||||
this.scannerTimeout = (int) this.configuration.getLong(
|
|
||||||
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
|
|
||||||
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
|
|
||||||
this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
|
this.operationTimeout = HTableDescriptor.isMetaTable(tableName) ? HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT
|
||||||
: this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
: this.configuration.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
|
||||||
|
@ -232,9 +218,6 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
this.scannerCaching = this.configuration.getInt(
|
this.scannerCaching = this.configuration.getInt(
|
||||||
"hbase.client.scanner.caching", 1);
|
"hbase.client.scanner.caching", 1);
|
||||||
|
|
||||||
this.maxScannerResultSize = this.configuration.getLong(
|
|
||||||
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
|
||||||
this.maxKeyValueSize = this.configuration.getInt(
|
this.maxKeyValueSize = this.configuration.getInt(
|
||||||
"hbase.client.keyvalue.maxsize", -1);
|
"hbase.client.keyvalue.maxsize", -1);
|
||||||
this.closed = false;
|
this.closed = false;
|
||||||
|
@ -352,6 +335,7 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
* Gets the number of rows that a scanner will fetch at once.
|
* Gets the number of rows that a scanner will fetch at once.
|
||||||
* <p>
|
* <p>
|
||||||
* The default value comes from {@code hbase.client.scanner.caching}.
|
* The default value comes from {@code hbase.client.scanner.caching}.
|
||||||
|
* @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
|
||||||
*/
|
*/
|
||||||
public int getScannerCaching() {
|
public int getScannerCaching() {
|
||||||
return scannerCaching;
|
return scannerCaching;
|
||||||
|
@ -366,6 +350,7 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
* {@code next()} is called on a scanner, at the expense of memory use
|
* {@code next()} is called on a scanner, at the expense of memory use
|
||||||
* (since more rows will need to be maintained in memory by the scanners).
|
* (since more rows will need to be maintained in memory by the scanners).
|
||||||
* @param scannerCaching the number of rows a scanner will fetch at once.
|
* @param scannerCaching the number of rows a scanner will fetch at once.
|
||||||
|
* @deprecated Use {@link Scan#setCaching(int)}
|
||||||
*/
|
*/
|
||||||
public void setScannerCaching(int scannerCaching) {
|
public void setScannerCaching(int scannerCaching) {
|
||||||
this.scannerCaching = scannerCaching;
|
this.scannerCaching = scannerCaching;
|
||||||
|
@ -598,9 +583,11 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public ResultScanner getScanner(final Scan scan) throws IOException {
|
public ResultScanner getScanner(final Scan scan) throws IOException {
|
||||||
ClientScanner s = new ClientScanner(scan);
|
if (scan.getCaching() <= 0) {
|
||||||
s.initialize();
|
scan.setCaching(getScannerCaching());
|
||||||
return s;
|
}
|
||||||
|
return new ClientScanner(getConfiguration(), scan, getTableName(),
|
||||||
|
this.connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1072,355 +1059,6 @@ public class HTable implements HTableInterface, Closeable {
|
||||||
return writeBuffer;
|
return writeBuffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Implements the scanner interface for the HBase client.
|
|
||||||
* If there are multiple regions in a table, this scanner will iterate
|
|
||||||
* through them all.
|
|
||||||
*/
|
|
||||||
protected class ClientScanner implements ResultScanner {
|
|
||||||
private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
|
|
||||||
// HEADSUP: The scan internal start row can change as we move through table.
|
|
||||||
private Scan scan;
|
|
||||||
private boolean closed = false;
|
|
||||||
// Current region scanner is against. Gets cleared if current region goes
|
|
||||||
// wonky: e.g. if it splits on us.
|
|
||||||
private HRegionInfo currentRegion = null;
|
|
||||||
private ScannerCallable callable = null;
|
|
||||||
private final LinkedList<Result> cache = new LinkedList<Result>();
|
|
||||||
private final int caching;
|
|
||||||
private long lastNext;
|
|
||||||
// Keep lastResult returned successfully in case we have to reset scanner.
|
|
||||||
private Result lastResult = null;
|
|
||||||
private ScanMetrics scanMetrics = null;
|
|
||||||
|
|
||||||
protected ClientScanner(final Scan scan) {
|
|
||||||
if (CLIENT_LOG.isDebugEnabled()) {
|
|
||||||
CLIENT_LOG.debug("Creating scanner over "
|
|
||||||
+ Bytes.toString(getTableName())
|
|
||||||
+ " starting at key '" + Bytes.toStringBinary(scan.getStartRow()) + "'");
|
|
||||||
}
|
|
||||||
this.scan = scan;
|
|
||||||
this.lastNext = System.currentTimeMillis();
|
|
||||||
|
|
||||||
// check if application wants to collect scan metrics
|
|
||||||
byte[] enableMetrics = scan.getAttribute(
|
|
||||||
Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
|
|
||||||
if (enableMetrics != null && Bytes.toBoolean(enableMetrics)) {
|
|
||||||
scanMetrics = new ScanMetrics();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Use the caching from the Scan. If not set, use the default cache setting for this table.
|
|
||||||
if (this.scan.getCaching() > 0) {
|
|
||||||
this.caching = this.scan.getCaching();
|
|
||||||
} else {
|
|
||||||
this.caching = HTable.this.scannerCaching;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Removed filter validation. We have a new format now, only one of all
|
|
||||||
// the current filters has a validate() method. We can add it back,
|
|
||||||
// need to decide on what we're going to do re: filter redesign.
|
|
||||||
// Need, at the least, to break up family from qualifier as separate
|
|
||||||
// checks, I think it's important server-side filters are optimal in that
|
|
||||||
// respect.
|
|
||||||
}
|
|
||||||
|
|
||||||
public void initialize() throws IOException {
|
|
||||||
nextScanner(this.caching, false);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected Scan getScan() {
|
|
||||||
return scan;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected long getTimestamp() {
|
|
||||||
return lastNext;
|
|
||||||
}
|
|
||||||
|
|
||||||
// returns true if the passed region endKey
|
|
||||||
private boolean checkScanStopRow(final byte [] endKey) {
|
|
||||||
if (this.scan.getStopRow().length > 0) {
|
|
||||||
// there is a stop row, check to see if we are past it.
|
|
||||||
byte [] stopRow = scan.getStopRow();
|
|
||||||
int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
|
|
||||||
endKey, 0, endKey.length);
|
|
||||||
if (cmp <= 0) {
|
|
||||||
// stopRow <= endKey (endKey is equals to or larger than stopRow)
|
|
||||||
// This is a stop.
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false; //unlikely.
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Gets a scanner for the next region. If this.currentRegion != null, then
|
|
||||||
* we will move to the endrow of this.currentRegion. Else we will get
|
|
||||||
* scanner at the scan.getStartRow(). We will go no further, just tidy
|
|
||||||
* up outstanding scanners, if <code>currentRegion != null</code> and
|
|
||||||
* <code>done</code> is true.
|
|
||||||
* @param nbRows
|
|
||||||
* @param done Server-side says we're done scanning.
|
|
||||||
*/
|
|
||||||
private boolean nextScanner(int nbRows, final boolean done)
|
|
||||||
throws IOException {
|
|
||||||
// Close the previous scanner if it's open
|
|
||||||
if (this.callable != null) {
|
|
||||||
this.callable.setClose();
|
|
||||||
getConnection().getRegionServerWithRetries(callable);
|
|
||||||
this.callable = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Where to start the next scanner
|
|
||||||
byte [] localStartKey;
|
|
||||||
|
|
||||||
// if we're at end of table, close and return false to stop iterating
|
|
||||||
if (this.currentRegion != null) {
|
|
||||||
byte [] endKey = this.currentRegion.getEndKey();
|
|
||||||
if (endKey == null ||
|
|
||||||
Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
|
|
||||||
checkScanStopRow(endKey) ||
|
|
||||||
done) {
|
|
||||||
close();
|
|
||||||
if (CLIENT_LOG.isDebugEnabled()) {
|
|
||||||
CLIENT_LOG.debug("Finished with scanning at " + this.currentRegion);
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
localStartKey = endKey;
|
|
||||||
if (CLIENT_LOG.isDebugEnabled()) {
|
|
||||||
CLIENT_LOG.debug("Finished with region " + this.currentRegion);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
localStartKey = this.scan.getStartRow();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (CLIENT_LOG.isDebugEnabled()) {
|
|
||||||
CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
|
|
||||||
Bytes.toStringBinary(localStartKey) + "'");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
callable = getScannerCallable(localStartKey, nbRows);
|
|
||||||
// Open a scanner on the region server starting at the
|
|
||||||
// beginning of the region
|
|
||||||
getConnection().getRegionServerWithRetries(callable);
|
|
||||||
this.currentRegion = callable.getHRegionInfo();
|
|
||||||
if (this.scanMetrics != null) {
|
|
||||||
this.scanMetrics.countOfRegions.inc();
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
close();
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected ScannerCallable getScannerCallable(byte [] localStartKey,
|
|
||||||
int nbRows) {
|
|
||||||
scan.setStartRow(localStartKey);
|
|
||||||
ScannerCallable s = new ScannerCallable(getConnection(),
|
|
||||||
getTableName(), scan, this.scanMetrics);
|
|
||||||
s.setCaching(nbRows);
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* publish the scan metrics
|
|
||||||
* For now, we use scan.setAttribute to pass the metrics for application
|
|
||||||
* or TableInputFormat to consume
|
|
||||||
* Later, we could push it to other systems
|
|
||||||
* We don't use metrics framework because it doesn't support
|
|
||||||
* multi instances of the same metrics on the same machine; for scan/map
|
|
||||||
* reduce scenarios, we will have multiple scans running at the same time
|
|
||||||
*/
|
|
||||||
private void writeScanMetrics() throws IOException
|
|
||||||
{
|
|
||||||
// by default, scanMetrics is null
|
|
||||||
// if application wants to collect scanMetrics, it can turn it on by
|
|
||||||
// calling scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE,
|
|
||||||
// Bytes.toBytes(Boolean.TRUE))
|
|
||||||
if (this.scanMetrics == null) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
final DataOutputBuffer d = new DataOutputBuffer();
|
|
||||||
scanMetrics.write(d);
|
|
||||||
scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, d.getData());
|
|
||||||
}
|
|
||||||
|
|
||||||
public Result next() throws IOException {
|
|
||||||
// If the scanner is closed but there is some rows left in the cache,
|
|
||||||
// it will first empty it before returning null
|
|
||||||
if (cache.size() == 0 && this.closed) {
|
|
||||||
writeScanMetrics();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (cache.size() == 0) {
|
|
||||||
Result [] values = null;
|
|
||||||
long remainingResultSize = maxScannerResultSize;
|
|
||||||
int countdown = this.caching;
|
|
||||||
// We need to reset it if it's a new callable that was created
|
|
||||||
// with a countdown in nextScanner
|
|
||||||
callable.setCaching(this.caching);
|
|
||||||
// This flag is set when we want to skip the result returned. We do
|
|
||||||
// this when we reset scanner because it split under us.
|
|
||||||
boolean skipFirst = false;
|
|
||||||
do {
|
|
||||||
try {
|
|
||||||
if (skipFirst) {
|
|
||||||
// Skip only the first row (which was the last row of the last
|
|
||||||
// already-processed batch).
|
|
||||||
callable.setCaching(1);
|
|
||||||
values = getConnection().getRegionServerWithRetries(callable);
|
|
||||||
callable.setCaching(this.caching);
|
|
||||||
skipFirst = false;
|
|
||||||
}
|
|
||||||
// Server returns a null values if scanning is to stop. Else,
|
|
||||||
// returns an empty array if scanning is to go on and we've just
|
|
||||||
// exhausted current region.
|
|
||||||
values = getConnection().getRegionServerWithRetries(callable);
|
|
||||||
} catch (DoNotRetryIOException e) {
|
|
||||||
if (e instanceof UnknownScannerException) {
|
|
||||||
long timeout = lastNext + scannerTimeout;
|
|
||||||
// If we are over the timeout, throw this exception to the client
|
|
||||||
// Else, it's because the region moved and we used the old id
|
|
||||||
// against the new region server; reset the scanner.
|
|
||||||
if (timeout < System.currentTimeMillis()) {
|
|
||||||
long elapsed = System.currentTimeMillis() - lastNext;
|
|
||||||
ScannerTimeoutException ex = new ScannerTimeoutException(
|
|
||||||
elapsed + "ms passed since the last invocation, " +
|
|
||||||
"timeout is currently set to " + scannerTimeout);
|
|
||||||
ex.initCause(e);
|
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Throwable cause = e.getCause();
|
|
||||||
if (cause == null || (!(cause instanceof NotServingRegionException)
|
|
||||||
&& !(cause instanceof RegionServerStoppedException))) {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Else, its signal from depths of ScannerCallable that we got an
|
|
||||||
// NSRE on a next and that we need to reset the scanner.
|
|
||||||
if (this.lastResult != null) {
|
|
||||||
this.scan.setStartRow(this.lastResult.getRow());
|
|
||||||
// Skip first row returned. We already let it out on previous
|
|
||||||
// invocation.
|
|
||||||
skipFirst = true;
|
|
||||||
}
|
|
||||||
// Clear region
|
|
||||||
this.currentRegion = null;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
long currentTime = System.currentTimeMillis();
|
|
||||||
if (this.scanMetrics != null ) {
|
|
||||||
this.scanMetrics.sumOfMillisSecBetweenNexts.inc(
|
|
||||||
currentTime-lastNext);
|
|
||||||
}
|
|
||||||
lastNext = currentTime;
|
|
||||||
if (values != null && values.length > 0) {
|
|
||||||
for (Result rs : values) {
|
|
||||||
cache.add(rs);
|
|
||||||
for (KeyValue kv : rs.raw()) {
|
|
||||||
remainingResultSize -= kv.heapSize();
|
|
||||||
}
|
|
||||||
countdown--;
|
|
||||||
this.lastResult = rs;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Values == null means server-side filter has determined we must STOP
|
|
||||||
} while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cache.size() > 0) {
|
|
||||||
return cache.poll();
|
|
||||||
}
|
|
||||||
writeScanMetrics();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get <param>nbRows</param> rows.
|
|
||||||
* How many RPCs are made is determined by the {@link Scan#setCaching(int)}
|
|
||||||
* setting (or hbase.client.scanner.caching in hbase-site.xml).
|
|
||||||
* @param nbRows number of rows to return
|
|
||||||
* @return Between zero and <param>nbRows</param> RowResults. Scan is done
|
|
||||||
* if returned array is of zero-length (We never return null).
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
public Result [] next(int nbRows) throws IOException {
|
|
||||||
// Collect values to be returned here
|
|
||||||
ArrayList<Result> resultSets = new ArrayList<Result>(nbRows);
|
|
||||||
for(int i = 0; i < nbRows; i++) {
|
|
||||||
Result next = next();
|
|
||||||
if (next != null) {
|
|
||||||
resultSets.add(next);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return resultSets.toArray(new Result[resultSets.size()]);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void close() {
|
|
||||||
if (callable != null) {
|
|
||||||
callable.setClose();
|
|
||||||
try {
|
|
||||||
getConnection().getRegionServerWithRetries(callable);
|
|
||||||
} catch (IOException e) {
|
|
||||||
// We used to catch this error, interpret, and rethrow. However, we
|
|
||||||
// have since decided that it's not nice for a scanner's close to
|
|
||||||
// throw exceptions. Chances are it was just an UnknownScanner
|
|
||||||
// exception due to lease time out.
|
|
||||||
}
|
|
||||||
callable = null;
|
|
||||||
}
|
|
||||||
closed = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Iterator<Result> iterator() {
|
|
||||||
return new Iterator<Result>() {
|
|
||||||
// The next RowResult, possibly pre-read
|
|
||||||
Result next = null;
|
|
||||||
|
|
||||||
// return true if there is another item pending, false if there isn't.
|
|
||||||
// this method is where the actual advancing takes place, but you need
|
|
||||||
// to call next() to consume it. hasNext() will only advance if there
|
|
||||||
// isn't a pending next().
|
|
||||||
public boolean hasNext() {
|
|
||||||
if (next == null) {
|
|
||||||
try {
|
|
||||||
next = ClientScanner.this.next();
|
|
||||||
return next != null;
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// get the pending next item and advance the iterator. returns null if
|
|
||||||
// there is no next item.
|
|
||||||
public Result next() {
|
|
||||||
// since hasNext() does the real advancing, we call this to determine
|
|
||||||
// if there is a next before proceeding.
|
|
||||||
if (!hasNext()) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if we get to here, then hasNext() has given us an item to return.
|
|
||||||
// we want to return the item and then null out the next pointer, so
|
|
||||||
// we use a temporary variable.
|
|
||||||
Result temp = next;
|
|
||||||
next = null;
|
|
||||||
return temp;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void remove() {
|
|
||||||
throw new UnsupportedOperationException();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The pool is used for mutli requests for this HTable
|
* The pool is used for mutli requests for this HTable
|
||||||
* @return the pool used for mutli
|
* @return the pool used for mutli
|
||||||
|
|
|
@ -148,6 +148,8 @@ public interface HTableInterface {
|
||||||
/**
|
/**
|
||||||
* Returns a scanner on the current table as specified by the {@link Scan}
|
* Returns a scanner on the current table as specified by the {@link Scan}
|
||||||
* object.
|
* object.
|
||||||
|
* Note that the passed {@link Scan}'s start row and caching properties
|
||||||
|
* maybe changed.
|
||||||
*
|
*
|
||||||
* @param scan A configured {@link Scan} object.
|
* @param scan A configured {@link Scan} object.
|
||||||
* @return A scanner.
|
* @return A scanner.
|
||||||
|
|
Loading…
Reference in New Issue