HBASE-2468 Improvements to prewarm META cache on client
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@954268 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e019542660
commit
d92c49629d
|
@ -691,6 +691,8 @@ Release 0.21.0 - Unreleased
|
||||||
(Alex Newman via Stack)
|
(Alex Newman via Stack)
|
||||||
HBASE-2718 Update .gitignore for trunk after removal of contribs
|
HBASE-2718 Update .gitignore for trunk after removal of contribs
|
||||||
(Lars Francke via Stack)
|
(Lars Francke via Stack)
|
||||||
|
HBASE-2468 Improvements to prewarm META cache on clients
|
||||||
|
(Mingjie Lai via Stack)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
import org.apache.hadoop.hbase.HServerAddress;
|
import org.apache.hadoop.hbase.HServerAddress;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -219,5 +221,29 @@ public interface HConnection {
|
||||||
public void processBatchOfPuts(List<Put> list,
|
public void processBatchOfPuts(List<Put> list,
|
||||||
final byte[] tableName, ExecutorService pool) throws IOException;
|
final byte[] tableName, ExecutorService pool) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable or disable region cache prefetch for the table. It will be
|
||||||
|
* applied to all HTable instances of the given table within this
|
||||||
|
* connection. By default, the cache prefetch is enabled.
|
||||||
|
* @param tableName name of table to configure.
|
||||||
|
* @param enable Set to true to enable region cache prefetch.
|
||||||
|
*/
|
||||||
|
public void setRegionCachePrefetch(final byte[] tableName,
|
||||||
|
final boolean enable);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether region cache prefetch is enabled or not.
|
||||||
|
* @param tableName name of table to check
|
||||||
|
* @return true if table's region cache prefetch is enabled. Otherwise
|
||||||
|
* it is disabled.
|
||||||
|
*/
|
||||||
|
public boolean getRegionCachePrefetch(final byte[] tableName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Load the region map and warm up the global region cache for the table.
|
||||||
|
* @param tableName name of the table to perform region cache prewarm.
|
||||||
|
* @param regions a region map.
|
||||||
|
*/
|
||||||
|
public void prewarmRegionCache(final byte[] tableName,
|
||||||
|
final Map<HRegionInfo, HServerAddress> regions);
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,7 @@ import java.util.HashMap;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -63,6 +64,7 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A non-instantiable class that manages connections to multiple tables in
|
* A non-instantiable class that manages connections to multiple tables in
|
||||||
|
@ -245,6 +247,28 @@ public class HConnectionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It is provided for unit test cases which verify the behavior of region
|
||||||
|
* location cache prefetch.
|
||||||
|
* @return Number of cached regions for the table.
|
||||||
|
*/
|
||||||
|
static int getCachedRegionCount(Configuration conf,
|
||||||
|
byte[] tableName) {
|
||||||
|
TableServers connection = (TableServers)getConnection(conf);
|
||||||
|
return connection.getNumberOfCachedRegionLocations(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It's provided for unit test cases which verify the behavior of region
|
||||||
|
* location cache prefetch.
|
||||||
|
* @return true if the region where the table and row reside is cached.
|
||||||
|
*/
|
||||||
|
static boolean isRegionCached(Configuration conf,
|
||||||
|
byte[] tableName, byte[] row) {
|
||||||
|
TableServers connection = (TableServers)getConnection(conf);
|
||||||
|
return connection.isRegionCached(tableName, row);
|
||||||
|
}
|
||||||
|
|
||||||
/* Encapsulates finding the servers for an HBase instance */
|
/* Encapsulates finding the servers for an HBase instance */
|
||||||
static class TableServers implements ServerConnection {
|
static class TableServers implements ServerConnection {
|
||||||
static final Log LOG = LogFactory.getLog(TableServers.class);
|
static final Log LOG = LogFactory.getLog(TableServers.class);
|
||||||
|
@ -253,6 +277,7 @@ public class HConnectionManager {
|
||||||
private final int numRetries;
|
private final int numRetries;
|
||||||
private final int maxRPCAttempts;
|
private final int maxRPCAttempts;
|
||||||
private final long rpcTimeout;
|
private final long rpcTimeout;
|
||||||
|
private final int prefetchRegionLimit;
|
||||||
|
|
||||||
private final Object masterLock = new Object();
|
private final Object masterLock = new Object();
|
||||||
private volatile boolean closed;
|
private volatile boolean closed;
|
||||||
|
@ -276,6 +301,11 @@ public class HConnectionManager {
|
||||||
cachedRegionLocations =
|
cachedRegionLocations =
|
||||||
new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
|
new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>();
|
||||||
|
|
||||||
|
// region cache prefetch is enabled by default. this set contains all
|
||||||
|
// tables whose region cache prefetch is disabled.
|
||||||
|
private final Set<Integer> regionCachePrefetchDisabledTables =
|
||||||
|
new CopyOnWriteArraySet<Integer>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* constructor
|
* constructor
|
||||||
* @param conf Configuration object
|
* @param conf Configuration object
|
||||||
|
@ -306,6 +336,9 @@ public class HConnectionManager {
|
||||||
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
|
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
|
||||||
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
|
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
|
||||||
|
|
||||||
|
this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
|
||||||
|
10);
|
||||||
|
|
||||||
this.master = null;
|
this.master = null;
|
||||||
this.masterChecked = false;
|
this.masterChecked = false;
|
||||||
}
|
}
|
||||||
|
@ -649,6 +682,63 @@ public class HConnectionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Search .META. for the HRegionLocation info that contains the table and
|
||||||
|
* row we're seeking. It will prefetch a certain number of regions' info and
|
||||||
|
* save them to the global region cache.
|
||||||
|
*/
|
||||||
|
private void prefetchRegionCache(final byte[] tableName,
|
||||||
|
final byte[] row) {
|
||||||
|
// Implement a new visitor for MetaScanner, and use it to walk through
|
||||||
|
// the .META.
|
||||||
|
MetaScannerVisitor visitor = new MetaScannerVisitor() {
|
||||||
|
public boolean processRow(Result result) throws IOException {
|
||||||
|
try {
|
||||||
|
byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
|
||||||
|
HConstants.REGIONINFO_QUALIFIER);
|
||||||
|
HRegionInfo regionInfo = null;
|
||||||
|
|
||||||
|
if (value != null) {
|
||||||
|
// convert the row result into the HRegionLocation we need!
|
||||||
|
regionInfo = Writables.getHRegionInfo(value);
|
||||||
|
|
||||||
|
// possible we got a region of a different table...
|
||||||
|
if (!Bytes.equals(regionInfo.getTableDesc().getName(),
|
||||||
|
tableName)) {
|
||||||
|
return false; // stop scanning
|
||||||
|
}
|
||||||
|
if (regionInfo.isOffline()) {
|
||||||
|
// don't cache offline regions
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
value = result.getValue(HConstants.CATALOG_FAMILY,
|
||||||
|
HConstants.SERVER_QUALIFIER);
|
||||||
|
if (value == null) {
|
||||||
|
return true; // don't cache it
|
||||||
|
}
|
||||||
|
final String serverAddress = Bytes.toString(value);
|
||||||
|
|
||||||
|
// instantiate the location
|
||||||
|
HRegionLocation loc = new HRegionLocation(regionInfo,
|
||||||
|
new HServerAddress(serverAddress));
|
||||||
|
// cache this meta entry
|
||||||
|
cacheLocation(tableName, loc);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
// prefetch a certain number of regions' info into the region cache.
|
||||||
|
MetaScanner.metaScan(conf, visitor, tableName, row,
|
||||||
|
this.prefetchRegionLimit);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Encounted problems when prefetch META table: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
|
* Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
|
||||||
* info that contains the table and row we're seeking.
|
* info that contains the table and row we're seeking.
|
||||||
|
@ -689,6 +779,13 @@ public class HConnectionManager {
|
||||||
// region at the same time. The first will load the meta region and
|
// region at the same time. The first will load the meta region and
|
||||||
// the second will use the value that the first one found.
|
// the second will use the value that the first one found.
|
||||||
synchronized (regionLockObject) {
|
synchronized (regionLockObject) {
|
||||||
|
// If the parent table is META, we may want to pre-fetch some
|
||||||
|
// region info into the global region cache for this table.
|
||||||
|
if (Bytes.equals(parentTable, HConstants.META_TABLE_NAME) &&
|
||||||
|
(getRegionCachePrefetch(tableName)) ) {
|
||||||
|
prefetchRegionCache(tableName, row);
|
||||||
|
}
|
||||||
|
|
||||||
// Check the cache again for a hit in case some other thread made the
|
// Check the cache again for a hit in case some other thread made the
|
||||||
// same query while we were waiting on the lock. If not supposed to
|
// same query while we were waiting on the lock. If not supposed to
|
||||||
// be using the cache, delete any existing cached location so it won't
|
// be using the cache, delete any existing cached location so it won't
|
||||||
|
@ -1457,5 +1554,56 @@ public class HConnectionManager {
|
||||||
}
|
}
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Return the number of cached regions for a table. It will only be called
|
||||||
|
* from a unit test.
|
||||||
|
*/
|
||||||
|
int getNumberOfCachedRegionLocations(final byte[] tableName) {
|
||||||
|
Integer key = Bytes.mapKey(tableName);
|
||||||
|
synchronized (this.cachedRegionLocations) {
|
||||||
|
SoftValueSortedMap<byte[], HRegionLocation> tableLocs =
|
||||||
|
this.cachedRegionLocations.get(key);
|
||||||
|
|
||||||
|
if (tableLocs == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return tableLocs.values().size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check the region cache to see whether a region is cached yet or not.
|
||||||
|
* Called by unit tests.
|
||||||
|
* @param tableName tableName
|
||||||
|
* @param row row
|
||||||
|
* @return Region cached or not.
|
||||||
|
*/
|
||||||
|
boolean isRegionCached(final byte[] tableName, final byte[] row) {
|
||||||
|
HRegionLocation location = getCachedLocation(tableName, row);
|
||||||
|
return location != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setRegionCachePrefetch(final byte[] tableName,
|
||||||
|
final boolean enable) {
|
||||||
|
if (!enable) {
|
||||||
|
regionCachePrefetchDisabledTables.add(Bytes.mapKey(tableName));
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
regionCachePrefetchDisabledTables.remove(Bytes.mapKey(tableName));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getRegionCachePrefetch(final byte[] tableName) {
|
||||||
|
return !regionCachePrefetchDisabledTables.contains(Bytes.mapKey(tableName));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void prewarmRegionCache(final byte[] tableName,
|
||||||
|
final Map<HRegionInfo, HServerAddress> regions) {
|
||||||
|
for (Map.Entry<HRegionInfo, HServerAddress> e : regions.entrySet()) {
|
||||||
|
cacheLocation(tableName,
|
||||||
|
new HRegionLocation(e.getKey(), e.getValue()));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,8 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import java.io.DataInput;
|
||||||
|
import java.io.DataOutput;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used to communicate with a single HBase table.
|
* Used to communicate with a single HBase table.
|
||||||
|
@ -374,6 +376,95 @@ public class HTable implements HTableInterface {
|
||||||
return regionMap;
|
return regionMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Save the passed region information into the table's regions
|
||||||
|
* cache.
|
||||||
|
* <p>
|
||||||
|
* This is mainly useful for the MapReduce integration. You can call
|
||||||
|
* {@link #deserializeRegionInfo deserializeRegionInfo}
|
||||||
|
* to deserialize regions information from a
|
||||||
|
* {@link DataInput}, then call this method to load them to cache.
|
||||||
|
*
|
||||||
|
* <pre>
|
||||||
|
* {@code
|
||||||
|
* HTable t1 = new HTable("foo");
|
||||||
|
* FileInputStream fis = new FileInputStream("regions.dat");
|
||||||
|
* DataInputStream dis = new DataInputStream(fis);
|
||||||
|
*
|
||||||
|
* Map<HRegionInfo, HServerAddress> hm = t1.deserializeRegionInfo(dis);
|
||||||
|
* t1.prewarmRegionCache(hm);
|
||||||
|
* }
|
||||||
|
* </pre>
|
||||||
|
* @param regionMap This piece of regions information will be loaded
|
||||||
|
* to region cache.
|
||||||
|
*/
|
||||||
|
public void prewarmRegionCache(Map<HRegionInfo, HServerAddress> regionMap) {
|
||||||
|
this.connection.prewarmRegionCache(this.getTableName(), regionMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Serialize the regions information of this table and output
|
||||||
|
* to <code>out</code>.
|
||||||
|
* <p>
|
||||||
|
* This is mainly useful for the MapReduce integration. A client could
|
||||||
|
* perform a large scan for all the regions for the table, serialize the
|
||||||
|
* region info to a file. MR job can ship a copy of the meta for the table in
|
||||||
|
* the DistributedCache.
|
||||||
|
* <pre>
|
||||||
|
* {@code
|
||||||
|
* FileOutputStream fos = new FileOutputStream("regions.dat");
|
||||||
|
* DataOutputStream dos = new DataOutputStream(fos);
|
||||||
|
* table.serializeRegionInfo(dos);
|
||||||
|
* dos.flush();
|
||||||
|
* dos.close();
|
||||||
|
* }
|
||||||
|
* </pre>
|
||||||
|
* @param out {@link DataOutput} to serialize this object into.
|
||||||
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
*/
|
||||||
|
public void serializeRegionInfo(DataOutput out) throws IOException {
|
||||||
|
Map<HRegionInfo, HServerAddress> allRegions = this.getRegionsInfo();
|
||||||
|
// first, write number of regions
|
||||||
|
out.writeInt(allRegions.size());
|
||||||
|
for (Map.Entry<HRegionInfo, HServerAddress> es : allRegions.entrySet()) {
|
||||||
|
es.getKey().write(out);
|
||||||
|
es.getValue().write(out);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read from <code>in</code> and deserialize the regions information.
|
||||||
|
*
|
||||||
|
* <p>It behaves similarly to {@link #getRegionsInfo getRegionsInfo}, except
|
||||||
|
* that it loads the region map from a {@link DataInput} object.
|
||||||
|
*
|
||||||
|
* <p>It is supposed to be followed immediately by {@link
|
||||||
|
* #prewarmRegionCache prewarmRegionCache}.
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* Please refer to {@link #prewarmRegionCache prewarmRegionCache} for usage.
|
||||||
|
*
|
||||||
|
* @param in {@link DataInput} object.
|
||||||
|
* @return A map of HRegionInfo with its server address.
|
||||||
|
* @throws IOException if an I/O exception occurs.
|
||||||
|
*/
|
||||||
|
public Map<HRegionInfo, HServerAddress> deserializeRegionInfo(DataInput in)
|
||||||
|
throws IOException {
|
||||||
|
final Map<HRegionInfo, HServerAddress> allRegions =
|
||||||
|
new TreeMap<HRegionInfo, HServerAddress>();
|
||||||
|
|
||||||
|
// the first integer is expected to be the size of records
|
||||||
|
int regionsCount = in.readInt();
|
||||||
|
for (int i = 0; i < regionsCount; ++i) {
|
||||||
|
HRegionInfo hri = new HRegionInfo();
|
||||||
|
hri.readFields(in);
|
||||||
|
HServerAddress hsa = new HServerAddress();
|
||||||
|
hsa.readFields(in);
|
||||||
|
allRegions.put(hri, hsa);
|
||||||
|
}
|
||||||
|
return allRegions;
|
||||||
|
}
|
||||||
|
|
||||||
public Result getRowOrBefore(final byte[] row, final byte[] family)
|
public Result getRowOrBefore(final byte[] row, final byte[] family)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return connection.getRegionServerWithRetries(
|
return connection.getRegionServerWithRetries(
|
||||||
|
@ -1008,4 +1099,57 @@ public class HTable implements HTableInterface {
|
||||||
return t;
|
return t;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable or disable region cache prefetch for the table. It will be
|
||||||
|
* applied to all HTable instances of the given table that share the same
|
||||||
|
* connection. By default, the cache prefetch is enabled.
|
||||||
|
* @param tableName name of table to configure.
|
||||||
|
* @param enable Set to true to enable region cache prefetch. Or set to
|
||||||
|
* false to disable it.
|
||||||
|
*/
|
||||||
|
public static void setRegionCachePrefetch(final byte[] tableName,
|
||||||
|
boolean enable) {
|
||||||
|
HConnectionManager.getConnection(HBaseConfiguration.create()).
|
||||||
|
setRegionCachePrefetch(tableName, enable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable or disable region cache prefetch for the table. It will be
|
||||||
|
* applied to all HTable instances of the given table that share the same
|
||||||
|
* connection. By default, the cache prefetch is enabled.
|
||||||
|
* @param conf The Configuration object to use.
|
||||||
|
* @param tableName name of table to configure.
|
||||||
|
* @param enable Set to true to enable region cache prefetch. Or set to
|
||||||
|
* false to disable it.
|
||||||
|
*/
|
||||||
|
public static void setRegionCachePrefetch(final Configuration conf,
|
||||||
|
final byte[] tableName, boolean enable) {
|
||||||
|
HConnectionManager.getConnection(conf).setRegionCachePrefetch(
|
||||||
|
tableName, enable);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether region cache prefetch is enabled or not for the table.
|
||||||
|
* @param conf The Configuration object to use.
|
||||||
|
* @param tableName name of table to check
|
||||||
|
* @return true if table's region cache prefetch is enabled. Otherwise
|
||||||
|
* it is disabled.
|
||||||
|
*/
|
||||||
|
public static boolean getRegionCachePrefetch(final Configuration conf,
|
||||||
|
final byte[] tableName) {
|
||||||
|
return HConnectionManager.getConnection(conf).getRegionCachePrefetch(
|
||||||
|
tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether region cache prefetch is enabled or not for the table.
|
||||||
|
* @param tableName name of table to check
|
||||||
|
* @return true if table's region cache prefetch is enabled. Otherwise
|
||||||
|
* it is disabled.
|
||||||
|
*/
|
||||||
|
public static boolean getRegionCachePrefetch(final byte[] tableName) {
|
||||||
|
return HConnectionManager.getConnection(HBaseConfiguration.create()).
|
||||||
|
getRegionCachePrefetch(tableName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,9 @@ package org.apache.hadoop.hbase.client;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
@ -60,32 +62,88 @@ class MetaScanner {
|
||||||
public static void metaScan(Configuration configuration,
|
public static void metaScan(Configuration configuration,
|
||||||
MetaScannerVisitor visitor, byte[] tableName)
|
MetaScannerVisitor visitor, byte[] tableName)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
metaScan(configuration, visitor, tableName, null, Integer.MAX_VALUE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scans the meta table and calls a visitor on each RowResult. Uses a table
|
||||||
|
* name and a row name to locate meta regions. And it only scans at most
|
||||||
|
* <code>rowLimit</code> of rows.
|
||||||
|
*
|
||||||
|
* @param configuration HBase configuration.
|
||||||
|
* @param visitor Visitor object.
|
||||||
|
* @param tableName User table name.
|
||||||
|
* @param row Name of the row at the user table. The scan will start from
|
||||||
|
* the region row where the row resides.
|
||||||
|
* @param rowLimit Max number of rows to process. If it is not positive, it
|
||||||
|
* will be set to default value <code>Integer.MAX_VALUE</code>.
|
||||||
|
* @throws IOException e
|
||||||
|
*/
|
||||||
|
public static void metaScan(Configuration configuration,
|
||||||
|
MetaScannerVisitor visitor, byte[] tableName, byte[] row,
|
||||||
|
int rowLimit)
|
||||||
|
throws IOException {
|
||||||
|
int rowUpperLimit = rowLimit > 0 ? rowLimit: Integer.MAX_VALUE;
|
||||||
|
|
||||||
HConnection connection = HConnectionManager.getConnection(configuration);
|
HConnection connection = HConnectionManager.getConnection(configuration);
|
||||||
byte [] startRow = tableName == null || tableName.length == 0 ?
|
byte [] startRow = tableName == null || tableName.length == 0 ?
|
||||||
HConstants.EMPTY_START_ROW :
|
HConstants.EMPTY_START_ROW :
|
||||||
HRegionInfo.createRegionName(tableName, null, HConstants.ZEROES,
|
HRegionInfo.createRegionName(tableName, row, HConstants.ZEROES,
|
||||||
false);
|
false);
|
||||||
|
|
||||||
|
// if row is not null, we want to use the startKey of the row's region as
|
||||||
|
// the startRow for the meta scan.
|
||||||
|
if (row != null) {
|
||||||
|
HTable metaTable = new HTable(HConstants.META_TABLE_NAME);
|
||||||
|
Result startRowResult = metaTable.getRowOrBefore(startRow,
|
||||||
|
HConstants.CATALOG_FAMILY);
|
||||||
|
if (startRowResult == null) {
|
||||||
|
throw new TableNotFoundException("Cannot find row in .META. for table: "
|
||||||
|
+ Bytes.toString(tableName) + ", row=" + Bytes.toString(startRow));
|
||||||
|
}
|
||||||
|
byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
|
||||||
|
HConstants.REGIONINFO_QUALIFIER);
|
||||||
|
if (value == null || value.length == 0) {
|
||||||
|
throw new IOException("HRegionInfo was null or empty in Meta for " +
|
||||||
|
Bytes.toString(tableName) + ", row=" + Bytes.toString(startRow));
|
||||||
|
}
|
||||||
|
HRegionInfo regionInfo = Writables.getHRegionInfo(value);
|
||||||
|
|
||||||
|
byte[] rowBefore = regionInfo.getStartKey();
|
||||||
|
startRow = HRegionInfo.createRegionName(tableName, rowBefore,
|
||||||
|
HConstants.ZEROES, false);
|
||||||
|
}
|
||||||
|
|
||||||
// Scan over each meta region
|
// Scan over each meta region
|
||||||
ScannerCallable callable;
|
ScannerCallable callable;
|
||||||
int rows = configuration.getInt("hbase.meta.scanner.caching", 100);
|
int rows = Math.min(rowLimit,
|
||||||
|
configuration.getInt("hbase.meta.scanner.caching", 100));
|
||||||
do {
|
do {
|
||||||
final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);
|
final Scan scan = new Scan(startRow).addFamily(HConstants.CATALOG_FAMILY);
|
||||||
callable = new ScannerCallable(connection, HConstants.META_TABLE_NAME,
|
callable = new ScannerCallable(connection, HConstants.META_TABLE_NAME,
|
||||||
scan);
|
scan);
|
||||||
// Open scanner
|
// Open scanner
|
||||||
connection.getRegionServerWithRetries(callable);
|
connection.getRegionServerWithRetries(callable);
|
||||||
|
|
||||||
|
int processedRows = 0;
|
||||||
try {
|
try {
|
||||||
callable.setCaching(rows);
|
callable.setCaching(rows);
|
||||||
done: do {
|
done: do {
|
||||||
|
if (processedRows >= rowUpperLimit) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
//we have all the rows here
|
//we have all the rows here
|
||||||
Result [] rrs = connection.getRegionServerWithRetries(callable);
|
Result [] rrs = connection.getRegionServerWithRetries(callable);
|
||||||
if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
|
if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
|
||||||
break; //exit completely
|
break; //exit completely
|
||||||
}
|
}
|
||||||
for (Result rr : rrs) {
|
for (Result rr : rrs) {
|
||||||
|
if (processedRows >= rowUpperLimit) {
|
||||||
|
break done;
|
||||||
|
}
|
||||||
if (!visitor.processRow(rr))
|
if (!visitor.processRow(rr))
|
||||||
break done; //exit completely
|
break done; //exit completely
|
||||||
|
processedRows++;
|
||||||
}
|
}
|
||||||
//here, we didn't break anywhere. Check if we have more rows
|
//here, we didn't break anywhere. Check if we have more rows
|
||||||
} while(true);
|
} while(true);
|
||||||
|
|
|
@ -25,6 +25,11 @@ import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -35,6 +40,7 @@ import java.util.UUID;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
@ -3541,4 +3547,137 @@ public class TestFromClientSide {
|
||||||
|
|
||||||
assertTrue(scan.getFamilyMap().get(FAMILY).size() == 0);
|
assertTrue(scan.getFamilyMap().get(FAMILY).size() == 0);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
/**
|
||||||
|
* HBASE-2468 use case 1 and 2: region info de/serialization
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRegionCacheDeSerialization() throws Exception {
|
||||||
|
// 1. test serialization.
|
||||||
|
final byte[] TABLENAME = Bytes.toBytes("testCachePrewarm2");
|
||||||
|
final byte[] FAMILY = Bytes.toBytes("family");
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
TEST_UTIL.createTable(TABLENAME, FAMILY);
|
||||||
|
|
||||||
|
// Set up test table:
|
||||||
|
// Create table:
|
||||||
|
HTable table = new HTable(conf, TABLENAME);
|
||||||
|
|
||||||
|
// Create multiple regions for this table
|
||||||
|
TEST_UTIL.createMultiRegions(table, FAMILY);
|
||||||
|
|
||||||
|
Path tempPath = new Path(HBaseTestingUtility.getTestDir(), "regions.dat");
|
||||||
|
|
||||||
|
final String tempFileName = tempPath.toString();
|
||||||
|
|
||||||
|
FileOutputStream fos = new FileOutputStream(tempFileName);
|
||||||
|
DataOutputStream dos = new DataOutputStream(fos);
|
||||||
|
|
||||||
|
// serialize the region info and output to a local file.
|
||||||
|
table.serializeRegionInfo(dos);
|
||||||
|
dos.flush();
|
||||||
|
dos.close();
|
||||||
|
|
||||||
|
// read a local file and deserialize the region info from it.
|
||||||
|
FileInputStream fis = new FileInputStream(tempFileName);
|
||||||
|
DataInputStream dis = new DataInputStream(fis);
|
||||||
|
|
||||||
|
Map<HRegionInfo, HServerAddress> deserRegions =
|
||||||
|
table.deserializeRegionInfo(dis);
|
||||||
|
dis.close();
|
||||||
|
|
||||||
|
// regions obtained from meta scanner.
|
||||||
|
Map<HRegionInfo, HServerAddress> loadedRegions =
|
||||||
|
table.getRegionsInfo();
|
||||||
|
|
||||||
|
// set the deserialized regions to the global cache.
|
||||||
|
table.getConnection().clearRegionCache();
|
||||||
|
|
||||||
|
table.getConnection().prewarmRegionCache(table.getTableName(),
|
||||||
|
deserRegions);
|
||||||
|
|
||||||
|
// verify whether the 2 maps are identical or not.
|
||||||
|
assertEquals("Number of cached region is incorrect",
|
||||||
|
HConnectionManager.getCachedRegionCount(conf, TABLENAME),
|
||||||
|
loadedRegions.size());
|
||||||
|
|
||||||
|
// verify each region is prefetched or not.
|
||||||
|
for (Map.Entry<HRegionInfo, HServerAddress> e: loadedRegions.entrySet()) {
|
||||||
|
HRegionInfo hri = e.getKey();
|
||||||
|
assertTrue(HConnectionManager.isRegionCached(conf,
|
||||||
|
hri.getTableDesc().getName(), hri.getStartKey()));
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete the temp file
|
||||||
|
File f = new java.io.File(tempFileName);
|
||||||
|
f.delete();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBASE-2468 use case 3:
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRegionCachePreWarm() throws Exception {
|
||||||
|
final byte [] TABLENAME = Bytes.toBytes("testCachePrewarm");
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
|
||||||
|
// Set up test table:
|
||||||
|
// Create table:
|
||||||
|
TEST_UTIL.createTable(TABLENAME, FAMILY);
|
||||||
|
|
||||||
|
// disable region cache prefetch for the table.
|
||||||
|
HTable.setRegionCachePrefetch(conf, TABLENAME, false);
|
||||||
|
assertFalse("The table is disabled for region cache prefetch",
|
||||||
|
HTable.getRegionCachePrefetch(conf, TABLENAME));
|
||||||
|
|
||||||
|
HTable table = new HTable(conf, TABLENAME);
|
||||||
|
|
||||||
|
// create many regions for the table.
|
||||||
|
TEST_UTIL.createMultiRegions(table, FAMILY);
|
||||||
|
|
||||||
|
// A Get is supposed to do a region lookup request
|
||||||
|
Get g = new Get(Bytes.toBytes("aaa"));
|
||||||
|
table.get(g);
|
||||||
|
|
||||||
|
// only one region should be cached if the cache prefetch is disabled.
|
||||||
|
assertEquals("Number of cached region is incorrect ", 1,
|
||||||
|
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
|
||||||
|
|
||||||
|
// now we enable cached prefetch.
|
||||||
|
HTable.setRegionCachePrefetch(conf, TABLENAME, true);
|
||||||
|
assertTrue("The table is enabled for region cache prefetch",
|
||||||
|
HTable.getRegionCachePrefetch(conf, TABLENAME));
|
||||||
|
|
||||||
|
HTable.setRegionCachePrefetch(conf, TABLENAME, false);
|
||||||
|
assertFalse("The table is disabled for region cache prefetch",
|
||||||
|
HTable.getRegionCachePrefetch(conf, TABLENAME));
|
||||||
|
|
||||||
|
HTable.setRegionCachePrefetch(conf, TABLENAME, true);
|
||||||
|
assertTrue("The table is enabled for region cache prefetch",
|
||||||
|
HTable.getRegionCachePrefetch(conf, TABLENAME));
|
||||||
|
|
||||||
|
table.getConnection().clearRegionCache();
|
||||||
|
|
||||||
|
assertEquals("Number of cached region is incorrect ", 0,
|
||||||
|
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
|
||||||
|
|
||||||
|
// if there is a cache miss, some additional regions should be prefetched.
|
||||||
|
Get g2 = new Get(Bytes.toBytes("bbb"));
|
||||||
|
table.get(g2);
|
||||||
|
|
||||||
|
// get the configured number of cache read-ahead regions
|
||||||
|
int prefetchRegionNumber = conf.getInt("hbase.client.prefetch.limit", 10);
|
||||||
|
|
||||||
|
// the total number of cached regions == region('aaa') + prefetched regions.
|
||||||
|
assertEquals("Number of cached region is incorrect ", prefetchRegionNumber,
|
||||||
|
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
|
||||||
|
|
||||||
|
table.getConnection().clearRegionCache();
|
||||||
|
|
||||||
|
Get g3 = new Get(Bytes.toBytes("abc"));
|
||||||
|
table.get(g3);
|
||||||
|
assertEquals("Number of cached region is incorrect ", prefetchRegionNumber,
|
||||||
|
HConnectionManager.getCachedRegionCount(conf, TABLENAME));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue