HBASE-1099 Regions assigned while master is splitting logs of recently crashed server; regionserver tries to execute incomplete log

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@732491 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-01-07 21:03:20 +00:00
parent 18815c8879
commit 079c580042
8 changed files with 160 additions and 86 deletions

View File

@ -131,6 +131,8 @@ Release 0.19.0 - Unreleased
HBASE-1083 Will keep scheduling major compactions if last time one ran, we HBASE-1083 Will keep scheduling major compactions if last time one ran, we
didn't. didn't.
HBASE-1101 NPE in HConnectionManager$TableServers.processBatchOfRows HBASE-1101 NPE in HConnectionManager$TableServers.processBatchOfRows
HBASE-1099 Regions assigned while master is splitting logs of recently
crashed server; regionserver tries to execute incomplete log
IMPROVEMENTS IMPROVEMENTS
HBASE-901 Add a limit to key length, check key and value length on client side HBASE-901 Add a limit to key length, check key and value length on client side

View File

@ -55,7 +55,6 @@ public class Leases extends Thread {
private final int leasePeriod; private final int leasePeriod;
private final int leaseCheckFrequency; private final int leaseCheckFrequency;
private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>(); private volatile DelayQueue<Lease> leaseQueue = new DelayQueue<Lease>();
protected final Map<String, Lease> leases = new HashMap<String, Lease>(); protected final Map<String, Lease> leases = new HashMap<String, Lease>();
private volatile boolean stopRequested = false; private volatile boolean stopRequested = false;
@ -88,15 +87,16 @@ public class Leases extends Thread {
if (lease == null) { if (lease == null) {
continue; continue;
} }
// A lease expired // A lease expired. Run the expired code before removing from queue
// since its presence in queue is used to see if lease exists still.
if (lease.getListener() == null) {
LOG.error("lease listener is null for lease " + lease.getLeaseName());
} else {
lease.getListener().leaseExpired();
}
synchronized (leaseQueue) { synchronized (leaseQueue) {
leases.remove(lease.getLeaseName()); leases.remove(lease.getLeaseName());
if (lease.getListener() == null) {
LOG.error("lease listener is null for lease " + lease.getLeaseName());
continue;
}
} }
lease.getListener().leaseExpired();
} }
close(); close();
} }

View File

@ -338,17 +338,16 @@ abstract class BaseScanner extends Chore implements HConstants {
throws IOException { throws IOException {
synchronized (regionManager) { synchronized (regionManager) {
// Skip region - if ... // Skip region - if
if(info.isOffline() // offline if(info.isOffline() ||
|| regionManager.isOfflined(info.getRegionName())) { // queued for offline regionManager.isOfflined(info.getRegionName())) { // queued for offline
regionManager.removeRegion(info); regionManager.removeRegion(info);
return; return;
} }
HServerInfo storedInfo = null; HServerInfo storedInfo = null;
boolean deadServerAndLogsSplit = false;
boolean deadServer = false; boolean deadServer = false;
if (serverName.length() != 0) { if (serverName.length() != 0) {
if (regionManager.isOfflined(info.getRegionName())) { if (regionManager.isOfflined(info.getRegionName())) {
// Skip if region is on kill list // Skip if region is on kill list
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
@ -357,31 +356,31 @@ abstract class BaseScanner extends Chore implements HConstants {
} }
return; return;
} }
storedInfo = this.master.serverManager.getServerInfo(serverName);
storedInfo = master.serverManager.getServerInfo(serverName); deadServer = this.master.serverManager.isDead(serverName);
deadServer = master.serverManager.isDead(serverName); deadServerAndLogsSplit =
this.master.serverManager.isDeadServerLogsSplit(serverName);
} }
/* /*
* If the server is a dead server or its startcode is off -- either null * If the server is a dead server and its logs have been split or its
* not on the dead server lists and its startcode is off -- either null
* or doesn't match the start code for the address -- then add it to the * or doesn't match the start code for the address -- then add it to the
* list of unassigned regions IF not already there (or pending open). * list of unassigned regions IF not already there (or pending open).
*/ */
if ((deadServer || if ((deadServerAndLogsSplit ||
(storedInfo == null || storedInfo.getStartCode() != startCode)) && (!deadServer && (storedInfo == null ||
(!regionManager.isUnassigned(info) && (storedInfo.getStartCode() != startCode)))) &&
!regionManager.isPending(info.getRegionName()) && this.regionManager.assignable(info)) {
!regionManager.isAssigned(info.getRegionName()))) {
// The current assignment is invalid // The current assignment is invalid
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Current assignment of " + LOG.debug("Current assignment of " + info.getRegionNameAsString() +
info.getRegionNameAsString() + " is not valid; deadServerAndLogsSplit=" + deadServerAndLogsSplit +
" is not valid." + ", deadServer=" + deadServer + ". " +
(storedInfo == null ? " Server '" + serverName + "' unknown." : (storedInfo == null ? " Server '" + serverName + "' unknown." :
" serverInfo: " + storedInfo + ", passed startCode: " + " serverInfo: " + storedInfo + ", passed startCode: " +
startCode + ", storedInfo.startCode: " + storedInfo.getStartCode()) + startCode + ", storedInfo.startCode: " +
storedInfo.getStartCode()) +
" Region is not unassigned, assigned or pending"); " Region is not unassigned, assigned or pending");
} }
@ -389,7 +388,6 @@ abstract class BaseScanner extends Chore implements HConstants {
// This is only done from here if we are restarting and there is stale // This is only done from here if we are restarting and there is stale
// data in the meta region. Once we are on-line, dead server log // data in the meta region. Once we are on-line, dead server log
// recovery is handled by lease expiration and ProcessServerShutdown // recovery is handled by lease expiration and ProcessServerShutdown
if (!regionManager.isInitialMetaScanComplete() && if (!regionManager.isInitialMetaScanComplete() &&
serverName.length() != 0) { serverName.length() != 0) {
StringBuilder dirName = new StringBuilder("log_"); StringBuilder dirName = new StringBuilder("log_");
@ -418,7 +416,7 @@ abstract class BaseScanner extends Chore implements HConstants {
} }
} }
} }
/** /**
* Notify the thread to die at the end of its next run * Notify the thread to die at the end of its next run
*/ */

