HBASE-887 Fix a hotspot in scanners

M    conf/hbase-default.xml
Added the new config hbase.client.scanner.caching and removed a triplicated property (hbase.regionserver.dns.interface appeared three times)
M    src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Implements the new next(long, int) method for batching
M    src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
Declares the new next(long, int) method
M    src/java/org/apache/hadoop/hbase/client/HTable.java
Adds a new property that is passed to scanners; ClientScanner now handles caching of rows (usage sketch below)
M    src/java/org/apache/hadoop/hbase/client/MetaScanner.java
Small changes related to ScannerCallable
M    src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Similar small changes
M    src/java/org/apache/hadoop/hbase/client/ScannerCallable.java
Modified for the new HRegionServer methods; now able to handle caching
M    src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
Passes the caching value to its superclass
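For context, a minimal hedged sketch of how a client would use the batching added by this commit. The table name, column family, and caching value of 100 are illustrative only; the constructor and getScanner overload used are the ones visible in the HTable diff below.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;

public class ScannerCachingExample {
  public static void main(String[] args) throws IOException {
    HBaseConfiguration conf = new HBaseConfiguration();
    // Hypothetical table and column family; adjust to your schema.
    HTable table = new HTable(conf, Bytes.toBytes("test_table"));

    // Per-table override of hbase.client.scanner.caching (default 30).
    // Each next() RPC issued by the client scanner now fetches up to 100 rows.
    table.setScannerCaching(100);

    // Scan the whole table at the latest timestamp with no row filter.
    Scanner scanner = table.getScanner(
        new byte [][] { Bytes.toBytes("info:") },
        HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP, null);
    try {
      RowResult row;
      // The caller still consumes one row per call; rows come out of the
      // ClientScanner's local cache, which is refilled in batches over RPC.
      while ((row = scanner.next()) != null) {
        // process row ...
      }
    } finally {
      scanner.close();
    }
  }
}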


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@698198 13f79535-47bb-0310-9956-ffa450edef68
Jean-Daniel Cryans 2008-09-23 14:51:22 +00:00
parent 0b96e8d5ec
commit d65e0b2bd6
9 changed files with 427 additions and 356 deletions

CHANGES.txt

@ -21,6 +21,7 @@ Release 0.19.0 - Unreleased
NEW FEATURES
OPTIMIZATIONS
HBASE-887 Fix a hotspot in scanners
Release 0.18.0 - September 21st, 2008

conf/hbase-default.xml

@ -1,313 +1,304 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-->
<configuration>
<property>
<name>hbase.master</name>
<value>local</value>
<description>The host and port that the HBase master runs at.
A value of 'local' runs the master and a regionserver in
a single process.
</description>
</property>
<property>
<name>hbase.rootdir</name>
<value>file:///tmp/hbase-${user.name}/hbase</value>
<description>The directory shared by region servers.
Should be fully-qualified to include the filesystem to use.
E.g: hdfs://NAMENODE_SERVER:PORT/HBASE_ROOTDIR
</description>
</property>
<property>
<name>hbase.master.info.port</name>
<value>60010</value>
<description>The port for the hbase master web UI
Set to -1 if you do not want the info server to run.
</description>
</property>
<property>
<name>hbase.master.info.bindAddress</name>
<value>0.0.0.0</value>
<description>The address for the hbase master web UI
</description>
</property>
<property>
<name>hbase.regionserver</name>
<value>0.0.0.0:60020</value>
<description>The host and port a HBase region server runs at.
</description>
</property>
<property>
<name>hbase.regionserver.dns.interface</name>
<value>default</value>
<description>Name of the network interface which a regionserver
should use to determine its "real" IP address. This lookup
prevents strings like "localhost" and "127.0.0.1" from being
reported back to the master.
</description>
</property>
<property>
<name>hbase.regionserver.info.port</name>
<value>60030</value>
<description>The port for the hbase regionserver web UI
Set to -1 if you do not want the info server to run.
</description>
</property>
<property>
<name>hbase.regionserver.info.bindAddress</name>
<value>0.0.0.0</value>
<description>The address for the hbase regionserver web UI
</description>
</property>
<property>
<name>hbase.regionserver.class</name>
<value>org.apache.hadoop.hbase.ipc.HRegionInterface</value>
<description>An interface that is assignable to HRegionInterface. Used in HClient for
opening proxy to remote region server.
</description>
</property>
<property>
<name>hbase.client.pause</name>
<value>2000</value>
<description>General client pause value. Used mostly as value to wait
before running a retry of a failed get, region lookup, etc.</description>
</property>
<property>
<name>hbase.client.retries.number</name>
<value>10</value>
<description>Maximum retries. Used as maximum for all retryable
operations such as fetching of the root region from root region
server, getting a cell's value, starting a row update, etc.
Default: 10.
</description>
</property>
<property>
<name>hbase.client.scanner.caching</name>
<value>30</value>
<description>Number of rows that will be fetched when calling next
on a scanner if it is not served from memory. Higher caching values
make scanners faster but use more memory, and some calls of next
may take longer when the local cache has to be refilled.
</description>
</property>
<property>
<name>hbase.master.meta.thread.rescanfrequency</name>
<value>60000</value>
<description>How long the HMaster sleeps (in milliseconds) between scans of
the root and meta tables.
</description>
</property>
<property>
<name>hbase.master.lease.period</name>
<value>120000</value>
<description>HMaster server lease period in milliseconds. Default is
60 seconds. Region servers must report in within this period else
they are considered dead. On loaded cluster, may need to up this
period.</description>
</property>
<property>
<name>hbase.hbasemaster.maxregionopen</name>
<value>120000</value>
<description>Period to wait for a region open. If regionserver
takes longer than this interval, assign to a new regionserver.
</description>
</property>
<property>
<name>hbase.regionserver.lease.period</name>
<value>60000</value>
<description>HRegion server lease period in milliseconds. Default is
60 seconds. Clients must report in within this period else they are
considered dead.</description>
</property>
<property>
<name>hbase.server.thread.wakefrequency</name>
<value>10000</value>
<description>Time to sleep in between searches for work (in milliseconds).
Used as sleep interval by service threads such as META scanner and log roller.
</description>
</property>
<property>
<name>hbase.regionserver.handler.count</name>
<value>10</value>
<description>Count of RPC Server instances spun up on RegionServers.
Same property is used by the HMaster for count of master handlers.
Default is 10.
</description>
</property>
<property>
<name>hbase.regionserver.msginterval</name>
<value>3000</value>
<description>Interval between messages from the RegionServer to HMaster
in milliseconds. Default is 15. Set this value low if you want unit
tests to be responsive.
</description>
</property>
<property>
<name>hbase.regionserver.maxlogentries</name>
<value>30000</value>
<description>Rotate the HRegion HLogs when count of entries exceeds this
value. Default: 30,000. Value is checked by a thread that runs every
hbase.server.thread.wakefrequency.
</description>
</property>
<property>
<name>hbase.regionserver.optionalcacheflushinterval</name>
<value>1800000</value>
<description>
Amount of time to wait since the last time a region was flushed before
invoking an optional cache flush (An optional cache flush is a
flush even though memcache is not at the memcache.flush.size).
Default: 30 minutes (in milliseconds)
</description>
</property>
<property>
<name>hbase.regionserver.optionallogrollinterval</name>
<value>1800000</value>
<description>
Amount of time to wait since the last time the region server's log was
rolled before invoking an optional log roll (An optional log roll is
one in which the log does not contain hbase.regionserver.maxlogentries).
Default: 30 minutes (in milliseconds)
</description>
</property>
<property>
<name>hbase.hregion.memcache.flush.size</name>
<value>67108864</value>
<description>
A HRegion memcache will be flushed to disk if size of the memcache
exceeds this number of bytes. Value is checked by a thread that runs
every hbase.server.thread.wakefrequency.
</description>
</property>
<property>
<name>hbase.hregion.memcache.block.multiplier</name>
<value>2</value>
<description>
Block updates if memcache reaches hbase.hregion.block.memcache
times hbase.hregion.flush.size bytes. Useful for preventing
runaway memcache during spikes in update traffic. Without an
upper-bound, memcache fills such that when it flushes the
resultant flush files take a long time to compact or split, or
worse, we OOME.
</description>
</property>
<property>
<name>hbase.hregion.max.filesize</name>
<value>268435456</value>
<description>
Maximum HStoreFile size. If any one of a column families' HStoreFiles has
grown to exceed this value, the hosting HRegion is split in two.
Default: 256M.
</description>
</property>
<property>
<name>hbase.hstore.compactionThreshold</name>
<value>3</value>
<description>
If more than this number of HStoreFiles in any one HStore
(one HStoreFile is written per flush of memcache) then a compaction
is run to rewrite all HStoreFiles files as one. Larger numbers
put off compaction but when it runs, it takes longer to complete.
During a compaction, updates cannot be flushed to disk. Long
compactions require memory sufficient to carry the logging of
all updates across the duration of the compaction.
If too large, clients timeout during compaction.
</description>
</property>
<property>
<name>hbase.regionserver.thread.splitcompactcheckfrequency</name>
<value>20000</value>
<description>How often a region server runs the split/compaction check.
</description>
</property>
<property>
<name>hbase.hstore.compaction.max</name>
<value>10</value>
<description>Max number of HStoreFiles to compact per 'minor' compaction.
</description>
</property>
<property>
<name>hbase.hregion.majorcompaction</name>
<value>86400000</value>
<description>The time (in milliseconds) between 'major' compactions of all
HStoreFiles in a region. Default: 1 day.
</description>
</property>
<property>
<name>hbase.regionserver.nbreservationblocks</name>
<value>4</value>
<description>The number of reservation blocks which are used to prevent
unstable region servers caused by an OOME.
</description>
</property>
<property>
<name>hbase.io.index.interval</name>
<value>32</value>
<description>The interval at which we record offsets in hbase
store files/mapfiles. Default for stock mapfiles is 128. Index
files are read into memory. If there are many of them, could prove
a burden. If so play with the hadoop io.map.index.skip property and
skip every nth index member when reading back the index into memory.
</description>
</property>
<property>
<name>hbase.io.seqfile.compression.type</name>
<value>NONE</value>
<description>The compression type for hbase sequencefile.Writers
such as hlog.
</description>
</property>
<property>
<name>hbase.hstore.blockCache.blockSize</name>
<value>65536</value>
<description>The size of each block in any block caches.
</description>
</property>
<property>
<name>hbase.regionserver.globalMemcacheLimit</name>
<value>536870912</value>
<description>Maximum size of all memcaches in a region server before new
updates are blocked and flushes are forced. Defaults to 512MB.
</description>
</property>
<property>
<name>hbase.regionserver.globalMemcacheLimitlowMark</name>
<value>256435456</value>
<description>When memcaches are being forced to flush to make room in
memory, keep flushing until we hit this mark. Defaults to 256MB. Setting
this value equal to hbase.regionserver.globalmemcachelimit causes the
minimum possible flushing to occur when updates are blocked due to
memcache limiting.
</description>
</property>
</configuration>
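The new hbase.client.scanner.caching property above is a client-side setting. A hedged sketch of how it flows into the client, matching the HTable change further down in this commit; the table name and the override value are illustrative, and a site-wide value could equally be set in hbase-site.xml.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;

public class ScannerCachingConfig {
  public static void main(String[] args) throws Exception {
    HBaseConfiguration conf = new HBaseConfiguration();

    // hbase-default.xml supplies 30; an hbase-site.xml entry or a
    // programmatic override like this one takes precedence.
    conf.setInt("hbase.client.scanner.caching", 50);

    // HTable reads the value in its constructor
    // (conf.getInt("hbase.client.scanner.caching", 30) in this commit)
    // and hands it to every ClientScanner it creates.
    HTable table = new HTable(conf, Bytes.toBytes("my_table"));
    System.out.println("scanner caching = " + table.getScannerCaching());
  }
}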

src/java/org/apache/hadoop/hbase/client/HConnectionManager.java

@ -337,7 +337,9 @@ public class HConnectionManager implements HConstants {
currentRegion = s.getHRegionInfo();
try {
RowResult r = null;
while (result && (r = getRegionServerWithRetries(s)) != null) {
RowResult[] rrs = null;
while (result && (rrs = getRegionServerWithRetries(s)) != null) {
r = rrs[0];
Cell c = r.get(HConstants.COL_REGIONINFO);
if (c != null) {
byte[] value = c.getValue();

src/java/org/apache/hadoop/hbase/client/HTable.java

@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -51,6 +52,7 @@ public class HTable {
private final HConnection connection;
private final byte [] tableName;
private HBaseConfiguration configuration;
private int scannerCaching;
/**
* Creates an object to access a HBase table
@ -99,6 +101,7 @@ public class HTable {
this.configuration = conf;
this.tableName = tableName;
this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 30);
}
/**
@ -175,6 +178,22 @@ public class HTable {
public HConnection getConnection() {
return this.connection;
}
/**
* Get the number of rows for caching that will be passed to scanners
* @return the number of rows for caching
*/
public int getScannerCaching() {
return scannerCaching;
}
/**
* Set the number of rows for caching that will be passed to scanners
* @param scannerCaching the number of rows for caching
*/
public void setScannerCaching(int scannerCaching) {
this.scannerCaching = scannerCaching;
}
/**
* @return table metadata
@ -726,7 +745,8 @@ public class HTable {
public Scanner getScanner(final byte [][] columns,
final byte [] startRow, long timestamp, RowFilterInterface filter)
throws IOException {
ClientScanner s = new ClientScanner(columns, startRow, timestamp, filter);
ClientScanner s = new ClientScanner(columns, startRow,
timestamp, filter);
s.initialize();
return s;
}
@ -1040,12 +1060,15 @@ public class HTable {
private HRegionInfo currentRegion = null;
private ScannerCallable callable = null;
protected RowFilterInterface filter;
final private LinkedList<RowResult> cache = new LinkedList<RowResult>();
final private int scannerCaching = HTable.this.scannerCaching;
protected ClientScanner(final byte[][] columns, final byte [] startRow,
final long timestamp, final RowFilterInterface filter) {
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Creating scanner over " + Bytes.toString(getTableName()) +
" starting at key '" + Bytes.toString(startRow) + "'");
CLIENT_LOG.debug("Creating scanner over "
+ Bytes.toString(getTableName())
+ " starting at key '" + Bytes.toString(startRow) + "'");
}
// save off the simple parameters
this.columns = columns;
@ -1063,7 +1086,7 @@ public class HTable {
//TODO: change visibility to protected
public void initialize() throws IOException {
nextScanner();
nextScanner(this.scannerCaching);
}
protected byte[][] getColumns() {
@ -1082,7 +1105,7 @@ public class HTable {
* Gets a scanner for the next region.
* Returns false if there are no more scanners.
*/
private boolean nextScanner() throws IOException {
private boolean nextScanner(int nbRows) throws IOException {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
@ -1115,7 +1138,7 @@ public class HTable {
}
try {
callable = getScannerCallable(localStartKey);
callable = getScannerCallable(localStartKey, nbRows);
// open a scanner on the region server starting at the
// beginning of the region
getConnection().getRegionServerWithRetries(callable);
@ -1127,9 +1150,13 @@ public class HTable {
return true;
}
protected ScannerCallable getScannerCallable(byte [] localStartKey) {
return new ScannerCallable(getConnection(), getTableName(), columns,
protected ScannerCallable getScannerCallable(byte [] localStartKey,
int nbRows) {
ScannerCallable s = new ScannerCallable(getConnection(),
getTableName(), columns,
localStartKey, scanTime, filter);
s.setCaching(nbRows);
return s;
}
/**
@ -1147,16 +1174,31 @@ public class HTable {
}
public RowResult next() throws IOException {
if (this.closed) {
// If the scanner is closed but there is some rows left in the cache,
// it will first empty it before returning null
if (cache.size() == 0 && this.closed) {
return null;
}
RowResult values = null;
do {
values = getConnection().getRegionServerWithRetries(callable);
} while ((values == null || values.size() == 0) && nextScanner());
if (cache.size() == 0) {
RowResult[] values = null;
int countdown = this.scannerCaching;
// We need to reset it if it's a new callable that was created
// with a countdown in nextScanner
callable.setCaching(this.scannerCaching);
do {
values = getConnection().getRegionServerWithRetries(callable);
if (values != null && values.length > 0) {
for (RowResult rs : values) {
cache.add(rs);
countdown--;
}
}
if (values != null && values.size() != 0) {
return values;
} while (countdown > 0 && nextScanner(countdown));
}
if (cache.size() > 0) {
return cache.poll();
}
return null;
}

src/java/org/apache/hadoop/hbase/client/MetaScanner.java

@ -57,10 +57,12 @@ class MetaScanner implements HConstants {
try {
RowResult r = null;
do {
r = connection.getRegionServerWithRetries(callable);
if (r == null || r.size() == 0) {
RowResult[] rrs = connection.getRegionServerWithRetries(callable);
if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
break;
}
r = rrs[0];
} while(visitor.processRow(r));
// Advance the startRow to the end key of the current region
startRow = callable.getHRegionInfo().getEndKey();

src/java/org/apache/hadoop/hbase/client/ScannerCallable.java

@ -31,13 +31,14 @@ import org.apache.hadoop.hbase.io.RowResult;
* Retries scanner operations such as create, next, etc.
* Used by {@link Scanner}s made by {@link HTable}.
*/
public class ScannerCallable extends ServerCallable<RowResult> {
public class ScannerCallable extends ServerCallable<RowResult[]> {
private long scannerId = -1L;
private boolean instantiated = false;
private boolean closed = false;
private final byte [][] columns;
private final long timestamp;
private final RowFilterInterface filter;
private int caching = 1;
/**
* @param connection
@ -67,7 +68,7 @@ public class ScannerCallable extends ServerCallable<RowResult> {
}
}
public RowResult call() throws IOException {
public RowResult[] call() throws IOException {
if (scannerId != -1L && closed) {
server.close(scannerId);
scannerId = -1L;
@ -75,7 +76,8 @@ public class ScannerCallable extends ServerCallable<RowResult> {
// open the scanner
scannerId = openScanner();
} else {
return server.next(scannerId);
RowResult[] rrs = server.next(scannerId, caching);
return rrs.length == 0 ? null : rrs;
}
return null;
}
@ -114,4 +116,20 @@ public class ScannerCallable extends ServerCallable<RowResult> {
}
return location.getRegionInfo();
}
/**
* Get the number of rows that will be fetched on next
* @return the number of rows for caching
*/
public int getCaching() {
return caching;
}
/**
* Set the number of rows that will be fetched on next
* @param caching the number of rows for caching
*/
public void setCaching(int caching) {
this.caching = caching;
}
}

src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java

@ -414,10 +414,14 @@ public class TransactionalTable extends HTable {
}
@Override
protected ScannerCallable getScannerCallable(final byte[] localStartKey) {
return new TransactionScannerCallable(transactionState, getConnection(),
protected ScannerCallable getScannerCallable(
final byte[] localStartKey, int caching) {
TransactionScannerCallable t =
new TransactionScannerCallable(transactionState, getConnection(),
getTableName(), getColumns(), localStartKey, getTimestamp(),
getFilter());
t.setCaching(caching);
return t;
}
}

src/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java

@ -36,9 +36,9 @@ import org.apache.hadoop.hbase.NotServingRegionException;
public interface HRegionInterface extends VersionedProtocol {
/**
* Protocol version.
* Upped to 4 when we removed overloaded methods from the protocol.
* Upped to 5 when we added scanner caching
*/
public static final long versionID = 4L;
public static final long versionID = 5L;
/**
* Get metainfo about an HRegion
@ -174,7 +174,7 @@ public interface HRegionInterface extends VersionedProtocol {
public long openScanner(final byte [] regionName, final byte [][] columns,
final byte [] startRow, long timestamp, RowFilterInterface filter)
throws IOException;
/**
* Get the next set of values
* @param scannerId clientId passed to openScanner
@ -183,6 +183,15 @@ public interface HRegionInterface extends VersionedProtocol {
*/
public RowResult next(long scannerId) throws IOException;
/**
* Get the next set of values
* @param scannerId clientId passed to openScanner
* @param numberOfRows the number of rows to fetch
* @return map of values
* @throws IOException
*/
public RowResult[] next(long scannerId, int numberOfRows) throws IOException;
/**
* Close a scanner
*

src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java

@ -1062,10 +1062,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throw e;
}
}
public RowResult next(final long scannerId) throws IOException {
RowResult[] rrs = next(scannerId, 1);
return rrs.length == 0 ? null : rrs[0];
}
public RowResult[] next(final long scannerId, int nbRows) throws IOException {
checkOpen();
requestCount.incrementAndGet();
ArrayList<RowResult> resultSets = new ArrayList<RowResult>();
try {
String scannerName = String.valueOf(scannerId);
InternalScanner s = scanners.get(scannerName);
@ -1073,24 +1079,20 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throw new UnknownScannerException("Name: " + scannerName);
}
this.leases.renewLease(scannerName);
// Collect values to be returned here
HbaseMapWritable<byte [], Cell> values
= new HbaseMapWritable<byte [], Cell>();
HStoreKey key = new HStoreKey();
TreeMap<byte [], Cell> results =
new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
while (s.next(key, results)) {
values.putAll(results);
if (values.size() > 0) {
// Row has something in it. Return the value.
break;
for(int i = 0; i < nbRows; i++) {
// Collect values to be returned here
HbaseMapWritable<byte [], Cell> values
= new HbaseMapWritable<byte [], Cell>();
HStoreKey key = new HStoreKey();
while (s.next(key, values)) {
if (values.size() > 0) {
// Row has something in it. Return the value.
resultSets.add(new RowResult(key.getRow(), values));
break;
}
}
// No data for this row, go get another.
results.clear();
}
return values.size() == 0 ? null : new RowResult(key.getRow(), values);
return resultSets.toArray(new RowResult[resultSets.size()]);
} catch (IOException e) {
checkFileSystem();
throw e;