HBASE-1421 Processing a regionserver message -- OPEN, CLOSE, SPLIT, etc. -- and if we're carrying more than one message in payload, if exception, all messages that follow are dropped on floor

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@776050 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2009-05-18 18:42:13 +00:00
parent b4dedf9cbf
commit c40632b210
5 changed files with 52 additions and 42 deletions

View File

@ -133,6 +133,9 @@ Release 0.20.0 - Unreleased
(Clint Morgan via Stack) (Clint Morgan via Stack)
HBASE-1431 NPE in HTable.checkAndSave when row doesn't exist (Guilherme HBASE-1431 NPE in HTable.checkAndSave when row doesn't exist (Guilherme
Mauro Germoglio Barbosa via Andrew Purtell) Mauro Germoglio Barbosa via Andrew Purtell)
HBASE-1421 Processing a regionserver message -- OPEN, CLOSE, SPLIT, etc. --
and if we're carrying more than one message in payload, if
exception, all messages that follow are dropped on floor
IMPROVEMENTS IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage HBASE-1089 Add count of regions on filesystem to master UI; add percentage

View File

@ -40,11 +40,9 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
* @param master * @param master
* @param info * @param info
* @param regionInfo * @param regionInfo
* @throws IOException
*/ */
public ProcessRegionOpen(HMaster master, HServerInfo info, public ProcessRegionOpen(HMaster master, HServerInfo info,
HRegionInfo regionInfo) HRegionInfo regionInfo) {
throws IOException {
super(master, regionInfo); super(master, regionInfo);
if (info == null) { if (info == null) {
throw new NullPointerException("HServerInfo cannot be null; " + throw new NullPointerException("HServerInfo cannot be null; " +

View File

@ -1230,9 +1230,8 @@ class RegionManager implements HConstants {
*/ */
synchronized void setPendingOpen(final String serverName) { synchronized void setPendingOpen(final String serverName) {
if (!this.unassigned) { if (!this.unassigned) {
throw new IllegalStateException( LOG.warn("Cannot assign a region that is not currently unassigned. " +
"Cannot assign a region that is not currently unassigned. State: " + "FIX!! State: " + toString());
toString());
} }
this.unassigned = false; this.unassigned = false;
this.pendingOpen = true; this.pendingOpen = true;
@ -1250,9 +1249,8 @@ class RegionManager implements HConstants {
synchronized void setOpen() { synchronized void setOpen() {
if (!pendingOpen) { if (!pendingOpen) {
throw new IllegalStateException( LOG.warn("Cannot set a region as open if it has not been pending. " +
"Cannot set a region as open if it has not been pending. State: " + "FIX!! State: " + toString());
toString());
} }
this.unassigned = false; this.unassigned = false;
this.pendingOpen = false; this.pendingOpen = false;
@ -1284,9 +1282,8 @@ class RegionManager implements HConstants {
synchronized void setPendingClose() { synchronized void setPendingClose() {
if (!closing) { if (!closing) {
throw new IllegalStateException( LOG.warn("Cannot set a region as pending close if it has not been " +
"Cannot set a region as pending close if it has not been closing. " + "closing. FIX!! State: " + toString());
"State: " + toString());
} }
this.unassigned = false; this.unassigned = false;
this.pendingOpen = false; this.pendingOpen = false;
@ -1353,4 +1350,4 @@ class RegionManager implements HConstants {
return Bytes.compareTo(getRegionName(), o.getRegionName()); return Bytes.compareTo(getRegionName(), o.getRegionName());
} }
} }
} }

View File

@ -42,7 +42,7 @@ abstract class RegionServerOperation implements Delayed, HConstants {
// DelayQueue we're inserted in on lease expiration. // DelayQueue we're inserted in on lease expiration.
this.expire = System.currentTimeMillis() + this.master.leaseTimeout / 2; this.expire = System.currentTimeMillis() + this.master.leaseTimeout / 2;
} }
public long getDelay(TimeUnit unit) { public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), return unit.convert(this.expire - System.currentTimeMillis(),
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);

View File

@ -388,26 +388,30 @@ class ServerManager implements HConstants {
return processMsgs(serverInfo, mostLoadedRegions, msgs); return processMsgs(serverInfo, mostLoadedRegions, msgs);
} }
/** /*
* Process all the incoming messages from a server that's contacted us. * Process all the incoming messages from a server that's contacted us.
*
* Note that we never need to update the server's load information because * Note that we never need to update the server's load information because
* that has already been done in regionServerReport. * that has already been done in regionServerReport.
* @param serverInfo
* @param mostLoadedRegions
* @param incomingMsgs
* @return
*/ */
private HMsg[] processMsgs(HServerInfo serverInfo, private HMsg[] processMsgs(HServerInfo serverInfo,
HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[]) HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[]) {
throws IOException {
ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>(); ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
if (serverInfo.getServerAddress() == null) { if (serverInfo.getServerAddress() == null) {
throw new NullPointerException("Server address cannot be null; " + throw new NullPointerException("Server address cannot be null; " +
"hbase-958 debugging"); "hbase-958 debugging");
} }
// Get reports on what the RegionServer did. // Get reports on what the RegionServer did.
// Be careful that in message processors we don't throw exceptions that
// break the switch below because then we might drop messages on the floor.
int openingCount = 0; int openingCount = 0;
for (int i = 0; i < incomingMsgs.length; i++) { for (int i = 0; i < incomingMsgs.length; i++) {
HRegionInfo region = incomingMsgs[i].getRegionInfo(); HRegionInfo region = incomingMsgs[i].getRegionInfo();
LOG.info("Received " + incomingMsgs[i] + " from " + LOG.info("Received " + incomingMsgs[i] + " from " +
serverInfo.getServerName()); serverInfo.getServerName() + "; " + i + " of " + incomingMsgs.length);
switch (incomingMsgs[i].getType()) { switch (incomingMsgs[i].getType()) {
case MSG_REPORT_PROCESS_OPEN: case MSG_REPORT_PROCESS_OPEN:
openingCount++; openingCount++;
@ -422,13 +426,11 @@ class ServerManager implements HConstants {
break; break;
case MSG_REPORT_SPLIT: case MSG_REPORT_SPLIT:
processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i], processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i]);
returnMsgs);
break; break;
default: default:
throw new IOException( LOG.warn("Impossible state during message processing. Instruction: " +
"Impossible state during message processing. Instruction: " +
incomingMsgs[i].getType()); incomingMsgs[i].getType());
} }
} }
@ -456,7 +458,7 @@ class ServerManager implements HConstants {
return returnMsgs.toArray(new HMsg[returnMsgs.size()]); return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
} }
/** /*
* A region has split. * A region has split.
* *
* @param region * @param region
@ -464,21 +466,16 @@ class ServerManager implements HConstants {
* @param splitB * @param splitB
* @param returnMsgs * @param returnMsgs
*/ */
private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB, private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB) {
ArrayList<HMsg> returnMsgs) {
synchronized (master.regionManager) { synchronized (master.regionManager) {
// Cancel any actions pending for the affected region. // Cancel any actions pending for the affected region.
// This prevents the master from sending a SPLIT message if the table // This prevents the master from sending a SPLIT message if the table
// has already split by the region server. // has already split by the region server.
master.regionManager.endActions(region.getRegionName()); master.regionManager.endActions(region.getRegionName());
HRegionInfo newRegionA = splitA.getRegionInfo(); HRegionInfo newRegionA = splitA.getRegionInfo();
master.regionManager.setUnassigned(newRegionA, false); master.regionManager.setUnassigned(newRegionA, false);
HRegionInfo newRegionB = splitB.getRegionInfo(); HRegionInfo newRegionB = splitB.getRegionInfo();
master.regionManager.setUnassigned(newRegionB, false); master.regionManager.setUnassigned(newRegionB, false);
if (region.isMetaTable()) { if (region.isMetaTable()) {
// A meta region has split. // A meta region has split.
master.regionManager.offlineMetaRegion(region.getStartKey()); master.regionManager.offlineMetaRegion(region.getStartKey());
@ -487,10 +484,14 @@ class ServerManager implements HConstants {
} }
} }
/** Region server is reporting that a region is now opened */ /*
* Region server is reporting that a region is now opened
* @param serverInfo
* @param region
* @param returnMsgs
*/
private void processRegionOpen(HServerInfo serverInfo, private void processRegionOpen(HServerInfo serverInfo,
HRegionInfo region, ArrayList<HMsg> returnMsgs) HRegionInfo region, ArrayList<HMsg> returnMsgs) {
throws IOException {
boolean duplicateAssignment = false; boolean duplicateAssignment = false;
synchronized (master.regionManager) { synchronized (master.regionManager) {
if (!master.regionManager.isUnassigned(region) && if (!master.regionManager.isUnassigned(region) &&
@ -549,19 +550,30 @@ class ServerManager implements HConstants {
// Note that the table has been assigned and is waiting for the // Note that the table has been assigned and is waiting for the
// meta table to be updated. // meta table to be updated.
master.regionManager.setOpen(region.getRegionNameAsString()); master.regionManager.setOpen(region.getRegionNameAsString());
// Queue up an update to note the region location. // Queue up an update to note the region location. Do inside
try { // a retry loop in case interrupted.
master.toDoQueue.put( boolean succeeded = false;
new ProcessRegionOpen(master, serverInfo, region)); for (int i = 0; i < 10; i++) {
} catch (InterruptedException e) { try {
throw new RuntimeException( master.toDoQueue.
"Putting into toDoQueue was interrupted.", e); put(new ProcessRegionOpen(master, serverInfo, region));
succeeded = true;
} catch (InterruptedException e) {
LOG.warn("Putting into toDoQueue was interrupted.", e);
}
} }
} if (!succeeded) {
LOG.warn("FAILED ADDING OPEN TO TODO QUEUE: " + serverInfo);
}
}
} }
} }
} }
/*
* @param region
* @throws Exception
*/
private void processRegionClose(HRegionInfo region) { private void processRegionClose(HRegionInfo region) {
synchronized (master.regionManager) { synchronized (master.regionManager) {
if (region.isRootRegion()) { if (region.isRootRegion()) {