HADOOP-2443 Keep lazy cache of regions in client rather than an 'authoritative' list
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@611727 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fed1075104
commit
27afba4ead
|
@ -38,6 +38,8 @@ Trunk (unreleased changes)
|
|||
HADOOP-2407 Keeping MapFile.Reader open is expensive: Part 2
|
||||
HADOOP-2533 Performance: Scanning, just creating MapWritable in next
|
||||
consumes >20% CPU
|
||||
HADOOP-2443 Keep lazy cache of regions in client rather than an
|
||||
'authoritative' list (Bryan Duxbury via Stack)
|
||||
|
||||
BUG FIXES
|
||||
HADOOP-2059 In tests, exceptions in min dfs shutdown should not fail test
|
||||
|
|
|
@ -117,7 +117,7 @@ public class HBaseAdmin implements HConstants {
|
|||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
try {
|
||||
// Wait for new table to come on-line
|
||||
connection.getTableServers(desc.getName());
|
||||
connection.locateRegion(desc.getName(), EMPTY_START_ROW);
|
||||
break;
|
||||
|
||||
} catch (TableNotFoundException e) {
|
||||
|
@ -541,9 +541,7 @@ public class HBaseAdmin implements HConstants {
|
|||
|
||||
private HRegionLocation getFirstMetaServerForTable(Text tableName)
|
||||
throws IOException {
|
||||
SortedMap<Text, HRegionLocation> metaservers =
|
||||
connection.getTableServers(META_TABLE_NAME);
|
||||
return metaservers.get((metaservers.containsKey(tableName)) ?
|
||||
tableName : metaservers.headMap(tableName).lastKey());
|
||||
Text tableKey = new Text(tableName.toString() + ",,99999999999999");
|
||||
return connection.locateRegion(META_TABLE_NAME, tableKey);
|
||||
}
|
||||
}
|
|
@ -57,24 +57,26 @@ public interface HConnection {
|
|||
public HTableDescriptor[] listTables() throws IOException;
|
||||
|
||||
/**
|
||||
* Gets the servers of the given table.
|
||||
*
|
||||
* @param tableName - the table to be located
|
||||
* @return map of startRow -> RegionLocation
|
||||
* @throws IOException - if the table can not be located after retrying
|
||||
* Find the location of the region of <i>tableName</i> that <i>row</i>
|
||||
* lives in.
|
||||
* @param tableName name of the table <i>row</i> is in
|
||||
* @param row row key you're trying to find the region of
|
||||
* @return HRegionLocation that describes where to find the reigon in
|
||||
* question
|
||||
*/
|
||||
public SortedMap<Text, HRegionLocation> getTableServers(Text tableName)
|
||||
public HRegionLocation locateRegion(Text tableName, Text row)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Reloads servers for the specified table.
|
||||
*
|
||||
* @param tableName name of table whose servers are to be reloaded
|
||||
* @return map of start key -> RegionLocation
|
||||
* @throws IOException
|
||||
* Find the location of the region of <i>tableName</i> that <i>row</i>
|
||||
* lives in, ignoring any value that might be in the cache.
|
||||
* @param tableName name of the table <i>row</i> is in
|
||||
* @param row row key you're trying to find the region of
|
||||
* @return HRegionLocation that describes where to find the reigon in
|
||||
* question
|
||||
*/
|
||||
public SortedMap<Text, HRegionLocation>
|
||||
reloadTableServers(final Text tableName) throws IOException;
|
||||
public HRegionLocation relocateRegion(Text tableName, Text row)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Establishes a connection to the region server at the specified address.
|
||||
|
|
|
@ -95,11 +95,10 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
/* encapsulates finding the servers for an HBase instance */
|
||||
/* Encapsulates finding the servers for an HBase instance */
|
||||
private static class TableServers implements HConnection, HConstants {
|
||||
private static final Log LOG = LogFactory.getLog(TableServers.class);
|
||||
private final Class<? extends HRegionInterface> serverInterfaceClass;
|
||||
private final long threadWakeFrequency;
|
||||
private final long pause;
|
||||
private final int numRetries;
|
||||
|
||||
|
@ -110,21 +109,20 @@ public class HConnectionManager implements HConstants {
|
|||
|
||||
private final Integer rootRegionLock = new Integer(0);
|
||||
private final Integer metaRegionLock = new Integer(0);
|
||||
|
||||
private final Integer userRegionLock = new Integer(0);
|
||||
|
||||
private volatile HBaseConfiguration conf;
|
||||
|
||||
// Map tableName -> (Map startRow -> (HRegionInfo, HServerAddress)
|
||||
private Map<Text, SortedMap<Text, HRegionLocation>> tablesToServers;
|
||||
|
||||
// Set of closed tables
|
||||
private Set<Text> closedTables;
|
||||
|
||||
// Set of tables currently being located
|
||||
private Set<Text> tablesBeingLocated;
|
||||
|
||||
// Known region HServerAddress.toString() -> HRegionInterface
|
||||
private Map<String, HRegionInterface> servers;
|
||||
|
||||
private HRegionLocation rootRegionLocation;
|
||||
|
||||
private Map<Text, SortedMap<Text, HRegionLocation>> cachedRegionLocations;
|
||||
|
||||
/**
|
||||
* constructor
|
||||
* @param conf Configuration object
|
||||
|
@ -147,20 +145,15 @@ public class HConnectionManager implements HConstants {
|
|||
"Unable to find region server interface " + serverClassName, e);
|
||||
}
|
||||
|
||||
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
|
||||
this.pause = conf.getLong("hbase.client.pause", 30 * 1000);
|
||||
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
|
||||
|
||||
this.master = null;
|
||||
this.masterChecked = false;
|
||||
|
||||
this.tablesToServers =
|
||||
this.cachedRegionLocations =
|
||||
new ConcurrentHashMap<Text, SortedMap<Text, HRegionLocation>>();
|
||||
|
||||
this.closedTables = Collections.synchronizedSet(new HashSet<Text>());
|
||||
this.tablesBeingLocated = Collections.synchronizedSet(
|
||||
new HashSet<Text>());
|
||||
|
||||
this.servers = new ConcurrentHashMap<String, HRegionInterface>();
|
||||
}
|
||||
|
||||
|
@ -246,18 +239,30 @@ public class HConnectionManager implements HConstants {
|
|||
/** {@inheritDoc} */
|
||||
public HTableDescriptor[] listTables() throws IOException {
|
||||
HashSet<HTableDescriptor> uniqueTables = new HashSet<HTableDescriptor>();
|
||||
long scannerId = -1L;
|
||||
HRegionInterface server = null;
|
||||
|
||||
Text startRow = EMPTY_START_ROW;
|
||||
HRegionLocation metaLocation = null;
|
||||
|
||||
SortedMap<Text, HRegionLocation> metaTables =
|
||||
getTableServers(META_TABLE_NAME);
|
||||
// scan over the each meta region
|
||||
do {
|
||||
try{
|
||||
// turn the start row into a location
|
||||
metaLocation =
|
||||
locateRegion(META_TABLE_NAME, startRow);
|
||||
|
||||
for (HRegionLocation t: metaTables.values()) {
|
||||
HRegionInterface server = getHRegionConnection(t.getServerAddress());
|
||||
long scannerId = -1L;
|
||||
try {
|
||||
scannerId = server.openScanner(t.getRegionInfo().getRegionName(),
|
||||
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(),
|
||||
null);
|
||||
// connect to the server hosting the .META. region
|
||||
server =
|
||||
getHRegionConnection(metaLocation.getServerAddress());
|
||||
|
||||
// open a scanner over the meta region
|
||||
scannerId = server.openScanner(
|
||||
metaLocation.getRegionInfo().getRegionName(),
|
||||
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, LATEST_TIMESTAMP,
|
||||
null);
|
||||
|
||||
// iterate through the scanner, accumulating unique table names
|
||||
while (true) {
|
||||
HbaseMapWritable values = server.next(scannerId);
|
||||
if (values == null || values.size() == 0) {
|
||||
|
@ -277,78 +282,330 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
}
|
||||
}
|
||||
} catch (RemoteException ex) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(ex);
|
||||
|
||||
} finally {
|
||||
|
||||
server.close(scannerId);
|
||||
scannerId = -1L;
|
||||
|
||||
// advance the startRow to the end key of the current region
|
||||
startRow = metaLocation.getRegionInfo().getEndKey();
|
||||
} catch (IOException e) {
|
||||
// need retry logic?
|
||||
throw e;
|
||||
}
|
||||
finally {
|
||||
if (scannerId != -1L) {
|
||||
server.close(scannerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (startRow.compareTo(EMPTY_START_ROW) != 0);
|
||||
|
||||
return uniqueTables.toArray(new HTableDescriptor[uniqueTables.size()]);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public SortedMap<Text, HRegionLocation> getTableServers(Text tableName)
|
||||
throws IOException {
|
||||
|
||||
public HRegionLocation locateRegion(Text tableName, Text row)
|
||||
throws IOException{
|
||||
return locateRegion(tableName, row, true);
|
||||
}
|
||||
|
||||
public HRegionLocation relocateRegion(Text tableName, Text row)
|
||||
throws IOException{
|
||||
return locateRegion(tableName, row, false);
|
||||
}
|
||||
|
||||
private HRegionLocation locateRegion(Text tableName, Text row,
|
||||
boolean useCache)
|
||||
throws IOException{
|
||||
if (tableName == null || tableName.getLength() == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
"table name cannot be null or zero length");
|
||||
}
|
||||
|
||||
closedTables.remove(tableName);
|
||||
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
tablesToServers.get(tableName);
|
||||
|
||||
if (tableServers == null ) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("No servers for " + tableName + ". Doing a find...");
|
||||
}
|
||||
// We don't know where the table is.
|
||||
// Load the information from meta.
|
||||
tableServers = findServersForTable(tableName);
|
||||
}
|
||||
SortedMap<Text, HRegionLocation> servers =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
servers.putAll(tableServers);
|
||||
return servers;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public SortedMap<Text, HRegionLocation>
|
||||
reloadTableServers(final Text tableName) throws IOException {
|
||||
closedTables.remove(tableName);
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
// Reload information for the whole table
|
||||
tableServers.putAll(findServersForTable(tableName));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
int count = 0;
|
||||
for (HRegionLocation location: tableServers.values()) {
|
||||
if (sb.length() > 0) {
|
||||
sb.append(" ");
|
||||
|
||||
if (tableName.equals(ROOT_TABLE_NAME)) {
|
||||
synchronized (rootRegionLock) {
|
||||
// This block guards against two threads trying to find the root
|
||||
// region at the same time. One will go do the find while the
|
||||
// second waits. The second thread will not do find.
|
||||
|
||||
if (!useCache || rootRegionLocation == null) {
|
||||
return locateRootRegion();
|
||||
}
|
||||
sb.append(count++);
|
||||
sb.append(". ");
|
||||
sb.append("address=");
|
||||
sb.append(location.getServerAddress());
|
||||
sb.append(", ");
|
||||
sb.append(location.getRegionInfo().getRegionName());
|
||||
return rootRegionLocation;
|
||||
}
|
||||
} else if (tableName.equals(META_TABLE_NAME)) {
|
||||
synchronized (metaRegionLock) {
|
||||
// This block guards against two threads trying to load the meta
|
||||
// region at the same time. The first will load the meta region and
|
||||
// the second will use the value that the first one found.
|
||||
|
||||
return locateRegionInMeta(ROOT_TABLE_NAME, tableName, row, useCache);
|
||||
}
|
||||
} else {
|
||||
synchronized(userRegionLock){
|
||||
return locateRegionInMeta(META_TABLE_NAME, tableName, row, useCache);
|
||||
}
|
||||
LOG.debug("Result of findTable on " + tableName.toString() +
|
||||
": " + sb.toString());
|
||||
}
|
||||
|
||||
return tableServers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method for turning a MapWritable into the underlying
|
||||
* SortedMap we all know and love.
|
||||
*/
|
||||
private SortedMap<Text, byte[]> sortedMapFromMapWritable(
|
||||
HbaseMapWritable writable) {
|
||||
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
for (Map.Entry<Writable, Writable> e: writable.entrySet()) {
|
||||
HStoreKey key = (HStoreKey) e.getKey();
|
||||
results.put(key.getColumn(),
|
||||
((ImmutableBytesWritable) e.getValue()).get());
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
/**
|
||||
* Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
|
||||
* info that contains the table and row we're seeking.
|
||||
*/
|
||||
private HRegionLocation locateRegionInMeta(Text parentTable,
|
||||
Text tableName, Text row, boolean useCache)
|
||||
throws IOException{
|
||||
HRegionLocation location = null;
|
||||
|
||||
// if we're supposed to be using the cache, then check it for a possible
|
||||
// hit. otherwise, delete any existing cached location so it won't
|
||||
// interfere.
|
||||
if (useCache) {
|
||||
location = getCachedLocation(tableName, row);
|
||||
if (location != null) {
|
||||
LOG.debug("Looking in " + parentTable + " for "
|
||||
+ tableName + "/" + row
|
||||
+ ", got a cache hit with "
|
||||
+ location.getRegionInfo().getRegionName());
|
||||
return location;
|
||||
}
|
||||
} else{
|
||||
deleteCachedLocation(tableName, row);
|
||||
}
|
||||
|
||||
// build the key of the meta region we should be looking for.
|
||||
// the extra 9's on the end are necessary to allow "exact" matches
|
||||
// without knowing the precise region names.
|
||||
Text metaKey = new Text(tableName.toString() + ","
|
||||
+ row.toString() + ",999999999999999");
|
||||
|
||||
int tries = 0;
|
||||
while (true) {
|
||||
tries++;
|
||||
|
||||
if (tries >= numRetries) {
|
||||
throw new NoServerForRegionException("Unable to find region for "
|
||||
+ row + " after " + numRetries + " tries.");
|
||||
}
|
||||
|
||||
try{
|
||||
// locate the root region
|
||||
HRegionLocation metaLocation = locateRegion(parentTable, metaKey);
|
||||
HRegionInterface server =
|
||||
getHRegionConnection(metaLocation.getServerAddress());
|
||||
|
||||
// query the root region for the location of the meta region
|
||||
HbaseMapWritable regionInfoRow = server.getClosestRowBefore(
|
||||
metaLocation.getRegionInfo().getRegionName(),
|
||||
metaKey, HConstants.LATEST_TIMESTAMP);
|
||||
|
||||
// convert the MapWritable into a Map we can use
|
||||
SortedMap<Text, byte[]> results =
|
||||
sortedMapFromMapWritable(regionInfoRow);
|
||||
|
||||
byte[] bytes = results.get(COL_REGIONINFO);
|
||||
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
throw new IOException("HRegionInfo was null or empty in " +
|
||||
parentTable);
|
||||
}
|
||||
|
||||
// convert the row result into the HRegionLocation we need!
|
||||
HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
|
||||
results.get(COL_REGIONINFO), new HRegionInfo());
|
||||
|
||||
if (regionInfo.isOffline()) {
|
||||
throw new IllegalStateException("region offline: " +
|
||||
regionInfo.getRegionName());
|
||||
}
|
||||
|
||||
// possible we got a region of a different table...
|
||||
if (!regionInfo.getTableDesc().getName().equals(tableName)) {
|
||||
throw new TableNotFoundException(
|
||||
"Table '" + tableName + "' was not found.");
|
||||
}
|
||||
|
||||
String serverAddress =
|
||||
Writables.bytesToString(results.get(COL_SERVER));
|
||||
|
||||
if (serverAddress.equals("")) {
|
||||
throw new NoServerForRegionException(
|
||||
"No server address listed in " + parentTable + " for region "
|
||||
+ regionInfo.getRegionName());
|
||||
}
|
||||
|
||||
// instantiate the location
|
||||
location = new HRegionLocation(regionInfo,
|
||||
new HServerAddress(serverAddress));
|
||||
|
||||
cacheLocation(tableName, location);
|
||||
|
||||
return location;
|
||||
} catch (IllegalStateException e) {
|
||||
if (tries < numRetries - 1) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
relocateRegion(parentTable, metaKey);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException(
|
||||
(RemoteException) e);
|
||||
}
|
||||
if (tries < numRetries - 1) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
relocateRegion(parentTable, metaKey);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
try{
|
||||
Thread.sleep(pause);
|
||||
} catch (InterruptedException e){
|
||||
// continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Search the cache for a location that fits our table and row key.
|
||||
* Return null if no suitable region is located. TODO: synchronization note
|
||||
*/
|
||||
private HRegionLocation getCachedLocation(Text tableName, Text row) {
|
||||
// find the map of cached locations for this table
|
||||
SortedMap<Text, HRegionLocation> tableLocations =
|
||||
cachedRegionLocations.get(tableName);
|
||||
|
||||
// if tableLocations for this table isn't built yet, make one
|
||||
if (tableLocations == null) {
|
||||
tableLocations = new TreeMap<Text, HRegionLocation>();
|
||||
cachedRegionLocations.put(tableName, tableLocations);
|
||||
}
|
||||
|
||||
// start to examine the cache. we can only do cache actions
|
||||
// if there's something in the cache for this table.
|
||||
if (!tableLocations.isEmpty()) {
|
||||
if (tableLocations.containsKey(row)) {
|
||||
return tableLocations.get(row);
|
||||
}
|
||||
|
||||
// cut the cache so that we only get the part that could contain
|
||||
// regions that match our key
|
||||
SortedMap<Text, HRegionLocation> matchingRegions =
|
||||
tableLocations.headMap(row);
|
||||
|
||||
// if that portion of the map is empty, then we're done. otherwise,
|
||||
// we need to examine the cached location to verify that it is
|
||||
// a match by end key as well.
|
||||
if (!matchingRegions.isEmpty()) {
|
||||
HRegionLocation possibleRegion =
|
||||
matchingRegions.get(matchingRegions.lastKey());
|
||||
|
||||
Text endKey = possibleRegion.getRegionInfo().getEndKey();
|
||||
|
||||
// make sure that the end key is greater than the row we're looking
|
||||
// for, otherwise the row actually belongs in the next region, not
|
||||
// this one. the exception case is when the endkey is EMPTY_START_ROW,
|
||||
// signifying that the region we're checking is actually the last
|
||||
// region in the table.
|
||||
if (endKey.equals(EMPTY_TEXT) || endKey.compareTo(row) > 0) {
|
||||
return possibleRegion;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// passed all the way through, so we got nothin - complete cache miss
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Delete a cached location, if it satisfies the table name and row
|
||||
* requirements.
|
||||
*/
|
||||
private void deleteCachedLocation(Text tableName, Text row){
|
||||
// find the map of cached locations for this table
|
||||
SortedMap<Text, HRegionLocation> tableLocations =
|
||||
cachedRegionLocations.get(tableName);
|
||||
|
||||
// if tableLocations for this table isn't built yet, make one
|
||||
if (tableLocations == null) {
|
||||
tableLocations = new TreeMap<Text, HRegionLocation>();
|
||||
cachedRegionLocations.put(tableName, tableLocations);
|
||||
}
|
||||
|
||||
// start to examine the cache. we can only do cache actions
|
||||
// if there's something in the cache for this table.
|
||||
if (!tableLocations.isEmpty()) {
|
||||
// cut the cache so that we only get the part that could contain
|
||||
// regions that match our key
|
||||
SortedMap<Text, HRegionLocation> matchingRegions =
|
||||
tableLocations.headMap(row);
|
||||
|
||||
// if that portion of the map is empty, then we're done. otherwise,
|
||||
// we need to examine the cached location to verify that it is
|
||||
// a match by end key as well.
|
||||
if (!matchingRegions.isEmpty()) {
|
||||
HRegionLocation possibleRegion =
|
||||
matchingRegions.get(matchingRegions.lastKey());
|
||||
|
||||
Text endKey = possibleRegion.getRegionInfo().getEndKey();
|
||||
|
||||
// by nature of the map, we know that the start key has to be <
|
||||
// otherwise it wouldn't be in the headMap.
|
||||
if (endKey.compareTo(row) <= 0) {
|
||||
// delete any matching entry
|
||||
tableLocations.remove(matchingRegions.lastKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Put a newly discovered HRegionLocation into the cache.
|
||||
*/
|
||||
private void cacheLocation(Text tableName, HRegionLocation location){
|
||||
Text startKey = location.getRegionInfo().getStartKey();
|
||||
|
||||
// find the map of cached locations for this table
|
||||
SortedMap<Text, HRegionLocation> tableLocations =
|
||||
cachedRegionLocations.get(tableName);
|
||||
|
||||
// if tableLocations for this table isn't built yet, make one
|
||||
if (tableLocations == null) {
|
||||
tableLocations = new TreeMap<Text, HRegionLocation>();
|
||||
cachedRegionLocations.put(tableName, tableLocations);
|
||||
}
|
||||
|
||||
// save the HRegionLocation under the startKey
|
||||
tableLocations.put(startKey, location);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HRegionInterface getHRegionConnection(
|
||||
HServerAddress regionServer) throws IOException {
|
||||
HServerAddress regionServer)
|
||||
throws IOException {
|
||||
|
||||
HRegionInterface server;
|
||||
synchronized (this.servers) {
|
||||
|
@ -390,192 +647,58 @@ public class HConnectionManager implements HConstants {
|
|||
throw new IllegalArgumentException(
|
||||
"table name cannot be null or zero length");
|
||||
}
|
||||
|
||||
|
||||
if (closedTables.contains(tableName)) {
|
||||
// Table already closed. Ignore it.
|
||||
return;
|
||||
}
|
||||
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
tablesToServers.remove(tableName);
|
||||
|
||||
if (tableServers == null) {
|
||||
// Table not open. Ignore it.
|
||||
return;
|
||||
}
|
||||
|
||||
closedTables.add(tableName);
|
||||
|
||||
// Shut down connections to the HRegionServers
|
||||
|
||||
synchronized (this.servers) {
|
||||
for (HRegionLocation r: tableServers.values()) {
|
||||
this.servers.remove(r.getServerAddress().toString());
|
||||
if (cachedRegionLocations.containsKey(tableName)) {
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
cachedRegionLocations.remove(tableName);
|
||||
|
||||
// Shut down connections to the HRegionServers
|
||||
synchronized (this.servers) {
|
||||
for (HRegionLocation r: tableServers.values()) {
|
||||
this.servers.remove(r.getServerAddress().toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Convenience method for closing all open tables.*/
|
||||
void closeAll() {
|
||||
this.closed = true;
|
||||
ArrayList<Text> tables = new ArrayList<Text>(tablesToServers.keySet());
|
||||
ArrayList<Text> tables =
|
||||
new ArrayList<Text>(cachedRegionLocations.keySet());
|
||||
for (Text tableName: tables) {
|
||||
close(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Clears the cache of all known information about the specified table and
|
||||
* locates a table by searching the META or ROOT region (as appropriate) or
|
||||
* by querying the master for the location of the root region if that is the
|
||||
* table requested.
|
||||
*
|
||||
* @param tableName - name of table to find servers for
|
||||
* @return - map of first row to table info for all regions in the table
|
||||
* @throws IOException
|
||||
*/
|
||||
private SortedMap<Text, HRegionLocation> findServersForTable(Text tableName)
|
||||
throws IOException {
|
||||
|
||||
// Wipe out everything we know about this table
|
||||
if (this.tablesToServers.remove(tableName) != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wiping out all we know of " + tableName);
|
||||
}
|
||||
}
|
||||
|
||||
SortedMap<Text, HRegionLocation> srvrs =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
if (tableName.equals(ROOT_TABLE_NAME)) {
|
||||
synchronized (rootRegionLock) {
|
||||
// This block guards against two threads trying to find the root
|
||||
// region at the same time. One will go do the find while the
|
||||
// second waits. The second thread will not do find.
|
||||
|
||||
srvrs = this.tablesToServers.get(ROOT_TABLE_NAME);
|
||||
|
||||
if (srvrs == null) {
|
||||
srvrs = locateRootRegion();
|
||||
}
|
||||
this.tablesToServers.put(tableName, srvrs);
|
||||
}
|
||||
|
||||
} else if (tableName.equals(META_TABLE_NAME)) {
|
||||
synchronized (metaRegionLock) {
|
||||
// This block guards against two threads trying to load the meta
|
||||
// region at the same time. The first will load the meta region and
|
||||
// the second will use the value that the first one found.
|
||||
|
||||
SortedMap<Text, HRegionLocation> rootServers =
|
||||
tablesToServers.get(ROOT_TABLE_NAME);
|
||||
|
||||
for (boolean refindRoot = true; refindRoot; ) {
|
||||
if (rootServers == null || rootServers.size() == 0) {
|
||||
// (re)find the root region
|
||||
rootServers = findServersForTable(ROOT_TABLE_NAME);
|
||||
// but don't try again
|
||||
refindRoot = false;
|
||||
}
|
||||
try {
|
||||
srvrs = getTableServers(rootServers, META_TABLE_NAME);
|
||||
break;
|
||||
|
||||
} catch (NotServingRegionException e) {
|
||||
if (!refindRoot) {
|
||||
// Already found root once. Give up.
|
||||
throw e;
|
||||
}
|
||||
// The root region must have moved - refind it
|
||||
rootServers.clear();
|
||||
}
|
||||
}
|
||||
this.tablesToServers.put(tableName, srvrs);
|
||||
}
|
||||
} else {
|
||||
boolean waited = false;
|
||||
synchronized (this.tablesBeingLocated) {
|
||||
// This block ensures that only one thread will actually try to
|
||||
// find a table. If a second thread comes along it will wait
|
||||
// until the first thread finishes finding the table.
|
||||
|
||||
while (this.tablesBeingLocated.contains(tableName)) {
|
||||
waited = true;
|
||||
try {
|
||||
this.tablesBeingLocated.wait(threadWakeFrequency);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
}
|
||||
if (!waited) {
|
||||
this.tablesBeingLocated.add(tableName);
|
||||
|
||||
} else {
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
this.tablesToServers.get(tableName);
|
||||
|
||||
if (tableServers == null) {
|
||||
throw new TableNotFoundException("table not found: " + tableName);
|
||||
}
|
||||
srvrs.putAll(tableServers);
|
||||
}
|
||||
}
|
||||
if (!waited) {
|
||||
try {
|
||||
SortedMap<Text, HRegionLocation> metaServers =
|
||||
this.tablesToServers.get(META_TABLE_NAME);
|
||||
|
||||
for (boolean refindMeta = true; refindMeta; ) {
|
||||
if (metaServers == null || metaServers.size() == 0) {
|
||||
// (re)find the meta table
|
||||
metaServers = findServersForTable(META_TABLE_NAME);
|
||||
// but don't try again
|
||||
refindMeta = false;
|
||||
}
|
||||
try {
|
||||
srvrs = getTableServers(metaServers, tableName);
|
||||
break;
|
||||
|
||||
} catch (NotServingRegionException e) {
|
||||
if (!refindMeta) {
|
||||
// Already refound meta once. Give up.
|
||||
throw e;
|
||||
}
|
||||
// The meta table must have moved - refind it
|
||||
metaServers.clear();
|
||||
}
|
||||
}
|
||||
this.tablesToServers.put(tableName, srvrs);
|
||||
} finally {
|
||||
synchronized (this.tablesBeingLocated) {
|
||||
// Wake up the threads waiting for us to find the table
|
||||
this.tablesBeingLocated.remove(tableName);
|
||||
this.tablesBeingLocated.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.tablesToServers.put(tableName, srvrs);
|
||||
return srvrs;
|
||||
}
|
||||
|
||||
/*
|
||||
* Repeatedly try to find the root region by asking the master for where it is
|
||||
* @return TreeMap<Text, TableInfo> for root regin if found
|
||||
* @return HRegionLocation for root region if found
|
||||
* @throws NoServerForRegionException - if the root region can not be located
|
||||
* after retrying
|
||||
* @throws IOException
|
||||
*/
|
||||
private TreeMap<Text, HRegionLocation> locateRootRegion()
|
||||
private HRegionLocation locateRootRegion()
|
||||
throws IOException {
|
||||
|
||||
getMaster();
|
||||
|
||||
HServerAddress rootRegionLocation = null;
|
||||
HServerAddress rootRegionAddress = null;
|
||||
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
int localTimeouts = 0;
|
||||
while (rootRegionLocation == null && localTimeouts < numRetries) {
|
||||
rootRegionLocation = master.findRootRegion();
|
||||
if (rootRegionLocation == null) {
|
||||
|
||||
// ask the master which server has the root region
|
||||
while (rootRegionAddress == null && localTimeouts < numRetries) {
|
||||
rootRegionAddress = master.findRootRegion();
|
||||
if (rootRegionAddress == null) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleeping. Waiting for root region.");
|
||||
|
@ -591,15 +714,18 @@ public class HConnectionManager implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
if (rootRegionLocation == null) {
|
||||
if (rootRegionAddress == null) {
|
||||
throw new NoServerForRegionException(
|
||||
"Timed out trying to locate root region");
|
||||
}
|
||||
|
||||
HRegionInterface rootRegion = getHRegionConnection(rootRegionLocation);
|
||||
// get a connection to the region server
|
||||
HRegionInterface server = getHRegionConnection(rootRegionAddress);
|
||||
|
||||
try {
|
||||
rootRegion.getRegionInfo(HRegionInfo.rootRegionInfo.getRegionName());
|
||||
// if this works, then we're good, and we have an acceptable address,
|
||||
// so we can stop doing retries and return the result.
|
||||
server.getRegionInfo(HRegionInfo.rootRegionInfo.getRegionName());
|
||||
break;
|
||||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
|
@ -624,183 +750,20 @@ public class HConnectionManager implements HConstants {
|
|||
// continue
|
||||
}
|
||||
}
|
||||
rootRegionLocation = null;
|
||||
|
||||
rootRegionAddress = null;
|
||||
}
|
||||
|
||||
if (rootRegionLocation == null) {
|
||||
// if the adress is null by this point, then the retries have failed,
|
||||
// and we're sort of sunk
|
||||
if (rootRegionAddress == null) {
|
||||
throw new NoServerForRegionException(
|
||||
"unable to locate root region server");
|
||||
}
|
||||
|
||||
TreeMap<Text, HRegionLocation> rootServer =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
rootServer.put(EMPTY_START_ROW,
|
||||
new HRegionLocation(HRegionInfo.rootRegionInfo, rootRegionLocation));
|
||||
|
||||
return rootServer;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param metaServers the meta servers that would know where the table is
|
||||
* @param tableName name of the table
|
||||
* @return map of region start key -> server location
|
||||
* @throws IOException
|
||||
*/
|
||||
private SortedMap<Text, HRegionLocation> getTableServers(
|
||||
final SortedMap<Text, HRegionLocation> metaServers,
|
||||
final Text tableName) throws IOException {
|
||||
|
||||
// If there is more than one meta server, find the first one that should
|
||||
// know about the table we are looking for, and reduce the number of
|
||||
// servers we need to query.
|
||||
|
||||
SortedMap<Text, HRegionLocation> metaServersForTable = metaServers;
|
||||
if (metaServersForTable.size() > 1) {
|
||||
Text firstMetaRegion = metaServersForTable.headMap(tableName).lastKey();
|
||||
metaServersForTable = metaServersForTable.tailMap(firstMetaRegion);
|
||||
}
|
||||
|
||||
SortedMap<Text, HRegionLocation> tableServers =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
int tries = 0;
|
||||
do {
|
||||
if (tries >= numRetries - 1) {
|
||||
throw new NoServerForRegionException(
|
||||
"failed to find server for " + tableName + " after "
|
||||
+ numRetries + " retries");
|
||||
|
||||
} else if (tries > 0) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sleeping. Table " + tableName +
|
||||
" not currently being served.");
|
||||
}
|
||||
try {
|
||||
Thread.sleep(pause);
|
||||
} catch (InterruptedException ie) {
|
||||
// continue
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wake. Retry finding table " + tableName);
|
||||
}
|
||||
}
|
||||
for (HRegionLocation t: metaServersForTable.values()) {
|
||||
tableServers.putAll(scanOneMetaRegion(t, tableName));
|
||||
}
|
||||
tries += 1;
|
||||
} while (tableServers.size() == 0);
|
||||
return tableServers;
|
||||
}
|
||||
|
||||
/*
|
||||
* Scans a single meta region
|
||||
* @param t the meta region we're going to scan
|
||||
* @param tableName the name of the table we're looking for
|
||||
* @return returns a map of startingRow to TableInfo
|
||||
* @throws TableNotFoundException - if table does not exist
|
||||
* @throws IllegalStateException - if table is offline
|
||||
* @throws IOException
|
||||
*/
|
||||
private SortedMap<Text, HRegionLocation> scanOneMetaRegion(
|
||||
final HRegionLocation t, final Text tableName) throws IOException {
|
||||
|
||||
HRegionInterface server = getHRegionConnection(t.getServerAddress());
|
||||
TreeMap<Text, HRegionLocation> servers =
|
||||
new TreeMap<Text, HRegionLocation>();
|
||||
|
||||
long scannerId = -1L;
|
||||
try {
|
||||
scannerId = server.openScanner(t.getRegionInfo().getRegionName(),
|
||||
COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null);
|
||||
|
||||
while (true) {
|
||||
HbaseMapWritable values = server.next(scannerId);
|
||||
if (values == null || values.size() == 0) {
|
||||
if (servers.size() == 0) {
|
||||
// If we didn't find any servers then the table does not exist
|
||||
throw new TableNotFoundException("table '" + tableName +
|
||||
"' does not exist in " + t);
|
||||
}
|
||||
|
||||
// We found at least one server for the table and now we're done.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found " + servers.size() + " region(s) for " +
|
||||
tableName + " at " + t);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
SortedMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
for (Map.Entry<Writable, Writable> e: values.entrySet()) {
|
||||
HStoreKey key = (HStoreKey) e.getKey();
|
||||
results.put(key.getColumn(),
|
||||
((ImmutableBytesWritable) e.getValue()).get());
|
||||
}
|
||||
|
||||
byte[] bytes = results.get(COL_REGIONINFO);
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
// This can be null. Looks like an info:splitA or info:splitB
|
||||
// is only item in the row.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(COL_REGIONINFO.toString() + " came back empty: " +
|
||||
results.toString());
|
||||
}
|
||||
servers.clear();
|
||||
break;
|
||||
}
|
||||
|
||||
HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable(
|
||||
results.get(COL_REGIONINFO), new HRegionInfo());
|
||||
|
||||
if (!regionInfo.getTableDesc().getName().equals(tableName)) {
|
||||
// We're done
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Found " + servers.size() + " servers for table " +
|
||||
tableName);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if (regionInfo.isSplit()) {
|
||||
// Region is a split parent. Skip it.
|
||||
continue;
|
||||
}
|
||||
|
||||
if (regionInfo.isOffline()) {
|
||||
throw new IllegalStateException("table offline: " + tableName);
|
||||
}
|
||||
|
||||
bytes = results.get(COL_SERVER);
|
||||
if (bytes == null || bytes.length == 0) {
|
||||
// We need to rescan because the table we want is unassigned.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("no server address for " + regionInfo.toString());
|
||||
}
|
||||
servers.clear();
|
||||
break;
|
||||
}
|
||||
|
||||
String serverAddress = Writables.bytesToString(bytes);
|
||||
servers.put(regionInfo.getStartKey(), new HRegionLocation(
|
||||
regionInfo, new HServerAddress(serverAddress)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
throw e;
|
||||
|
||||
} finally {
|
||||
if (scannerId != -1L) {
|
||||
try {
|
||||
server.close(scannerId);
|
||||
} catch (Exception ex) {
|
||||
LOG.warn(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
return servers;
|
||||
// return the region location
|
||||
return new HRegionLocation(
|
||||
HRegionInfo.rootRegionInfo, rootRegionAddress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -551,6 +551,7 @@ public class HRegion implements HConstants {
|
|||
if (closed.get() || !needsSplit(midKey)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
long startTime = System.currentTimeMillis();
|
||||
Path splits = new Path(this.regiondir, SPLITDIR);
|
||||
if(!this.fs.exists(splits)) {
|
||||
|
@ -1036,6 +1037,57 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return all the data for the row that matches <i>row</i> exactly,
|
||||
* or the one that immediately preceeds it, at or immediately before
|
||||
* <i>ts</i>.
|
||||
*
|
||||
* @param row row key
|
||||
* @return map of values
|
||||
* @throws IOException
|
||||
*/
|
||||
public Map<Text, byte[]> getClosestRowBefore(final Text row, final long ts)
|
||||
throws IOException{
|
||||
// look across all the HStores for this region and determine what the
|
||||
// closest key is across all column families, since the data may be sparse
|
||||
|
||||
HStoreKey key = null;
|
||||
checkRow(row);
|
||||
lock.readLock().lock();
|
||||
try {
|
||||
// examine each column family for the preceeding or matching key
|
||||
for(Text colFamily : stores.keySet()){
|
||||
HStore store = stores.get(colFamily);
|
||||
|
||||
// get the closest key
|
||||
Text closestKey = store.getRowKeyAtOrBefore(row, ts);
|
||||
|
||||
// if it happens to be an exact match, we can stop looping
|
||||
if (row.equals(closestKey)) {
|
||||
key = new HStoreKey(closestKey, ts);
|
||||
break;
|
||||
}
|
||||
|
||||
// otherwise, we need to check if it's the max and move to the next
|
||||
if (closestKey != null
|
||||
&& (key == null || closestKey.compareTo(key.getRow()) > 0) ) {
|
||||
key = new HStoreKey(closestKey, ts);
|
||||
}
|
||||
}
|
||||
|
||||
// now that we've found our key, get the values
|
||||
TreeMap<Text, byte []> result = new TreeMap<Text, byte[]>();
|
||||
for (Text colFamily: stores.keySet()) {
|
||||
HStore targetStore = stores.get(colFamily);
|
||||
targetStore.getFull(key, result);
|
||||
}
|
||||
|
||||
return result;
|
||||
} finally {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get <code>versions</code> keys matching the origin key's
|
||||
* row/column/timestamp and those of an older vintage
|
||||
|
|
|
@ -251,7 +251,8 @@ public class HRegionInfo implements WritableComparable {
|
|||
@Override
|
||||
public String toString() {
|
||||
return "regionname: " + this.regionName.toString() + ", startKey: <" +
|
||||
this.startKey.toString() + ">, encodedName(" + getEncodedName() + ")" +
|
||||
this.startKey.toString() + ">, endKey: <" + this.endKey.toString() +
|
||||
">, encodedName(" + getEncodedName() + ")" +
|
||||
(isOffline()? " offline: true,": "") + (isSplit()? " split: true,": "") +
|
||||
" tableDesc: {" + this.tableDesc.toString() + "}";
|
||||
}
|
||||
|
|
|
@ -110,6 +110,31 @@ public interface HRegionInterface extends VersionedProtocol {
|
|||
public HbaseMapWritable getRow(final Text regionName, final Text row, final long ts)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Return all the data for the row that matches <i>row</i> exactly,
|
||||
* or the one that immediately preceeds it.
|
||||
*
|
||||
* @param regionName region name
|
||||
* @param row row key
|
||||
* @return map of values
|
||||
* @throws IOException
|
||||
*/
|
||||
public HbaseMapWritable getClosestRowBefore(final Text regionName, final Text row)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Return all the data for the row that matches <i>row</i> exactly,
|
||||
* or the one that immediately preceeds it, at or immediately before
|
||||
* <i>ts</i>.
|
||||
*
|
||||
* @param regionName region name
|
||||
* @param row row key
|
||||
* @return map of values
|
||||
* @throws IOException
|
||||
*/
|
||||
public HbaseMapWritable getClosestRowBefore(final Text regionName,
|
||||
final Text row, final long ts)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Applies a batch of updates via one RPC
|
||||
|
|
|
@ -1409,6 +1409,38 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HbaseMapWritable getClosestRowBefore(final Text regionName,
|
||||
final Text row)
|
||||
throws IOException {
|
||||
return getClosestRowBefore(regionName, row, HConstants.LATEST_TIMESTAMP);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HbaseMapWritable getClosestRowBefore(final Text regionName,
|
||||
final Text row, final long ts)
|
||||
throws IOException {
|
||||
|
||||
checkOpen();
|
||||
requestCount.incrementAndGet();
|
||||
try {
|
||||
// locate the region we're operating on
|
||||
HRegion region = getRegion(regionName);
|
||||
HbaseMapWritable result = new HbaseMapWritable();
|
||||
// ask the region for all the data
|
||||
Map<Text, byte[]> map = region.getClosestRowBefore(row, ts);
|
||||
// convert to a MapWritable
|
||||
for (Map.Entry<Text, byte []> es: map.entrySet()) {
|
||||
result.put(new HStoreKey(row, es.getKey()),
|
||||
new ImmutableBytesWritable(es.getValue()));
|
||||
}
|
||||
return result;
|
||||
|
||||
} catch (IOException e) {
|
||||
checkFileSystem();
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
public HbaseMapWritable next(final long scannerId) throws IOException {
|
||||
|
@ -1428,6 +1460,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
|
|||
HStoreKey key = new HStoreKey();
|
||||
TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
|
||||
while (s.next(key, results)) {
|
||||
/* LOG.debug("RegionServer scanning on row " + key.getRow());*/
|
||||
for(Map.Entry<Text, byte []> e: results.entrySet()) {
|
||||
values.put(new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp()),
|
||||
new ImmutableBytesWritable(e.getValue()));
|
||||
|
|
|
@ -191,6 +191,10 @@ public class HStore implements HConstants {
|
|||
private void internalGetFull(SortedMap<HStoreKey, byte []> map, HStoreKey key,
|
||||
SortedMap<Text, byte []> results) {
|
||||
|
||||
if (map.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(key);
|
||||
for (Map.Entry<HStoreKey, byte []> es: tailMap.entrySet()) {
|
||||
HStoreKey itKey = es.getKey();
|
||||
|
@ -208,6 +212,96 @@ public class HStore implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the key that matches <i>row</i> exactly, or the one that immediately
|
||||
* preceeds it.
|
||||
*/
|
||||
public Text getRowKeyAtOrBefore(final Text row, long timestamp)
|
||||
throws IOException{
|
||||
this.lock.readLock().lock();
|
||||
|
||||
Text key_memcache = null;
|
||||
Text key_snapshot = null;
|
||||
|
||||
try {
|
||||
synchronized (memcache) {
|
||||
key_memcache = internalGetRowKeyAtOrBefore(memcache, row, timestamp);
|
||||
}
|
||||
synchronized (snapshot) {
|
||||
key_snapshot = internalGetRowKeyAtOrBefore(snapshot, row, timestamp);
|
||||
}
|
||||
|
||||
if (key_memcache == null && key_snapshot == null) {
|
||||
// didn't find any candidates, return null
|
||||
return null;
|
||||
} else if (key_memcache == null && key_snapshot != null) {
|
||||
return key_snapshot;
|
||||
} else if (key_memcache != null && key_snapshot == null) {
|
||||
return key_memcache;
|
||||
} else {
|
||||
// if either is a precise match, return the original row.
|
||||
if ( (key_memcache != null && key_memcache.equals(row))
|
||||
|| (key_snapshot != null && key_snapshot.equals(row)) ) {
|
||||
return row;
|
||||
} else {
|
||||
// no precise matches, so return the one that is closer to the search
|
||||
// key (greatest)
|
||||
return key_memcache.compareTo(key_snapshot) > 0 ?
|
||||
key_memcache : key_snapshot;
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private Text internalGetRowKeyAtOrBefore(SortedMap<HStoreKey, byte []> map,
|
||||
Text key, long timestamp) {
|
||||
// TODO: account for deleted cells
|
||||
|
||||
HStoreKey search_key = new HStoreKey(key, timestamp);
|
||||
|
||||
// get all the entries that come equal or after our search key
|
||||
SortedMap<HStoreKey, byte []> tailMap = map.tailMap(search_key);
|
||||
|
||||
// if the first item in the tail has a matching row, then we have an
|
||||
// exact match, and we should return that item
|
||||
if (!tailMap.isEmpty() && tailMap.firstKey().getRow().equals(key)) {
|
||||
// seek forward past any cells that don't fulfill the timestamp
|
||||
// argument
|
||||
Iterator<HStoreKey> key_iterator = tailMap.keySet().iterator();
|
||||
HStoreKey found_key = key_iterator.next();
|
||||
|
||||
// keep seeking so long as we're in the same row, and the timstamp
|
||||
// isn't as small as we'd like, and there are more cells to check
|
||||
while (found_key.getRow().equals(key)
|
||||
&& found_key.getTimestamp() > timestamp && key_iterator.hasNext()) {
|
||||
found_key = key_iterator.next();
|
||||
}
|
||||
|
||||
// if this check fails, then we've iterated through all the keys that
|
||||
// match by row, but none match by timestamp, so we fall through to
|
||||
// the headMap case.
|
||||
if (found_key.getTimestamp() <= timestamp) {
|
||||
// we didn't find a key that matched by timestamp, so we have to
|
||||
// return null;
|
||||
/* LOG.debug("Went searching for " + key + ", found " + found_key.getRow());*/
|
||||
return found_key.getRow();
|
||||
}
|
||||
}
|
||||
|
||||
// the tail didn't contain the key we're searching for, so we should
|
||||
// use the last key in the headmap as the closest before
|
||||
SortedMap<HStoreKey, byte []> headMap = map.headMap(search_key);
|
||||
if (headMap.isEmpty()) {
|
||||
/* LOG.debug("Went searching for " + key + ", found nothing!");*/
|
||||
return null;
|
||||
} else {
|
||||
/* LOG.debug("Went searching for " + key + ", found " + headMap.lastKey().getRow());*/
|
||||
return headMap.lastKey().getRow();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Examine a single map for the desired key.
|
||||
*
|
||||
|
@ -1706,6 +1800,116 @@ public class HStore implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the key that matches <i>row</i> exactly, or the one that immediately
|
||||
* preceeds it.
|
||||
*/
|
||||
public Text getRowKeyAtOrBefore(final Text row, final long timestamp)
|
||||
throws IOException{
|
||||
// if the exact key is found, return that key
|
||||
// if we find a key that is greater than our search key, then use the
|
||||
// last key we processed, and if that was null, return null.
|
||||
|
||||
Text foundKey = memcache.getRowKeyAtOrBefore(row, timestamp);
|
||||
if (foundKey != null) {
|
||||
return foundKey;
|
||||
}
|
||||
|
||||
// obtain read lock
|
||||
this.lock.readLock().lock();
|
||||
try {
|
||||
MapFile.Reader[] maparray = getReaders();
|
||||
|
||||
Text bestSoFar = null;
|
||||
|
||||
HStoreKey rowKey = new HStoreKey(row, timestamp);
|
||||
|
||||
// process each store file
|
||||
for(int i = maparray.length - 1; i >= 0; i--) {
|
||||
Text row_from_mapfile =
|
||||
rowAtOrBeforeFromMapFile(maparray[i], row, timestamp);
|
||||
|
||||
// for when we have MapFile.Reader#getClosest before functionality
|
||||
/* Text row_from_mapfile = null;
|
||||
WritableComparable value = null;
|
||||
|
||||
HStoreKey hskResult =
|
||||
(HStoreKey)maparray[i].getClosest(rowKey, value, true);
|
||||
|
||||
if (hskResult != null) {
|
||||
row_from_mapfile = hskResult.getRow();
|
||||
}*/
|
||||
|
||||
/* LOG.debug("Best from this mapfile was " + row_from_mapfile);*/
|
||||
|
||||
// short circuit on an exact match
|
||||
if (row.equals(row_from_mapfile)) {
|
||||
return row;
|
||||
}
|
||||
|
||||
// check to see if we've found a new closest row key as a result
|
||||
if (bestSoFar == null || bestSoFar.compareTo(row_from_mapfile) < 0) {
|
||||
bestSoFar = row_from_mapfile;
|
||||
}
|
||||
}
|
||||
|
||||
/* LOG.debug("Went searching for " + row + ", found " + bestSoFar);*/
|
||||
return bestSoFar;
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check an individual MapFile for the row at or before a given key
|
||||
* and timestamp
|
||||
*/
|
||||
private Text rowAtOrBeforeFromMapFile(MapFile.Reader map, Text row,
|
||||
long timestamp)
|
||||
throws IOException {
|
||||
Text previousRow = null;
|
||||
ImmutableBytesWritable readval = new ImmutableBytesWritable();
|
||||
HStoreKey readkey = new HStoreKey();
|
||||
|
||||
synchronized(map) {
|
||||
// start at the beginning of the map
|
||||
// TODO: this sucks. do a clever binary search instead.
|
||||
map.reset();
|
||||
|
||||
while(map.next(readkey, readval)){
|
||||
if (readkey.getRow().compareTo(row) == 0) {
|
||||
// exact match on row
|
||||
if (readkey.getTimestamp() <= timestamp) {
|
||||
// timestamp fits, return this key
|
||||
return readkey.getRow();
|
||||
}
|
||||
|
||||
// getting here means that we matched the row, but the timestamp
|
||||
// is too recent - hopefully one of the next cells will match
|
||||
// better, so keep rolling
|
||||
}
|
||||
// if the row key we just read is beyond the key we're searching for,
|
||||
// then we're done; return the last key we saw before this one
|
||||
else if (readkey.getRow().toString().compareTo(row.toString()) > 0 ) {
|
||||
return previousRow;
|
||||
} else {
|
||||
// so, the row key doesn't match, and we haven't gone past the row
|
||||
// we're seeking yet, so this row is a candidate for closest, as
|
||||
// long as the timestamp is correct.
|
||||
if (readkey.getTimestamp() <= timestamp) {
|
||||
previousRow = new Text(readkey.getRow());
|
||||
}
|
||||
// otherwise, ignore this key, because it doesn't fulfill our
|
||||
// requirements.
|
||||
}
|
||||
}
|
||||
}
|
||||
// getting here means we exhausted all of the cells in the mapfile.
|
||||
// whatever satisfying row we reached previously is the row we should
|
||||
// return
|
||||
return previousRow;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that the <i>target</i> matches the <i>origin</i>. If the
|
||||
* <i>origin</i> has an empty column, then it's assumed to mean any column
|
||||
|
@ -1727,7 +1931,7 @@ public class HStore implements HConstants {
|
|||
// otherwise, we want to match on row and column
|
||||
return target.matchesRowCol(origin);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
|
||||
* has an empty column, then it just tests row equivalence. Otherwise, it uses
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
|
@ -55,7 +56,6 @@ public class HTable implements HConstants {
|
|||
protected final long pause;
|
||||
protected final int numRetries;
|
||||
protected Random rand;
|
||||
protected volatile SortedMap<Text, HRegionLocation> tableServers;
|
||||
protected AtomicReference<BatchUpdate> batch;
|
||||
|
||||
protected volatile boolean tableDoesNotExist;
|
||||
|
@ -89,7 +89,6 @@ public class HTable implements HConstants {
|
|||
this.numRetries = conf.getInt("hbase.client.retries.number", 5);
|
||||
this.rand = new Random();
|
||||
this.batch = new AtomicReference<BatchUpdate>();
|
||||
tableServers = connection.getTableServers(tableName);
|
||||
tableDoesNotExist = false;
|
||||
closed = false;
|
||||
}
|
||||
|
@ -99,18 +98,22 @@ public class HTable implements HConstants {
|
|||
* @param row Row to find.
|
||||
* @return Location of row.
|
||||
*/
|
||||
HRegionLocation getRegionLocation(Text row) {
|
||||
HRegionLocation getRegionLocation(Text row) throws IOException {
|
||||
checkClosed();
|
||||
if (this.tableServers == null) {
|
||||
throw new IllegalStateException("Must open table first");
|
||||
}
|
||||
|
||||
// Only one server will have the row we are looking for
|
||||
Text serverKey = (this.tableServers.containsKey(row)) ?
|
||||
row : this.tableServers.headMap(row).lastKey();
|
||||
return this.tableServers.get(serverKey);
|
||||
return this.connection.locateRegion(this.tableName, row);
|
||||
}
|
||||
|
||||
/**
|
||||
* Find region location hosting passed row using cached info
|
||||
* @param row Row to find.
|
||||
* @return Location of row.
|
||||
*/
|
||||
HRegionLocation getRegionLocation(Text row, boolean reload) throws IOException {
|
||||
checkClosed();
|
||||
return this.connection.relocateRegion(this.tableName, row);
|
||||
}
|
||||
|
||||
|
||||
/** @return the connection */
|
||||
public HConnection getConnection() {
|
||||
checkClosed();
|
||||
|
@ -124,7 +127,6 @@ public class HTable implements HConstants {
|
|||
public synchronized void close() {
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
tableServers = null;
|
||||
batch.set(null);
|
||||
connection.close(tableName);
|
||||
}
|
||||
|
@ -185,14 +187,78 @@ public class HTable implements HConstants {
|
|||
* Gets the starting row key for every region in the currently open table
|
||||
* @return Array of region starting row keys
|
||||
*/
|
||||
public Text[] getStartKeys() {
|
||||
public Text[] getStartKeys() throws IOException {
|
||||
checkClosed();
|
||||
Text[] keys = new Text[tableServers.size()];
|
||||
int i = 0;
|
||||
for(Text key: tableServers.keySet()){
|
||||
keys[i++] = key;
|
||||
List<Text> keyList = new ArrayList<Text>();
|
||||
|
||||
long scannerId = -1L;
|
||||
|
||||
Text startRow = new Text(tableName.toString() + ",,999999999999999");
|
||||
HRegionLocation metaLocation = null;
|
||||
|
||||
// scan over the each meta region
|
||||
do {
|
||||
try{
|
||||
// turn the start row into a location
|
||||
metaLocation =
|
||||
connection.locateRegion(META_TABLE_NAME, startRow);
|
||||
|
||||
// connect to the server hosting the .META. region
|
||||
HRegionInterface server =
|
||||
connection.getHRegionConnection(metaLocation.getServerAddress());
|
||||
|
||||
// open a scanner over the meta region
|
||||
scannerId = server.openScanner(
|
||||
metaLocation.getRegionInfo().getRegionName(),
|
||||
COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, LATEST_TIMESTAMP,
|
||||
null);
|
||||
|
||||
// iterate through the scanner, accumulating unique table names
|
||||
SCANNER_LOOP: while (true) {
|
||||
HbaseMapWritable values = server.next(scannerId);
|
||||
if (values == null || values.size() == 0) {
|
||||
break;
|
||||
}
|
||||
for (Map.Entry<Writable, Writable> e: values.entrySet()) {
|
||||
HStoreKey key = (HStoreKey) e.getKey();
|
||||
if (key.getColumn().equals(COL_REGIONINFO)) {
|
||||
HRegionInfo info = new HRegionInfo();
|
||||
info = (HRegionInfo) Writables.getWritable(
|
||||
((ImmutableBytesWritable) e.getValue()).get(), info);
|
||||
|
||||
if (!info.getTableDesc().getName().equals(this.tableName)) {
|
||||
break SCANNER_LOOP;
|
||||
}
|
||||
|
||||
if (info.isOffline()) {
|
||||
LOG.debug("Region " + info + " was offline!");
|
||||
break;
|
||||
}
|
||||
|
||||
if (info.isSplit()) {
|
||||
LOG.debug("Region " + info + " was split!");
|
||||
break;
|
||||
}
|
||||
|
||||
keyList.add(info.getStartKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// advance the startRow to the end key of the current region
|
||||
startRow = metaLocation.getRegionInfo().getEndKey();
|
||||
} catch (IOException e) {
|
||||
// need retry logic?
|
||||
throw e;
|
||||
}
|
||||
} while (startRow.compareTo(EMPTY_START_ROW) != 0);
|
||||
|
||||
Text[] arr = new Text[keyList.size()];
|
||||
for (int i = 0; i < keyList.size(); i++ ){
|
||||
arr[i] = keyList.get(i);
|
||||
}
|
||||
return keys;
|
||||
|
||||
return arr;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -225,7 +291,7 @@ public class HTable implements HConstants {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
r = getRegionLocation(row, true);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
@ -270,7 +336,7 @@ public class HTable implements HConstants {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
r = getRegionLocation(row, true);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
@ -326,7 +392,7 @@ public class HTable implements HConstants {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
r = getRegionLocation(row, true);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
@ -387,7 +453,7 @@ public class HTable implements HConstants {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
r = getRegionLocation(row, true);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
@ -728,7 +794,7 @@ public class HTable implements HConstants {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
r = getRegionLocation(row, true);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
@ -765,7 +831,8 @@ public class HTable implements HConstants {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
/* tableServers = connection.reloadTableServers(tableName);*/
|
||||
r = getRegionLocation(row, true);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
@ -814,7 +881,7 @@ public class HTable implements HConstants {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
r = getRegionLocation(row, true);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(this.pause);
|
||||
|
@ -900,11 +967,11 @@ public class HTable implements HConstants {
|
|||
e = RemoteExceptionHandler.decodeRemoteException(
|
||||
(RemoteException) e);
|
||||
}
|
||||
if (tries < numRetries -1) {
|
||||
if (tries < numRetries - 1) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
r = getRegionLocation(batch.get().getRow(), true);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
|
@ -945,87 +1012,89 @@ public class HTable implements HConstants {
|
|||
private long scanTime;
|
||||
@SuppressWarnings("hiding")
|
||||
private boolean closed;
|
||||
private AtomicReferenceArray<HRegionLocation> regions;
|
||||
@SuppressWarnings("hiding")
|
||||
private int currentRegion;
|
||||
private HRegionLocation currentRegionLocation;
|
||||
private HRegionInterface server;
|
||||
private long scannerId;
|
||||
private RowFilterInterface filter;
|
||||
|
||||
private void loadRegions() {
|
||||
checkClosed();
|
||||
Text firstServer = null;
|
||||
if (this.startRow == null || this.startRow.getLength() == 0) {
|
||||
firstServer = tableServers.firstKey();
|
||||
|
||||
} else if(tableServers.containsKey(startRow)) {
|
||||
firstServer = startRow;
|
||||
|
||||
} else {
|
||||
firstServer = tableServers.headMap(startRow).lastKey();
|
||||
}
|
||||
Collection<HRegionLocation> info =
|
||||
tableServers.tailMap(firstServer).values();
|
||||
|
||||
this.regions = new AtomicReferenceArray<HRegionLocation>(
|
||||
info.toArray(new HRegionLocation[info.size()]));
|
||||
}
|
||||
|
||||
protected ClientScanner(Text[] columns, Text startRow, long timestamp,
|
||||
RowFilterInterface filter) throws IOException {
|
||||
|
||||
RowFilterInterface filter)
|
||||
throws IOException {
|
||||
|
||||
LOG.info("Creating scanner over " + tableName + " starting at key " + startRow);
|
||||
|
||||
// defaults
|
||||
this.closed = false;
|
||||
this.server = null;
|
||||
this.scannerId = -1L;
|
||||
|
||||
// save off the simple parameters
|
||||
this.columns = columns;
|
||||
this.startRow = startRow;
|
||||
this.scanTime = timestamp;
|
||||
this.closed = false;
|
||||
|
||||
// save the filter, and make sure that the filter applies to the data
|
||||
// we're expecting to pull back
|
||||
this.filter = filter;
|
||||
if (filter != null) {
|
||||
filter.validate(columns);
|
||||
}
|
||||
loadRegions();
|
||||
this.currentRegion = -1;
|
||||
this.server = null;
|
||||
this.scannerId = -1L;
|
||||
|
||||
nextScanner();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Gets a scanner for the next region.
|
||||
* Returns false if there are no more scanners.
|
||||
*/
|
||||
private boolean nextScanner() throws IOException {
|
||||
checkClosed();
|
||||
|
||||
// close the previous scanner if it's open
|
||||
if (this.scannerId != -1L) {
|
||||
this.server.close(this.scannerId);
|
||||
this.scannerId = -1L;
|
||||
}
|
||||
this.currentRegion += 1;
|
||||
if (this.currentRegion == this.regions.length()) {
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
|
||||
// if we're at the end of the table, then close and return false
|
||||
// to stop iterating
|
||||
if (this.currentRegionLocation != null){
|
||||
LOG.debug("Advancing forward from region "
|
||||
+ this.currentRegionLocation.getRegionInfo());
|
||||
|
||||
if (this.currentRegionLocation.getRegionInfo().getEndKey() == null
|
||||
|| this.currentRegionLocation.getRegionInfo().getEndKey().equals(EMPTY_TEXT)) {
|
||||
LOG.debug("We're at the end of the region, returning.");
|
||||
close();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
HRegionLocation oldLocation = this.currentRegionLocation;
|
||||
|
||||
Text localStartKey = oldLocation == null ?
|
||||
startRow : oldLocation.getRegionInfo().getEndKey();
|
||||
|
||||
// advance to the region that starts with the current region's end key
|
||||
LOG.debug("Advancing internal scanner to startKey " + localStartKey);
|
||||
this.currentRegionLocation = getRegionLocation(localStartKey);
|
||||
|
||||
LOG.debug("New region: " + this.currentRegionLocation);
|
||||
|
||||
try {
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
HRegionLocation r = this.regions.get(currentRegion);
|
||||
this.server =
|
||||
connection.getHRegionConnection(r.getServerAddress());
|
||||
// connect to the server
|
||||
server = connection.getHRegionConnection(
|
||||
this.currentRegionLocation.getServerAddress());
|
||||
|
||||
try {
|
||||
if (this.filter == null) {
|
||||
this.scannerId =
|
||||
this.server.openScanner(r.getRegionInfo().getRegionName(),
|
||||
this.columns, currentRegion == 0 ? this.startRow
|
||||
: EMPTY_START_ROW, scanTime, null);
|
||||
// open a scanner on the region server starting at the
|
||||
// beginning of the region
|
||||
scannerId = server.openScanner(
|
||||
this.currentRegionLocation.getRegionInfo().getRegionName(),
|
||||
this.columns, localStartKey, scanTime, filter);
|
||||
|
||||
} else {
|
||||
this.scannerId =
|
||||
this.server.openScanner(r.getRegionInfo().getRegionName(),
|
||||
this.columns, currentRegion == 0 ? this.startRow
|
||||
: EMPTY_START_ROW, scanTime, filter);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException(
|
||||
|
@ -1038,8 +1107,7 @@ public class HTable implements HConstants {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("reloading table servers because: " + e.getMessage());
|
||||
}
|
||||
tableServers = connection.reloadTableServers(tableName);
|
||||
loadRegions();
|
||||
currentRegionLocation = getRegionLocation(localStartKey, true);
|
||||
}
|
||||
}
|
||||
try {
|
||||
|
@ -1068,7 +1136,7 @@ public class HTable implements HConstants {
|
|||
// calls to next.
|
||||
results.clear();
|
||||
do {
|
||||
values = this.server.next(this.scannerId);
|
||||
values = server.next(scannerId);
|
||||
} while (values != null && values.size() == 0 && nextScanner());
|
||||
|
||||
if (values != null && values.size() != 0) {
|
||||
|
@ -1089,9 +1157,9 @@ public class HTable implements HConstants {
|
|||
*/
|
||||
public void close() throws IOException {
|
||||
checkClosed();
|
||||
if (this.scannerId != -1L) {
|
||||
if (scannerId != -1L) {
|
||||
try {
|
||||
this.server.close(this.scannerId);
|
||||
server.close(scannerId);
|
||||
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
|
@ -1101,10 +1169,10 @@ public class HTable implements HConstants {
|
|||
throw e;
|
||||
}
|
||||
}
|
||||
this.scannerId = -1L;
|
||||
scannerId = -1L;
|
||||
}
|
||||
this.server = null;
|
||||
this.closed = true;
|
||||
server = null;
|
||||
closed = true;
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
|
|
|
@ -136,39 +136,47 @@ public class MultiRegionTable extends HBaseTestCase {
|
|||
Path parentDir = HRegion.getRegionDir(new Path(d, tableName),
|
||||
parent.getEncodedName());
|
||||
assertTrue(fs.exists(parentDir));
|
||||
LOG.info("Split happened. Parent is " + parent.getRegionName() +
|
||||
" and daughters are " +
|
||||
((splitA != null)? splitA.getRegionName(): "-") + ", " +
|
||||
((splitB != null)? splitB.getRegionName(): "-"));
|
||||
|
||||
|
||||
LOG.info("Split happened. Parent is " + parent.getRegionName());
|
||||
|
||||
// Recalibrate will cause us to wait on new regions' deployment
|
||||
recalibrate(t, new Text(columnName), retries, waitTime);
|
||||
|
||||
// Compact a region at a time so we can test case where one region has
|
||||
// no references but the other still has some
|
||||
compact(cluster, splitA);
|
||||
|
||||
// Wait till the parent only has reference to remaining split, one that
|
||||
// still has references.
|
||||
while (true) {
|
||||
data = getSplitParentInfo(meta, parent);
|
||||
if (data == null || data.size() == 3) {
|
||||
try {
|
||||
Thread.sleep(waitTime);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
|
||||
if (splitA == null) {
|
||||
LOG.info("splitA was already null. Assuming it was previously compacted.");
|
||||
} else {
|
||||
LOG.info("Daughter splitA: " + splitA.getRegionName());
|
||||
// Compact a region at a time so we can test case where one region has
|
||||
// no references but the other still has some
|
||||
compact(cluster, splitA);
|
||||
|
||||
// Wait till the parent only has reference to remaining split, one that
|
||||
// still has references.
|
||||
while (true) {
|
||||
data = getSplitParentInfo(meta, parent);
|
||||
if (data == null || data.size() == 3) {
|
||||
try {
|
||||
Thread.sleep(waitTime);
|
||||
} catch (InterruptedException e) {
|
||||
// continue
|
||||
}
|
||||
continue;
|
||||
}
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
break;
|
||||
LOG.info("Parent split info returned " + data.keySet().toString());
|
||||
}
|
||||
LOG.info("Parent split info returned " + data.keySet().toString());
|
||||
|
||||
// Call second split.
|
||||
compact(cluster, splitB);
|
||||
|
||||
// Now wait until parent disappears.
|
||||
|
||||
|
||||
if (splitB == null) {
|
||||
LOG.info("splitB was already null. Assuming it was previously compacted.");
|
||||
} else {
|
||||
LOG.info("Daughter splitB: " + splitA.getRegionName());
|
||||
|
||||
// Call second split.
|
||||
compact(cluster, splitB);
|
||||
}
|
||||
|
||||
// Now wait until parent disappears.
|
||||
LOG.info("Waiting on parent " + parent.getRegionName() + " to disappear");
|
||||
for (int i = 0; i < retries; i++) {
|
||||
if (getSplitParentInfo(meta, parent) == null) {
|
||||
|
|
|
@ -140,6 +140,82 @@ public class TestGet2 extends HBaseTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
/** For HADOOP-2443 */
|
||||
public void testGetClosestRowBefore() throws IOException{
|
||||
|
||||
HRegion region = null;
|
||||
HRegionIncommon region_incommon = null;
|
||||
|
||||
try {
|
||||
HTableDescriptor htd = createTableDescriptor(getName());
|
||||
HRegionInfo hri = new HRegionInfo(htd, null, null);
|
||||
region = createNewHRegion(htd, null, null);
|
||||
region_incommon = new HRegionIncommon(region);
|
||||
|
||||
// set up some test data
|
||||
Text t10 = new Text("010");
|
||||
Text t20 = new Text("020");
|
||||
Text t30 = new Text("030");
|
||||
Text t40 = new Text("040");
|
||||
|
||||
long lockid = region_incommon.startBatchUpdate(t10);
|
||||
region_incommon.put(lockid, COLUMNS[0], "t10 bytes".getBytes());
|
||||
region_incommon.commit(lockid);
|
||||
|
||||
lockid = region_incommon.startBatchUpdate(t20);
|
||||
region_incommon.put(lockid, COLUMNS[0], "t20 bytes".getBytes());
|
||||
region_incommon.commit(lockid);
|
||||
|
||||
lockid = region_incommon.startBatchUpdate(t30);
|
||||
region_incommon.put(lockid, COLUMNS[0], "t30 bytes".getBytes());
|
||||
region_incommon.commit(lockid);
|
||||
|
||||
lockid = region_incommon.startBatchUpdate(t40);
|
||||
region_incommon.put(lockid, COLUMNS[0], "t40 bytes".getBytes());
|
||||
region_incommon.commit(lockid);
|
||||
|
||||
// try finding "015"
|
||||
Text t15 = new Text("015");
|
||||
Map<Text, byte[]> results =
|
||||
region.getClosestRowBefore(t15, HConstants.LATEST_TIMESTAMP);
|
||||
assertEquals(new String(results.get(COLUMNS[0])), "t10 bytes");
|
||||
|
||||
// try "020", we should get that row exactly
|
||||
results = region.getClosestRowBefore(t20, HConstants.LATEST_TIMESTAMP);
|
||||
assertEquals(new String(results.get(COLUMNS[0])), "t20 bytes");
|
||||
|
||||
// try "050", should get stuff from "040"
|
||||
Text t50 = new Text("050");
|
||||
results = region.getClosestRowBefore(t50, HConstants.LATEST_TIMESTAMP);
|
||||
assertEquals(new String(results.get(COLUMNS[0])), "t40 bytes");
|
||||
|
||||
// force a flush
|
||||
region.flushcache();
|
||||
|
||||
// try finding "015"
|
||||
results = region.getClosestRowBefore(t15, HConstants.LATEST_TIMESTAMP);
|
||||
assertEquals(new String(results.get(COLUMNS[0])), "t10 bytes");
|
||||
|
||||
// try "020", we should get that row exactly
|
||||
results = region.getClosestRowBefore(t20, HConstants.LATEST_TIMESTAMP);
|
||||
assertEquals(new String(results.get(COLUMNS[0])), "t20 bytes");
|
||||
|
||||
// try "050", should get stuff from "040"
|
||||
t50 = new Text("050");
|
||||
results = region.getClosestRowBefore(t50, HConstants.LATEST_TIMESTAMP);
|
||||
assertEquals(new String(results.get(COLUMNS[0])), "t40 bytes");
|
||||
} finally {
|
||||
if (region != null) {
|
||||
try {
|
||||
region.close();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
region.getLog().closeAndDelete();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void assertCellValueEquals(final HRegion region, final Text row,
|
||||
final Text column, final long timestamp, final String value)
|
||||
|
|
|
@ -55,7 +55,7 @@ public class TestToString extends TestCase {
|
|||
HRegionInfo hri = HRegionInfo.rootRegionInfo;
|
||||
System.out.println(hri.toString());
|
||||
assertEquals("HRegionInfo",
|
||||
"regionname: -ROOT-,,0, startKey: <>, encodedName(70236052) tableDesc: " +
|
||||
"regionname: -ROOT-,,0, startKey: <>, endKey: <>, encodedName(70236052) tableDesc: " +
|
||||
"{name: -ROOT-, families: {info:={name: info, max versions: 1, " +
|
||||
"compression: NONE, in memory: false, max length: 2147483647, bloom " +
|
||||
"filter: none}}}", hri.toString());
|
||||
|
|
|
@ -126,6 +126,7 @@ public class TestTableIndex extends MultiRegionTable {
|
|||
StaticTestEnvironment.shutdownDfs(dfsCluster);
|
||||
throw e;
|
||||
}
|
||||
LOG.debug("\n\n\n\n\t\t\tSetup Complete\n\n\n\n");
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
|
@ -252,7 +253,7 @@ public class TestTableIndex extends MultiRegionTable {
|
|||
// its snapshot, all the updates have made it into the cache.
|
||||
try {
|
||||
Thread.sleep(conf.getLong("hbase.regionserver.optionalcacheflushinterval",
|
||||
60L * 1000L));
|
||||
60L * 1000L));
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
|
@ -295,13 +296,16 @@ public class TestTableIndex extends MultiRegionTable {
|
|||
int count = 0;
|
||||
while (scanner.next(key, results)) {
|
||||
String value = key.getRow().toString();
|
||||
LOG.debug("Scanned over " + key.getRow());
|
||||
Term term = new Term(rowkeyName, value);
|
||||
int hitCount = searcher.search(new TermQuery(term)).length();
|
||||
assertEquals("check row " + value, 1, hitCount);
|
||||
count++;
|
||||
}
|
||||
int maxDoc = searcher.maxDoc();
|
||||
assertEquals("check number of rows", count, maxDoc);
|
||||
LOG.debug("Searcher.maxDoc: " + searcher.maxDoc());
|
||||
LOG.debug("IndexReader.numDocs: " + ((IndexSearcher)searcher).getIndexReader().numDocs());
|
||||
int maxDoc = ((IndexSearcher)searcher).getIndexReader().numDocs();
|
||||
assertEquals("check number of rows", maxDoc, count);
|
||||
} finally {
|
||||
if (null != searcher)
|
||||
searcher.close();
|
||||
|
|
Loading…
Reference in New Issue