HBASE-7268. Correct local region location cache information can be overwritten w/stale information from an old server (Sergey Shelukhin)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1436583 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Enis Soztutar 2013-01-21 20:07:25 +00:00
parent df99d9db4d
commit f17016bd5b
21 changed files with 395 additions and 123 deletions

View File

@ -344,6 +344,9 @@ public final class HConstants {
/** The startcode column qualifier */
public static final byte [] STARTCODE_QUALIFIER = toBytes("serverstartcode");
/** The open seqnum column qualifier */
public static final byte [] SEQNUM_QUALIFIER = toBytes("seqnumDuringOpen");
/** The lower-half split region column qualifier */
public static final byte [] SPLITA_QUALIFIER = toBytes("splitA");
@ -621,6 +624,12 @@ public final class HConstants {
*/
public static int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
/**
* Value indicating the server name was saved with no sequence number.
*/
public static final long NO_SEQNUM = -1;
/*
* cluster replication constants.
*/

View File

@ -1009,6 +1009,20 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
return new ServerName(hostAndPort, Bytes.toLong(value));
}
/**
* The latest seqnum that the server writing to meta observed when opening the region.
* E.g. the seqNum when the result of {@link #getServerName(Result)} was written.
* @param r Result to pull the seqNum from
* @return SeqNum, or HConstants.NO_SEQNUM if there's no value written.
*/
public static long getSeqNumDuringOpen(final Result r) {
byte[] value = r.getValue(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER);
if (value == null || value.length == 0) return HConstants.NO_SEQNUM;
Long result = Bytes.toLong(value);
if (result == null) return HConstants.NO_SEQNUM;
return result.longValue();
}
/**
* Parses an HRegionInfo instance from the passed in stream. Presumes the HRegionInfo was
* serialized to the stream with {@link #toDelimitedByteArray()}

View File

@ -35,6 +35,7 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
private final HRegionInfo regionInfo;
private final String hostname;
private final int port;
private final long seqNum;
// Cache of the 'toString' result.
private String cachedString = null;
// Cache of the hostname + port
@ -43,14 +44,20 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
/**
* Constructor
* @param regionInfo the HRegionInfo for the region
* @param hostname Hostname
* @param port port
*/
public HRegionLocation(HRegionInfo regionInfo, final String hostname,
final int port) {
final int port, final long seqNum) {
this.regionInfo = regionInfo;
this.hostname = hostname;
this.port = port;
this.seqNum = seqNum;
}
/**
* Test constructor w/o seqNum.
*/
public HRegionLocation(HRegionInfo regionInfo, final String hostname, final int port) {
this(regionInfo, hostname, port, 0);
}
/**
@ -60,7 +67,8 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
public synchronized String toString() {
if (this.cachedString == null) {
this.cachedString = "region=" + this.regionInfo.getRegionNameAsString() +
", hostname=" + this.hostname + ", port=" + this.port;
", hostname=" + this.hostname + ", port=" + this.port
+ ", seqNum=" + seqNum;
}
return this.cachedString;
}
@ -105,6 +113,10 @@ public class HRegionLocation implements Comparable<HRegionLocation> {
return this.port;
}
public long getSeqNum() {
return seqNum;
}
/**
* @return String made of hostname and port formatted as per {@link Addressing#createHostAndPortStr(String, int)}
*/

View File

@ -31,18 +31,22 @@ import org.apache.hadoop.ipc.RemoteException;
@InterfaceStability.Evolving
public class RegionMovedException extends NotServingRegionException {
private static final Log LOG = LogFactory.getLog(RegionMovedException.class);
private static final long serialVersionUID = -7232903522310558397L;
private static final long serialVersionUID = -7232903522310558396L;
private final String hostname;
private final int port;
private final long locationSeqNum;
private static final String HOST_FIELD = "hostname=";
private static final String PORT_FIELD = "port=";
private static final String LOCATIONSEQNUM_FIELD = "locationSeqNum=";
public RegionMovedException(final String hostname, final int port) {
public RegionMovedException(final String hostname, final int port,
final long locationSeqNum) {
super();
this.hostname = hostname;
this.port = port;
this.locationSeqNum = locationSeqNum;
}
public String getHostname() {
@ -53,6 +57,10 @@ public class RegionMovedException extends NotServingRegionException {
return port;
}
public long getLocationSeqNum() {
return locationSeqNum;
}
/**
* For hadoop.ipc internal call. Do NOT use.
* We have to parse the hostname to recreate the exception.
@ -61,24 +69,31 @@ public class RegionMovedException extends NotServingRegionException {
public RegionMovedException(String s) {
int posHostname = s.indexOf(HOST_FIELD) + HOST_FIELD.length();
int posPort = s.indexOf(PORT_FIELD) + PORT_FIELD.length();
int posSeqNum = s.indexOf(LOCATIONSEQNUM_FIELD) + LOCATIONSEQNUM_FIELD.length();
String tmpHostname = null;
int tmpPort = -1;
long tmpSeqNum = HConstants.NO_SEQNUM;
try {
// TODO: this whole thing is extremely brittle.
tmpHostname = s.substring(posHostname, s.indexOf(' ', posHostname));
tmpPort = Integer.parseInt(s.substring(posPort, s.indexOf('.', posPort)));
tmpSeqNum = Long.parseLong(s.substring(posSeqNum, s.indexOf('.', posSeqNum)));
} catch (Exception ignored) {
LOG.warn("Can't parse the hostname and the port from this string: " + s + ", "+
"Continuing");
LOG.warn("Can't parse the hostname and the port from this string: " + s + ", continuing");
}
hostname = tmpHostname;
port = tmpPort;
locationSeqNum = tmpSeqNum;
}
@Override
public String getMessage() {
return "Region moved to: " + HOST_FIELD + hostname + " " + PORT_FIELD + port + ".";
// TODO: deserialization above depends on this. That is bad, but also means this
// should be modified carefully.
return "Region moved to: " + HOST_FIELD + hostname + " " + PORT_FIELD + port + ". As of "
+ LOCATIONSEQNUM_FIELD + locationSeqNum + ".";
}
/**

View File

@ -175,8 +175,8 @@ public class MetaEditor {
* Adds a (single) META row for the specified new region and its daughters. Note that this does
* not add its daughter's as different rows, but adds information about the daughters
* in the same row as the parent. Use
* {@link #offlineParentInMeta(CatalogTracker, HRegionInfo, HRegionInfo, HRegionInfo)}
* and {@link #addDaughter(CatalogTracker, HRegionInfo, ServerName)} if you want to do that.
* {@link #offlineParentInMeta(CatalogTracker, HRegionInfo, HRegionInfo, HRegionInfo)} and
* {@link #addDaughter(CatalogTracker, HRegionInfo, ServerName, long)} if you want to do that.
* @param meta the HTable for META
* @param regionInfo region information
* @param splitA first split daughter of the parent regionInfo
@ -236,12 +236,20 @@ public class MetaEditor {
}
}
/**
* Adds a daughter region entry to meta.
* @param regionInfo the region to put
* @param sn the location of the region
* @param openSeqNum the latest sequence number obtained when the region was open
*/
public static void addDaughter(final CatalogTracker catalogTracker,
final HRegionInfo regionInfo, final ServerName sn)
final HRegionInfo regionInfo, final ServerName sn, final long openSeqNum)
throws NotAllMetaRegionsOnlineException, IOException {
Put put = new Put(regionInfo.getRegionName());
addRegionInfo(put, regionInfo);
if (sn != null) addLocation(put, sn);
if (sn != null) {
addLocation(put, sn, openSeqNum);
}
putToMetaTable(catalogTracker, put);
LOG.info("Added daughter " + regionInfo.getRegionNameAsString() +
(sn == null? ", serverName=null": ", serverName=" + sn.toString()));
@ -257,15 +265,16 @@ public class MetaEditor {
* @param catalogTracker catalog tracker
* @param regionInfo region to update location of
* @param sn Server name
* @param openSeqNum the latest sequence number obtained when the region was open
* @throws IOException
* @throws ConnectException Usually because the regionserver carrying .META.
* is down.
* @throws NullPointerException Because no -ROOT- server connection
*/
public static void updateMetaLocation(CatalogTracker catalogTracker,
HRegionInfo regionInfo, ServerName sn)
HRegionInfo regionInfo, ServerName sn, long openSeqNum)
throws IOException, ConnectException {
updateLocation(catalogTracker, regionInfo, sn);
updateLocation(catalogTracker, regionInfo, sn, openSeqNum);
}
/**
@ -281,9 +290,9 @@ public class MetaEditor {
* @throws IOException
*/
public static void updateRegionLocation(CatalogTracker catalogTracker,
HRegionInfo regionInfo, ServerName sn)
HRegionInfo regionInfo, ServerName sn, long updateSeqNum)
throws IOException {
updateLocation(catalogTracker, regionInfo, sn);
updateLocation(catalogTracker, regionInfo, sn, updateSeqNum);
}
/**
@ -295,14 +304,15 @@ public class MetaEditor {
* @param catalogTracker
* @param regionInfo region to update location of
* @param sn Server name
* @param openSeqNum the latest sequence number obtained when the region was open
* @throws IOException In particular could throw {@link java.net.ConnectException}
* if the server is down on other end.
*/
private static void updateLocation(final CatalogTracker catalogTracker,
HRegionInfo regionInfo, ServerName sn)
HRegionInfo regionInfo, ServerName sn, long openSeqNum)
throws IOException {
Put put = new Put(regionInfo.getRegionName());
addLocation(put, sn);
addLocation(put, sn, openSeqNum);
putToCatalogTable(catalogTracker, put);
LOG.info("Updated row " + regionInfo.getRegionNameAsString() +
" with server=" + sn);
@ -348,11 +358,13 @@ public class MetaEditor {
return p;
}
private static Put addLocation(final Put p, final ServerName sn) {
private static Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
Bytes.toBytes(sn.getHostAndPort()));
p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
Bytes.toBytes(sn.getStartcode()));
p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
Bytes.toBytes(openSeqNum));
return p;
}
}

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.GetTableDe
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Triple;
@ -956,8 +957,8 @@ public class HConnectionManager {
LOG.debug("Looked up root region location, connection=" + this +
"; serverName=" + ((servername == null) ? "null" : servername));
if (servername == null) return null;
return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO,
servername.getHostname(), servername.getPort());
return new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, servername.getHostname(),
servername.getPort(), 0);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
@ -1006,10 +1007,9 @@ public class HConnectionManager {
}
// instantiate the location
HRegionLocation loc = new HRegionLocation(regionInfo, serverName.getHostname(),
serverName.getPort());
serverName.getPort(), HRegionInfo.getSeqNumDuringOpen(result));
// cache this meta entry
cacheLocation(tableName, loc);
cacheLocation(tableName, null, loc);
return true;
} catch (RuntimeException e) {
throw new IOException(e);
@ -1131,9 +1131,9 @@ public class HConnectionManager {
}
// Instantiate the location
location =
new HRegionLocation(regionInfo, serverName.getHostname(), serverName.getPort());
cacheLocation(tableName, location);
location = new HRegionLocation(regionInfo, serverName.getHostname(),
serverName.getPort(), HRegionInfo.getSeqNumDuringOpen(regionInfoRow));
cacheLocation(tableName, null, location);
return location;
} catch (TableNotFoundException e) {
// if we got this error, probably means the table just plain doesn't
@ -1226,24 +1226,25 @@ public class HConnectionManager {
* @param row
*/
void deleteCachedLocation(final byte [] tableName, final byte [] row) {
HRegionLocation rl = null;
synchronized (this.cachedRegionLocations) {
Map<byte[], HRegionLocation> tableLocations =
getTableLocations(tableName);
// start to examine the cache. we can only do cache actions
// if there's something in the cache for this table.
if (!tableLocations.isEmpty()) {
HRegionLocation rl = getCachedLocation(tableName, row);
rl = getCachedLocation(tableName, row);
if (rl != null) {
tableLocations.remove(rl.getRegionInfo().getStartKey());
if (LOG.isDebugEnabled()) {
LOG.debug("Removed " +
rl.getRegionInfo().getRegionNameAsString() +
" for tableName=" + Bytes.toString(tableName) +
" from cache " + "because of " + Bytes.toStringBinary(row));
}
}
}
}
if ((rl != null) && LOG.isDebugEnabled()) {
LOG.debug("Removed " + rl.getHostname() + ":" + rl.getPort()
+ " as a location of " + rl.getRegionInfo().getRegionNameAsString() +
" for tableName=" + Bytes.toString(tableName) +
" from cache because of " + Bytes.toStringBinary(row));
}
}
@Override
@ -1315,23 +1316,52 @@ public class HConnectionManager {
}
}
/*
/**
* Put a newly discovered HRegionLocation into the cache.
* @param tableName The table name.
* @param source the source of the new location, if it's not coming from meta
* @param location the new location
*/
private void cacheLocation(final byte [] tableName,
private void cacheLocation(final byte [] tableName, final HRegionLocation source,
final HRegionLocation location) {
boolean isFromMeta = (source == null);
byte [] startKey = location.getRegionInfo().getStartKey();
Map<byte [], HRegionLocation> tableLocations =
getTableLocations(tableName);
boolean hasNewCache;
boolean isNewCacheEntry = false;
boolean isStaleUpdate = false;
HRegionLocation oldLocation = null;
synchronized (this.cachedRegionLocations) {
cachedServers.add(location.getHostnamePort());
hasNewCache = (tableLocations.put(startKey, location) == null);
oldLocation = tableLocations.get(startKey);
isNewCacheEntry = (oldLocation == null);
// If the server in cache sends us a redirect, assume it's always valid.
if (!isNewCacheEntry && !oldLocation.equals(source)) {
long newLocationSeqNum = location.getSeqNum();
// Meta record is stale - some (probably the same) server has closed the region
// with later seqNum and told us about the new location.
boolean isStaleMetaRecord = isFromMeta && (oldLocation.getSeqNum() > newLocationSeqNum);
// Same as above for redirect. However, in this case, if the number is equal to previous
// record, the most common case is that first the region was closed with seqNum, and then
// opened with the same seqNum; hence we will ignore the redirect.
// There are so many corner cases with various combinations of opens and closes that
// an additional counter on top of seqNum would be necessary to handle them all.
boolean isStaleRedirect = !isFromMeta && (oldLocation.getSeqNum() >= newLocationSeqNum);
isStaleUpdate = (isStaleMetaRecord || isStaleRedirect);
}
if (!isStaleUpdate) {
tableLocations.put(startKey, location);
}
}
if (hasNewCache) {
if (isNewCacheEntry) {
LOG.debug("Cached location for " +
location.getRegionInfo().getRegionNameAsString() +
" is " + location.getHostnamePort());
} else if (isStaleUpdate && !location.equals(oldLocation)) {
LOG.debug("Ignoring stale location update for "
+ location.getRegionInfo().getRegionNameAsString() + ": "
+ location.getHostnamePort() + " at " + location.getSeqNum() + "; local "
+ oldLocation.getHostnamePort() + " at " + oldLocation.getSeqNum());
}
}
@ -1734,61 +1764,65 @@ public class HConnectionManager {
};
}
void updateCachedLocation(HRegionLocation hrl, String hostname, int port) {
HRegionLocation newHrl = new HRegionLocation(hrl.getRegionInfo(), hostname, port);
void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
String hostname, int port, long seqNum) {
HRegionLocation newHrl = new HRegionLocation(hri, hostname, port, seqNum);
synchronized (this.cachedRegionLocations) {
cacheLocation(hrl.getRegionInfo().getTableName(), newHrl);
cacheLocation(hri.getTableName(), source, newHrl);
}
}
void deleteCachedLocation(HRegionLocation rl) {
void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) {
boolean isStaleDelete = false;
HRegionLocation oldLocation = null;
synchronized (this.cachedRegionLocations) {
Map<byte[], HRegionLocation> tableLocations =
getTableLocations(rl.getRegionInfo().getTableName());
tableLocations.remove(rl.getRegionInfo().getStartKey());
getTableLocations(hri.getTableName());
oldLocation = tableLocations.get(hri.getStartKey());
// Do not delete the cache entry if it's not for the same server that gave us the error.
isStaleDelete = (source != null) && !oldLocation.equals(source);
if (!isStaleDelete) {
tableLocations.remove(hri.getStartKey());
}
}
if (isStaleDelete) {
LOG.debug("Received an error from " + source.getHostnamePort() + " for region "
+ hri.getRegionNameAsString() + "; not removing "
+ oldLocation.getHostnamePort() + " from cache.");
}
}
private void updateCachedLocations(byte[] tableName, Row row, Object t) {
updateCachedLocations(null, tableName, row, t);
}
/**
* Update the location with the new value (if the exception is a RegionMovedException) or delete
* it from the cache.
* @param hrl - can be null. If it's the case, tableName and row should not be null
* @param tableName - can be null if hrl is not null.
* @param row - can be null if hrl is not null.
* @param exception - An object (to simplify user code) on which we will try to find a nested
* Update the location with the new value (if the exception is a RegionMovedException)
* or delete it from the cache.
* @param exception an object (to simplify user code) on which we will try to find a nested
* or wrapped or both RegionMovedException
* @param source server that is the source of the location update.
*/
private void updateCachedLocations(final HRegionLocation hrl, final byte[] tableName,
Row row, final Object exception) {
if ((row == null || tableName == null) && hrl == null) {
private void updateCachedLocations(final byte[] tableName, Row row,
final Object exception, final HRegionLocation source) {
if (row == null || tableName == null) {
LOG.warn("Coding error, see method javadoc. row=" + (row == null ? "null" : row) +
", tableName=" + (tableName == null ? "null" : Bytes.toString(tableName) +
", hrl= null"));
", tableName=" + (tableName == null ? "null" : Bytes.toString(tableName)));
return;
}
// Is it something we have already updated?
final HRegionLocation myLoc = (hrl != null ?
hrl : getCachedLocation(tableName, row.getRow()));
if (myLoc == null) {
final HRegionLocation oldLocation = getCachedLocation(tableName, row.getRow());
if (oldLocation == null) {
// There is no such location in the cache => it's been removed already => nothing to do
return;
}
HRegionInfo regionInfo = oldLocation.getRegionInfo();
final RegionMovedException rme = RegionMovedException.find(exception);
if (rme != null) {
LOG.info("Region " + myLoc.getRegionInfo().getRegionNameAsString() + " moved from " +
myLoc.getHostnamePort() + ", updating client location cache." +
" New server: " + rme.getHostname() + ":" + rme.getPort());
updateCachedLocation(myLoc, rme.getHostname(), rme.getPort());
LOG.info("Region " + regionInfo.getRegionNameAsString() + " moved to " +
rme.getHostname() + ":" + rme.getPort() + " according to " + source.getHostnamePort());
updateCachedLocation(
regionInfo, source, rme.getHostname(), rme.getPort(), rme.getLocationSeqNum());
} else {
deleteCachedLocation(myLoc);
deleteCachedLocation(regionInfo, source);
}
}
@ -1998,7 +2032,7 @@ public class HConnectionManager {
for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
for (Action<R> action : actions) {
Row row = action.getAction();
hci.updateCachedLocations(this.tableName, row, exception);
hci.updateCachedLocations(tableName, row, exception, currentTask.getSecond());
if (noRetry) {
errors.add(exception, row, currentTask);
} else {
@ -2024,7 +2058,7 @@ public class HConnectionManager {
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
Row row = correspondingAction.getAction();
hci.updateCachedLocations(this.tableName, row, result);
hci.updateCachedLocations(this.tableName, row, result, currentTask.getSecond());
if (result instanceof DoNotRetryIOException || noRetry) {
errors.add((Exception)result, row, currentTask);
} else {

View File

@ -22,7 +22,10 @@ package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@ -116,6 +119,24 @@ extends RetriesExhaustedException {
return s;
}
public String getExhaustiveDescription() {
StringWriter errorWriter = new StringWriter();
for (int i = 0; i < this.exceptions.size(); ++i) {
Throwable t = this.exceptions.get(i);
Row action = this.actions.get(i);
String server = this.hostnameAndPort.get(i);
errorWriter.append("Error #" + i + " from [" + server + "] for ["
+ ((action == null) ? "unknown key" : Bytes.toStringBinary(action.getRow())) + "]");
if (t != null) {
PrintWriter pw = new PrintWriter(errorWriter);
t.printStackTrace(pw);
pw.flush();
}
}
return errorWriter.toString();
}
public static Map<String, Integer> classifyExs(List<Throwable> ths) {
Map<String, Integer> cls = new HashMap<String, Integer>();
for (Throwable t : ths) {

View File

@ -356,7 +356,7 @@ public class ServerShutdownHandler extends EventHandler {
if (daughter == null) return 0;
if (isDaughterMissing(catalogTracker, daughter)) {
LOG.info("Fixup; missing daughter " + daughter.getRegionNameAsString());
MetaEditor.addDaughter(catalogTracker, daughter, null);
MetaEditor.addDaughter(catalogTracker, daughter, null, HConstants.NO_SEQNUM);
// TODO: Log WARN if the regiondir does not exist in the fs. If its not
// there then something wonky about the split -- things will keep going

View File

@ -283,13 +283,17 @@ public class HRegion implements HeapSize { // , Writable{
private final ConcurrentHashMap<RegionScanner, Long> scannerReadPoints;
/**
* The sequence ID that was encountered when this region was opened.
*/
private long openSeqNum = HConstants.NO_SEQNUM;
/**
* The default setting for whether to enable on-demand CF loading for
* scan requests to this region. Requests can override it.
*/
private boolean isLoadingCfsOnDemandDefault = false;
/**
* @return The smallest mvcc readPoint across all the scanners in this
* region. Writes older than this readPoint, are included in every
@ -4037,10 +4041,11 @@ public class HRegion implements HeapSize { // , Writable{
throws IOException {
checkCompressionCodecs();
long seqid = initialize(reporter);
this.openSeqNum = initialize(reporter);
if (this.log != null) {
this.log.setSequenceNumber(seqid);
this.log.setSequenceNumber(this.openSeqNum);
}
return this;
}
@ -4990,7 +4995,7 @@ public class HRegion implements HeapSize { // , Writable{
ClassSize.OBJECT +
ClassSize.ARRAY +
39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
(9 * Bytes.SIZEOF_LONG) +
(10 * Bytes.SIZEOF_LONG) +
Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = FIXED_OVERHEAD +
@ -5453,6 +5458,13 @@ public class HRegion implements HeapSize { // , Writable{
}
}
/**
* Gets the latest sequence number that was read from storage when this region was opened.
*/
public long getOpenSeqNum() {
return this.openSeqNum;
}
/**
* Listener class to enable callers of
* bulkLoadHFile() to perform any necessary
@ -5484,6 +5496,5 @@ public class HRegion implements HeapSize { // , Writable{
* @throws IOException
*/
void failedBulkLoad(byte[] family, String srcPath) throws IOException;
}
}

View File

@ -1685,21 +1685,27 @@ public class HRegionServer implements ClientProtocol,
getCompactionRequester().requestCompaction(r, s, "Opening Region");
}
}
long openSeqNum = r.getOpenSeqNum();
if (openSeqNum == HConstants.NO_SEQNUM) {
// If we opened a region, we should have read some sequence number from it.
LOG.error("No sequence number found when opening " + r.getRegionNameAsString());
openSeqNum = 0;
}
// Update ZK, ROOT or META
if (r.getRegionInfo().isRootRegion()) {
RootRegionTracker.setRootLocation(getZooKeeper(),
this.serverNameFromMasterPOV);
} else if (r.getRegionInfo().isMetaRegion()) {
MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),
this.serverNameFromMasterPOV);
this.serverNameFromMasterPOV, openSeqNum);
} else {
if (daughter) {
// If daughter of a split, update whole row, not just location.
MetaEditor.addDaughter(ct, r.getRegionInfo(),
this.serverNameFromMasterPOV);
this.serverNameFromMasterPOV, openSeqNum);
} else {
MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
this.serverNameFromMasterPOV);
this.serverNameFromMasterPOV, openSeqNum);
}
}
LOG.info("Done with post open deploy task for region=" +
@ -2502,11 +2508,20 @@ public class HRegionServer implements ClientProtocol,
@Override
public boolean removeFromOnlineRegions(final String encodedRegionName, ServerName destination) {
HRegion toReturn = this.onlineRegions.remove(encodedRegionName);
public boolean removeFromOnlineRegions(final HRegion r, ServerName destination) {
HRegion toReturn = this.onlineRegions.remove(r.getRegionInfo().getEncodedName());
if (destination != null){
addToMovedRegions(encodedRegionName, destination);
if (destination != null) {
HLog wal = getWAL();
long closeSeqNum = wal.getEarliestMemstoreSeqNum(r.getRegionInfo().getEncodedNameAsBytes());
if (closeSeqNum == HConstants.NO_SEQNUM) {
// No edits in WAL for this region; get the sequence number when the region was opened.
closeSeqNum = r.getOpenSeqNum();
if (closeSeqNum == HConstants.NO_SEQNUM) {
closeSeqNum = 0;
}
}
addToMovedRegions(r.getRegionInfo().getEncodedName(), destination, closeSeqNum);
}
return toReturn != null;
@ -2528,12 +2543,12 @@ public class HRegionServer implements ClientProtocol,
protected HRegion getRegionByEncodedName(String encodedRegionName)
throws NotServingRegionException {
HRegion region = this.onlineRegions.get(encodedRegionName);
if (region == null) {
ServerName sn = getMovedRegion(encodedRegionName);
if (sn != null) {
throw new RegionMovedException(sn.getHostname(), sn.getPort());
MovedRegionInfo moveInfo = getMovedRegion(encodedRegionName);
if (moveInfo != null) {
throw new RegionMovedException(moveInfo.getServerName().getHostname(),
moveInfo.getServerName().getPort(), moveInfo.getSeqNum());
} else {
throw new NotServingRegionException("Region is not online: " + encodedRegionName);
}
@ -3371,7 +3386,7 @@ public class HRegionServer implements ClientProtocol,
} else {
LOG.warn("The region " + region.getEncodedName() + " is online on this server" +
" but META does not have this server - continue opening.");
removeFromOnlineRegions(region.getEncodedName(), null);
removeFromOnlineRegions(onlineRegion, null);
}
}
LOG.info("Received request to open region: " + region.getRegionNameAsString() + " on "
@ -3850,34 +3865,55 @@ public class HRegionServer implements ClientProtocol,
region.mutateRow(rm);
}
private static class MovedRegionInfo {
private final ServerName serverName;
private final long seqNum;
private final long ts;
public MovedRegionInfo(ServerName serverName, long closeSeqNum) {
this.serverName = serverName;
this.seqNum = closeSeqNum;
ts = EnvironmentEdgeManager.currentTimeMillis();
}
public ServerName getServerName() {
return serverName;
}
public long getSeqNum() {
return seqNum;
}
public long getMoveTime() {
return ts;
}
}
// This map will contains all the regions that we closed for a move.
// We add the time it was moved as we don't want to keep too old information
protected Map<String, Pair<Long, ServerName>> movedRegions =
new ConcurrentHashMap<String, Pair<Long, ServerName>>(3000);
protected Map<String, MovedRegionInfo> movedRegions =
new ConcurrentHashMap<String, MovedRegionInfo>(3000);
// We need a timeout. If not there is a risk of giving a wrong information: this would double
// the number of network calls instead of reducing them.
private static final int TIMEOUT_REGION_MOVED = (2 * 60 * 1000);
protected void addToMovedRegions(HRegionInfo hri, ServerName destination){
addToMovedRegions(hri.getEncodedName(), destination);
}
protected void addToMovedRegions(String encodedName, ServerName destination){
final Long time = System.currentTimeMillis();
protected void addToMovedRegions(String encodedName, ServerName destination, long closeSeqNum) {
LOG.info("Adding moved region record: " + encodedName + " to "
+ destination.getServerName() + ":" + destination.getPort()
+ " as of " + closeSeqNum);
movedRegions.put(
encodedName,
new Pair<Long, ServerName>(time, destination));
new MovedRegionInfo(destination, closeSeqNum));
}
private ServerName getMovedRegion(final String encodedRegionName) {
Pair<Long, ServerName> dest = movedRegions.get(encodedRegionName);
private MovedRegionInfo getMovedRegion(final String encodedRegionName) {
MovedRegionInfo dest = movedRegions.get(encodedRegionName);
long now = EnvironmentEdgeManager.currentTimeMillis();
if (dest != null) {
if (dest.getFirst() > (System.currentTimeMillis() - TIMEOUT_REGION_MOVED)) {
return dest.getSecond();
if (dest.getMoveTime() > (now - TIMEOUT_REGION_MOVED)) {
return dest;
} else {
movedRegions.remove(encodedRegionName);
}
@ -3891,11 +3927,11 @@ public class HRegionServer implements ClientProtocol,
*/
protected void cleanMovedRegions(){
final long cutOff = System.currentTimeMillis() - TIMEOUT_REGION_MOVED;
Iterator<Entry<String, Pair<Long, ServerName>>> it = movedRegions.entrySet().iterator();
Iterator<Entry<String, MovedRegionInfo>> it = movedRegions.entrySet().iterator();
while (it.hasNext()){
Map.Entry<String, Pair<Long, ServerName>> e = it.next();
if (e.getValue().getFirst() < cutOff){
Map.Entry<String, MovedRegionInfo> e = it.next();
if (e.getValue().getMoveTime() < cutOff) {
it.remove();
}
}

View File

@ -40,11 +40,11 @@ interface OnlineRegions extends Server {
/**
* This method removes HRegion corresponding to hri from the Map of onlineRegions.
*
* @param encodedRegionName
* @param destination - destination, if any. Null otherwise
* @param r Region to remove.
* @param destination Destination, if any, null otherwise.
* @return True if we removed a region from online list.
*/
public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination);
public boolean removeFromOnlineRegions(final HRegion r, ServerName destination);
/**
* Return {@link HRegion} instance.

View File

@ -296,7 +296,7 @@ public class SplitTransaction {
throw new IOException(errorMsg);
}
if (!testing) {
services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName(), null);
services.removeFromOnlineRegions(this.parent, null);
}
this.journal.add(JournalEntry.OFFLINED_PARENT);

View File

@ -160,7 +160,7 @@ public class CloseRegionHandler extends EventHandler {
throw new RuntimeException(t);
}
this.rsServices.removeFromOnlineRegions(regionInfo.getEncodedName(), destination);
this.rsServices.removeFromOnlineRegions(region, destination);
if (this.zk) {
if (setClosedState(this.expectedVersion, region)) {

View File

@ -443,7 +443,7 @@ class FSHLog implements HLog, Syncable {
!this.logSeqNum.compareAndSet(id, newvalue); id = this.logSeqNum.get()) {
// This could spin on occasion but better the occasional spin than locking
// every increment of sequence number.
LOG.debug("Changed sequenceid from " + logSeqNum + " to " + newvalue);
LOG.debug("Changed sequenceid from " + id + " to " + newvalue);
}
}
@ -1415,6 +1415,12 @@ class FSHLog implements HLog, Syncable {
return lastDeferredTxid > syncedTillHere;
}
@Override
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
Long result = lastSeqWritten.get(encodedRegionName);
return result == null ? HConstants.NO_SEQNUM : result.longValue();
}
/**
* Pass one or more log file names and it will either dump out a text version
* on <code>stdout</code> or split the specified log files.

View File

@ -398,4 +398,11 @@ public interface HLog {
* @return lowReplicationRollEnabled
*/
public boolean isLowReplicationRollEnabled();
/** Gets the earliest sequence number in the memstore for this particular region.
* This can serve as best-effort "recent" WAL number for this region.
* @param encodedRegionName The region to get the number for.
* @return The number if present, HConstants.NO_SEQNUM if absent.
*/
public long getEarliestMemstoreSeqNum(byte[] encodedRegionName);
}

View File

@ -46,13 +46,16 @@ public class TestHRegionLocation {
hsa1.getHostname(), hsa1.getPort());
assertNotSame(hrl1, hrl3);
// They are equal because they have same location even though they are
// carrying different regions.
// carrying different regions or timestamp.
assertTrue(hrl1.equals(hrl3));
ServerName hsa2 = new ServerName("localhost", 12345, -1L);
HRegionLocation hrl4 = new HRegionLocation(HRegionInfo.ROOT_REGIONINFO,
hsa2.getHostname(), hsa2.getPort());
// These have same HRI but different locations so should be different.
assertFalse(hrl3.equals(hrl4));
HRegionLocation hrl5 = new HRegionLocation(hrl4.getRegionInfo(),
hrl4.getHostname(), hrl4.getPort(), hrl4.getSeqNum() + 1);
assertTrue(hrl4.equals(hrl5));
}
@Test

View File

@ -451,7 +451,7 @@ public class TestCatalogTracker {
// been assigned).
String node = ct.getMetaNodeTracker().getNode();
ZKUtil.createAndFailSilent(this.watcher, node);
MetaEditor.updateMetaLocation(ct, HRegionInfo.FIRST_META_REGIONINFO, SN);
MetaEditor.updateMetaLocation(ct, HRegionInfo.FIRST_META_REGIONINFO, SN, 0);
ZKUtil.deleteNode(this.watcher, node);
// Go get the new meta location. waitForMeta gets and verifies meta.
Assert.assertTrue(ct.waitForMeta(10000).equals(SN));

View File

@ -61,6 +61,7 @@ public class TestHCM {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] TABLE_NAME = Bytes.toBytes("test");
private static final byte[] TABLE_NAME1 = Bytes.toBytes("test1");
private static final byte[] TABLE_NAME2 = Bytes.toBytes("test2");
private static final byte[] FAM_NAM = Bytes.toBytes("f");
private static final byte[] ROW = Bytes.toBytes("bbb");
@ -149,7 +150,9 @@ public class TestHCM {
Bytes.toString(TABLE_NAME).getBytes() , Bytes.toString(ROW).getBytes()));
final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1;
conn.updateCachedLocation(conn.getCachedLocation(TABLE_NAME, ROW), "127.0.0.1", nextPort);
HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW);
conn.updateCachedLocation(loc.getRegionInfo(), loc, "127.0.0.1", nextPort,
HConstants.LATEST_TIMESTAMP);
Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort);
conn.deleteCachedLocation(TABLE_NAME.clone(), ROW.clone());
@ -293,7 +296,7 @@ public class TestHCM {
}
Assert.assertFalse("Unreachable point", true);
} catch (Throwable e) {
LOG.info("Put done, expected exception caught: " + e.getClass());
LOG.info("Scan done, expected exception caught: " + e.getClass());
}
// Cache is updated with the right value.
@ -330,6 +333,54 @@ public class TestHCM {
pool.shutdownNow();
}
/**
* Test that stale cache updates don't override newer cached values.
*/
@Test(timeout = 60000)
public void testCacheSeqNums() throws Exception{
HTable table = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAM);
TEST_UTIL.createMultiRegions(table, FAM_NAM);
Put put = new Put(ROW);
put.add(FAM_NAM, ROW, ROW);
table.put(put);
HConnectionManager.HConnectionImplementation conn =
(HConnectionManager.HConnectionImplementation)table.getConnection();
HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW);
assertNotNull(location);
HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(),
location.getHostname(), location.getPort() - 1);
// Same server as already in cache reporting - overwrites any value despite seqNum.
int nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), location,
"127.0.0.1", nextPort, location.getSeqNum() - 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort, location.getPort());
// No source specified - same.
nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), location,
"127.0.0.1", nextPort, location.getSeqNum() - 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort, location.getPort());
// Higher seqNum - overwrites lower seqNum.
nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), anySource,
"127.0.0.1", nextPort, location.getSeqNum() + 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort, location.getPort());
// Lower seqNum - does not overwrite higher seqNum.
nextPort = location.getPort() + 1;
conn.updateCachedLocation(location.getRegionInfo(), anySource,
"127.0.0.1", nextPort, location.getSeqNum() - 1);
location = conn.getCachedLocation(TABLE_NAME2, ROW);
Assert.assertEquals(nextPort - 1, location.getPort());
}
/**
* Make sure that {@link Configuration} instances that are essentially the
* same map to the same {@link HConnection} instance.

View File

@ -232,7 +232,7 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
}
@Override
public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination) {
public boolean removeFromOnlineRegions(HRegion r, ServerName destination) {
// TODO Auto-generated method stub
return false;
}

View File

@ -60,8 +60,8 @@ public class MockRegionServerServices implements RegionServerServices {
}
@Override
public boolean removeFromOnlineRegions(String encodedRegionName, ServerName destination) {
return this.regions.remove(encodedRegionName) != null;
public boolean removeFromOnlineRegions(HRegion r, ServerName destination) {
return this.regions.remove(r.getRegionInfo().getEncodedName()) != null;
}
@Override

View File

@ -17,6 +17,8 @@
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashSet;
import java.util.Map;
import java.util.PriorityQueue;
@ -31,8 +33,12 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.test.LoadTestDataGenerator;
@ -170,10 +176,45 @@ public class MultiThreadedWriter extends MultiThreadedAction {
totalOpTimeMs.addAndGet(System.currentTimeMillis() - start);
} catch (IOException e) {
failedKeySet.add(keyBase);
LOG.error("Failed to insert: " + keyBase);
e.printStackTrace();
String exceptionInfo;
if (e instanceof RetriesExhaustedWithDetailsException) {
RetriesExhaustedWithDetailsException aggEx = (RetriesExhaustedWithDetailsException)e;
exceptionInfo = aggEx.getExhaustiveDescription();
} else {
StringWriter stackWriter = new StringWriter();
PrintWriter pw = new PrintWriter(stackWriter);
e.printStackTrace(pw);
pw.flush();
exceptionInfo = StringUtils.stringifyException(e);
}
LOG.error("Failed to insert: " + keyBase + "; region information: "
+ getRegionDebugInfoSafe(put.getRow()) + "; errors: "
+ exceptionInfo);
}
}
private String getRegionDebugInfoSafe(byte[] rowKey) {
HRegionLocation cached = null, real = null;
try {
cached = table.getRegionLocation(rowKey, false);
real = table.getRegionLocation(rowKey, true);
} catch (Throwable t) {
// Cannot obtain region information for another catch block - too bad!
}
String result = "no information can be obtained";
if (cached != null) {
result = "cached: " + cached.toString();
}
if (real != null) {
if (real.equals(cached)) {
result += "; cache is up to date";
} else {
result = (cached != null) ? (result + "; ") : "";
result += "real: " + real.toString();
}
}
return result;
}
}
/**