View File

@ -91,9 +91,7 @@ class ChangeTableState extends TableOperation {
synchronized (master.regionManager) { synchronized (master.regionManager) {
if (online) { if (online) {
// Bring offline regions on-line // Bring offline regions on-line
if (!master.regionManager.isUnassigned(i) && if (!master.regionManager.assignable(i)) {
!master.regionManager.isAssigned(i.getRegionName()) &&
!master.regionManager.isPending(i.getRegionName())) {
master.regionManager.setUnassigned(i, false); master.regionManager.setUnassigned(i, false);
} }
} else { } else {

View File

@ -532,15 +532,14 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
/* /*
* HMasterRegionInterface * HMasterRegionInterface
*/ */
public MapWritable regionServerStartup(HServerInfo serverInfo) { public MapWritable regionServerStartup(final HServerInfo serverInfo) {
// Set the address for now even tho it will not be persisted on // Set the address for now even tho it will not be persisted on HRS side.
// the HRS side.
String rsAddress = HBaseServer.getRemoteAddress(); String rsAddress = HBaseServer.getRemoteAddress();
serverInfo.setServerAddress(new HServerAddress serverInfo.setServerAddress(new HServerAddress(rsAddress,
(rsAddress, serverInfo.getServerAddress().getPort())); serverInfo.getServerAddress().getPort()));
// register with server manager // Register with server manager
serverManager.regionServerStartup(serverInfo); this.serverManager.regionServerStartup(serverInfo);
// send back some config info // Send back some config info
return createConfigurationSubset(); return createConfigurationSubset();
} }

View File

@ -45,11 +45,13 @@ import org.apache.hadoop.hbase.io.RowResult;
*/ */
class ProcessServerShutdown extends RegionServerOperation { class ProcessServerShutdown extends RegionServerOperation {
private final HServerAddress deadServer; private final HServerAddress deadServer;
private final String deadServerName; /*
* Cache of the server name.
*/
private final String deadServerStr;
private final boolean rootRegionServer; private final boolean rootRegionServer;
private boolean rootRegionReassigned = false; private boolean rootRegionReassigned = false;
private Path oldLogDir; private Path oldLogDir;
private boolean logSplit;
private boolean rootRescanned; private boolean rootRescanned;
@ -74,9 +76,8 @@ class ProcessServerShutdown extends RegionServerOperation {
boolean rootRegionServer) { boolean rootRegionServer) {
super(master); super(master);
this.deadServer = serverInfo.getServerAddress(); this.deadServer = serverInfo.getServerAddress();
this.deadServerName = this.deadServer.toString(); this.deadServerStr = this.deadServer.toString();
this.rootRegionServer = rootRegionServer; this.rootRegionServer = rootRegionServer;
this.logSplit = false;
this.rootRescanned = false; this.rootRescanned = false;
this.oldLogDir = this.oldLogDir =
new Path(master.rootdir, HLog.getHLogDirectoryName(serverInfo)); new Path(master.rootdir, HLog.getHLogDirectoryName(serverInfo));
@ -84,13 +85,14 @@ class ProcessServerShutdown extends RegionServerOperation {
@Override @Override
public String toString() { public String toString() {
return "ProcessServerShutdown of " + this.deadServer.toString(); return "ProcessServerShutdown of " + this.deadServerStr;
} }
/** Finds regions that the dead region server was serving */ /** Finds regions that the dead region server was serving
*/
protected void scanMetaRegion(HRegionInterface server, long scannerId, protected void scanMetaRegion(HRegionInterface server, long scannerId,
byte [] regionName) throws IOException { byte [] regionName)
throws IOException {
List<ToDoEntry> toDoList = new ArrayList<ToDoEntry>(); List<ToDoEntry> toDoList = new ArrayList<ToDoEntry>();
Set<HRegionInfo> regions = new HashSet<HRegionInfo>(); Set<HRegionInfo> regions = new HashSet<HRegionInfo>();
List<byte []> emptyRows = new ArrayList<byte []>(); List<byte []> emptyRows = new ArrayList<byte []>();
@ -107,14 +109,13 @@ class ProcessServerShutdown extends RegionServerOperation {
if (values == null || values.size() == 0) { if (values == null || values.size() == 0) {
break; break;
} }
byte [] row = values.getRow(); byte [] row = values.getRow();
// Check server name. If null, be conservative and treat as though // Check server name. If null, skip (We used to consider it was on
// region had been on shutdown server (could be null because we // shutdown server but that would mean that we'd reassign regions that
// missed edits in hlog because hdfs does not do write-append). // were already out being assigned, ones that were product of a split
// that happened while the shutdown was being processed.
String serverName = Writables.cellToString(values.get(COL_SERVER)); String serverName = Writables.cellToString(values.get(COL_SERVER));
if (serverName != null && serverName.length() > 0 && if (serverName == null || !deadServerStr.equals(serverName)) {
deadServerName.compareTo(serverName) != 0) {
// This isn't the server you're looking for - move along // This isn't the server you're looking for - move along
continue; continue;
} }
@ -159,7 +160,7 @@ class ProcessServerShutdown extends RegionServerOperation {
} }
} }
} finally { } finally {
if(scannerId != -1L) { if (scannerId != -1L) {
try { try {
server.close(scannerId); server.close(scannerId);
} catch (IOException e) { } catch (IOException e) {
@ -222,21 +223,22 @@ class ProcessServerShutdown extends RegionServerOperation {
long scannerId = long scannerId =
server.openScanner(m.getRegionName(), COLUMN_FAMILY_ARRAY, server.openScanner(m.getRegionName(), COLUMN_FAMILY_ARRAY,
EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP, null); EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP, null);
scanMetaRegion(server, scannerId, m.getRegionName());
scanMetaRegion(server, scannerId, m.getRegionName());
return true; return true;
} }
} }
@Override @Override
protected boolean process() throws IOException { protected boolean process() throws IOException {
LOG.info("process shutdown of server " + deadServer + ": logSplit: " + boolean logSplit =
this.logSplit + ", rootRescanned: " + rootRescanned + this.master.serverManager.isDeadServerLogsSplit(this.deadServerStr);
LOG.info("process shutdown of server " + this.deadServerStr +
": logSplit: " +
logSplit + ", rootRescanned: " + rootRescanned +
", numberOfMetaRegions: " + ", numberOfMetaRegions: " +
master.regionManager.numMetaRegions() + master.regionManager.numMetaRegions() +
", onlineMetaRegions.size(): " + ", onlineMetaRegions.size(): " +
master.regionManager.numOnlineMetaRegions()); master.regionManager.numOnlineMetaRegions());
if (!logSplit) { if (!logSplit) {
// Process the old log file // Process the old log file
if (master.fs.exists(oldLogDir)) { if (master.fs.exists(oldLogDir)) {
@ -250,9 +252,9 @@ class ProcessServerShutdown extends RegionServerOperation {
master.regionManager.splitLogLock.unlock(); master.regionManager.splitLogLock.unlock();
} }
} }
logSplit = true; this.master.serverManager.setDeadServerLogsSplit(this.deadServerStr);
} }
if (this.rootRegionServer && !this.rootRegionReassigned) { if (this.rootRegionServer && !this.rootRegionReassigned) {
// avoid multiple root region reassignment // avoid multiple root region reassignment
this.rootRegionReassigned = true; this.rootRegionReassigned = true;
@ -277,7 +279,6 @@ class ProcessServerShutdown extends RegionServerOperation {
new MetaRegion(master.getRootRegionLocation(), new MetaRegion(master.getRootRegionLocation(),
HRegionInfo.ROOT_REGIONINFO.getRegionName(), HRegionInfo.ROOT_REGIONINFO.getRegionName(),
HConstants.EMPTY_START_ROW), this.master).doWithRetries(); HConstants.EMPTY_START_ROW), this.master).doWithRetries();
if (result == null) { if (result == null) {
// Master is closing - give up // Master is closing - give up
return true; return true;
@ -290,7 +291,6 @@ class ProcessServerShutdown extends RegionServerOperation {
} }
rootRescanned = true; rootRescanned = true;
} }
if (!metaTableAvailable()) { if (!metaTableAvailable()) {
// We can't proceed because not all meta regions are online. // We can't proceed because not all meta regions are online.
// metaAvailable() has put this request on the delayedToDoQueue // metaAvailable() has put this request on the delayedToDoQueue
@ -309,7 +309,11 @@ class ProcessServerShutdown extends RegionServerOperation {
Bytes.toString(r.getRegionName()) + " on " + r.getServer()); Bytes.toString(r.getRegionName()) + " on " + r.getServer());
} }
} }
master.serverManager.removeDeadServer(deadServerName); // Remove this server from dead servers list. Finished splitting logs.
this.master.serverManager.removeDeadServer(deadServerStr);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed " + deadServerStr + " from deadservers Map");
}
return true; return true;
} }
} }

View File

@ -694,6 +694,17 @@ class RegionManager implements HConstants {
} }
return false; return false;
} }
/**
 * Reports whether a region is free to be put up for assignment.
 * @param hri region to check
 * @return True only when the passed region is none of: unassigned, pending,
 * or assigned. False as soon as any one of those checks matches.
 */
public boolean assignable(final HRegionInfo hri) {
  // Checks run in the same order as before; each one short-circuits the rest.
  if (isUnassigned(hri)) {
    return false;
  }
  if (isPending(hri.getRegionName())) {
    return false;
  }
  return !isAssigned(hri.getRegionName());
}
/** /**
* @param regionName * @param regionName

View File

@ -59,9 +59,13 @@ class ServerManager implements HConstants {
final Map<String, HServerInfo> serversToServerInfo = final Map<String, HServerInfo> serversToServerInfo =
new ConcurrentHashMap<String, HServerInfo>(); new ConcurrentHashMap<String, HServerInfo>();
/** Set of known dead servers */ /**
final Set<String> deadServers = * Set of known dead servers. On lease expiration, servers are added here.
Collections.synchronizedSet(new HashSet<String>()); * Boolean holds whether its logs have been split or not. Initially set to
* false.
*/
private final Map<String, Boolean> deadServers =
new ConcurrentHashMap<String, Boolean>();
/** SortedMap server load -> Set of server names */ /** SortedMap server load -> Set of server names */
final SortedMap<HServerLoad, Set<String>> loadToServers = final SortedMap<HServerLoad, Set<String>> loadToServers =
@ -89,24 +93,67 @@ class ServerManager implements HConstants {
this.loggingPeriodForAverageLoad = master.getConfiguration(). this.loggingPeriodForAverageLoad = master.getConfiguration().
getLong("hbase.master.avgload.logging.period", 60000); getLong("hbase.master.avgload.logging.period", 60000);
} }
/*
* Look to see if we have ghost references to this regionserver such as
* still-existing leases or if regionserver is on the dead servers list
* getting its logs processed. Loops, sleeping between attempts, until a
* lease has been created for the server and the server is not on the dead
* servers list with its logs still unsplit, or until the master is closed.
* @param serverInfo Server reporting for duty; its address string is used
* as the lease name and as the key for the dead server checks.
* @return True if a lease was obtained and the server is not waiting on
* log splitting, i.e. registration may go ahead. False if the master was
* closed before that point was reached.
*/
private boolean checkForGhostReferences(final HServerInfo serverInfo) {
// Trimmed address string; used as the lease name below.
String s = serverInfo.getServerAddress().toString().trim();
boolean result = false;
// Set once createLease has succeeded so later passes do not create it again.
boolean lease = false;
// sleepTime of -1 means "do not sleep"; it is only set on a retry path.
for (long sleepTime = -1; !master.closed.get() && !result;) {
if (sleepTime != -1) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// Continue
// NOTE(review): the interrupt is dropped here and the loop carries on;
// only master.closed ends the wait early.
}
}
if (!lease) {
try {
this.serverLeases.createLease(s, new ServerExpirer(s));
} catch (Leases.LeaseStillHeldException e) {
// A lease under this name is still held; wait a quarter of the lease
// timeout and try again.
LOG.debug("Waiting on current lease to expire for " + e.getName());
sleepTime = this.master.leaseTimeout / 4;
continue;
}
lease = true;
}
// May be on list of dead servers. If so, wait until its logs have been
// split or it has been removed from the list.
// Untrimmed address string (s above is the trimmed form).
String addr = serverInfo.getServerAddress().toString();
if (isDead(addr) && !isDeadServerLogsSplit(addr)) {
LOG.debug("Waiting on " + addr + " removal from dead list before " +
"processing report-for-duty request");
sleepTime = this.master.threadWakeFrequency;
try {
// Keep up lease. May be here > lease expiration.
this.serverLeases.renewLease(s);
} catch (LeaseException e) {
// Renewal failure is logged only; the loop retries on the next pass.
LOG.warn("Failed renewal. Retrying.", e);
}
continue;
}
// Lease is held and the server is not awaiting log split: done.
result = true;
}
return result;
}
/** /**
* Let the server manager know a new regionserver has come online * Let the server manager know a new regionserver has come online
* @param serverInfo * @param serverInfo
*/ */
public void regionServerStartup(HServerInfo serverInfo) { public void regionServerStartup(final HServerInfo serverInfo) {
String s = serverInfo.getServerAddress().toString().trim(); String s = serverInfo.getServerAddress().toString().trim();
LOG.info("Received start message from: " + s); LOG.info("Received start message from: " + s);
// Do the lease check up here. There might already be one out on this if (!checkForGhostReferences(serverInfo)) {
// server expecially if it just shutdown and came back up near-immediately. return;
if (!master.closed.get()) {
try {
serverLeases.createLease(s, new ServerExpirer(s));
} catch (Leases.LeaseStillHeldException e) {
LOG.debug("Lease still held on " + e.getName());
return;
}
} }
// Go on to process the regionserver registration.
HServerLoad load = serversToLoad.remove(s); HServerLoad load = serversToLoad.remove(s);
if (load != null) { if (load != null) {
// The startup message was from a known server. // The startup message was from a known server.
@ -119,7 +166,6 @@ class ServerManager implements HConstants {
} }
} }
} }
HServerInfo storedInfo = serversToServerInfo.remove(s); HServerInfo storedInfo = serversToServerInfo.remove(s);
if (storedInfo != null && !master.closed.get()) { if (storedInfo != null && !master.closed.get()) {
// The startup message was from a known server with the same name. // The startup message was from a known server with the same name.
@ -137,7 +183,6 @@ class ServerManager implements HConstants {
LOG.error("Insertion into toDoQueue was interrupted", e); LOG.error("Insertion into toDoQueue was interrupted", e);
} }
} }
// record new server // record new server
load = new HServerLoad(); load = new HServerLoad();
serverInfo.setLoad(load); serverInfo.setLoad(load);
@ -703,7 +748,7 @@ class ServerManager implements HConstants {
} }
} }
} }
deadServers.add(server); deadServers.put(server, Boolean.FALSE);
try { try {
master.toDoQueue.put( master.toDoQueue.put(
new ProcessServerShutdown(master, info, rootServer)); new ProcessServerShutdown(master, info, rootServer));
@ -742,6 +787,23 @@ class ServerManager implements HConstants {
* @return true if server is dead * @return true if server is dead
*/ */
public boolean isDead(String serverName) { public boolean isDead(String serverName) {
return deadServers.contains(serverName); return deadServers.containsKey(serverName);
} }
}
/**
 * Looks up the log-split flag recorded for a server in the dead servers map.
 * @param serverName
 * @return True if the server has an entry in the dead servers map and that
 * entry is true; false if there is no entry or the entry is false.
 */
public boolean isDeadServerLogsSplit(final String serverName) {
  final Boolean logsSplit = this.deadServers.get(serverName);
  if (logsSplit == null) {
    // No entry for this server in the dead servers map.
    return false;
  }
  return logsSplit.booleanValue();
}
/**
 * Records in the dead servers map that the named server's logs are split.
 * The entry is written unconditionally, whether or not one already exists.
 * @param serverName
 */
public void setDeadServerLogsSplit(final String serverName) {
  final Boolean logsSplit = Boolean.TRUE;
  this.deadServers.put(serverName, logsSplit);
}
}