HBASE-504 Allow HMsg's carry a payload: e.g. exception that happened

over on the remote side.

M  src/test/org/apache/hadoop/hbase/TestSerialization.java
    Add test that HMsg with region and message serializes.
M  src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    HMsg types have been enumified.  Also use some of the new
    static messages.  On split and close because of error,
    add message to the HMsg we pass back to the master.
M  src/java/org/apache/hadoop/hbase/HServerInfo.java
    (compareTo): Added.
M  src/java/org/apache/hadoop/hbase/HRegionInfo.java
    Allow null in compareTo.
M src/java/org/apache/hadoop/hbase/master/ServerManager.java
    Use the new HMsg.isType figuring message type.
    Redo message logging.  Use convenience HMsg statics.
M  src/java/org/apache/hadoop/hbase/master/RegionManager.java
    Pass back overloaded message if region shutdown because of balancing.
M  src/java/org/apache/hadoop/hbase/HServerAddress.java
    Make it so that two addresses equate even if one has hostname
    and the other IP.
M  src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java
    Up the protocol version for regionserver reporting master messages.
M  src/java/org/apache/hadoop/hbase/HMsg.java
    Enumify the messge types.
    Define a few static HMsgs for convenience.
    Allow optional message. Improved toString.
    (isType, equals, hashCode): Added.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@658465 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-05-20 22:29:36 +00:00
parent bf4536f713
commit a000a20845
10 changed files with 296 additions and 239 deletions

View File

@ -43,6 +43,8 @@ Hbase Change Log
HBASE-621 Make MAX_VERSIONS work like TTL: In scans and gets, check HBASE-621 Make MAX_VERSIONS work like TTL: In scans and gets, check
MAX_VERSIONs setting and return that many only rather than wait on MAX_VERSIONs setting and return that many only rather than wait on
compaction (Jean-Daniel Cryans via Stack) compaction (Jean-Daniel Cryans via Stack)
HBASE-504 Allow HMsg's carry a payload: e.g. exception that happened over
on the remote side.
Release 0.1.2 - 05/13/2008 Release 0.1.2 - 05/13/2008

View File

@ -24,107 +24,156 @@ import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
/******************************************************************************* /**
* HMsg is for communicating instructions between the HMaster and the * HMsg is for communicating instructions between the HMaster and the
* HRegionServers. * HRegionServers.
******************************************************************************/ *
* Most of the time the messages are simple but some messages are accompanied
* by the region affected. HMsg may also carry optional message.
*/
@SuppressWarnings("serial")
public class HMsg implements Writable { public class HMsg implements Writable {
/**
* Message types sent between master and regionservers
*/
public static enum Type {
MSG_NONE,
// Messages sent from master to region server // Message types sent from master to region server
/** Start serving the specified region */ /** Start serving the specified region */
public static final byte MSG_REGION_OPEN = 1; MSG_REGION_OPEN,
/** Stop serving the specified region */ /** Stop serving the specified region */
public static final byte MSG_REGION_CLOSE = 2; MSG_REGION_CLOSE,
/** Region server is unknown to master. Restart */ /** Region server is unknown to master. Restart */
public static final byte MSG_CALL_SERVER_STARTUP = 4; MSG_CALL_SERVER_STARTUP,
/** Master tells region server to stop */ /** Master tells region server to stop */
public static final byte MSG_REGIONSERVER_STOP = 5; MSG_REGIONSERVER_STOP,
/** Stop serving the specified region and don't report back that it's closed */ /** Stop serving the specified region and don't report back that it's
public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6; * closed
*/
MSG_REGION_CLOSE_WITHOUT_REPORT,
/** Stop serving user regions */ /** Stop serving user regions */
public static final byte MSG_REGIONSERVER_QUIESCE = 7; MSG_REGIONSERVER_QUIESCE,
// Messages sent from the region server to the master
// Message types sent from the region server to the master
/** region server is now serving the specified region */ /** region server is now serving the specified region */
public static final byte MSG_REPORT_OPEN = 100; MSG_REPORT_OPEN,
/** region server is no longer serving the specified region */ /** region server is no longer serving the specified region */
public static final byte MSG_REPORT_CLOSE = 101; MSG_REPORT_CLOSE,
/** region server is processing open request */ /** region server is processing open request */
public static final byte MSG_REPORT_PROCESS_OPEN = 102; MSG_REPORT_PROCESS_OPEN,
/** /**
* region server split the region associated with this message. * Region server split the region associated with this message.
* *
* note that this message is immediately followed by two MSG_REPORT_OPEN * Note that this message is immediately followed by two MSG_REPORT_OPEN
* messages, one for each of the new regions resulting from the split * messages, one for each of the new regions resulting from the split
*/ */
public static final byte MSG_REPORT_SPLIT = 103; MSG_REPORT_SPLIT,
/** /**
* region server is shutting down * Region server is shutting down
* *
* note that this message is followed by MSG_REPORT_CLOSE messages for each * Note that this message is followed by MSG_REPORT_CLOSE messages for each
* region the region server was serving, unless it was told to quiesce. * region the region server was serving, unless it was told to quiesce.
*/ */
public static final byte MSG_REPORT_EXITING = 104; MSG_REPORT_EXITING,
/** region server has closed all user regions but is still serving meta regions */ /** Region server has closed all user regions but is still serving meta
public static final byte MSG_REPORT_QUIESCED = 105; * regions
*/
MSG_REPORT_QUIESCED,
}
private Type type = null;
private HRegionInfo info = null;
private Text message = null;
// Some useful statics. Use these rather than create a new HMsg each time.
public static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
public static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
public static final HMsg REGIONSERVER_QUIESCE =
new HMsg(Type.MSG_REGIONSERVER_QUIESCE);
public static final HMsg REGIONSERVER_STOP =
new HMsg(Type.MSG_REGIONSERVER_STOP);
public static final HMsg CALL_SERVER_STARTUP =
new HMsg(Type.MSG_CALL_SERVER_STARTUP);
public static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg[0];
byte msg;
HRegionInfo info;
/** Default constructor. Used during deserialization */ /** Default constructor. Used during deserialization */
public HMsg() { public HMsg() {
this.info = new HRegionInfo(); this(Type.MSG_NONE);
} }
/** /**
* Construct a message with an empty HRegionInfo * Construct a message with the specified message and HRegionInfo
* @param type Message type
*/
public HMsg(final HMsg.Type type) {
this(type, new HRegionInfo(), null);
}
/**
* Construct a message with the specified message and HRegionInfo
* @param type Message type
* @param hri Region to which message <code>type</code> applies
*/
public HMsg(final HMsg.Type type, final HRegionInfo hri) {
this(type, hri, null);
}
/**
* Construct a message with the specified message and HRegionInfo
* *
* @param msg - message code * @param type Message type
* @param hri Region to which message <code>type</code> applies. Cannot be
* null. If no info associated, used other Constructor.
* @param msg Optional message (Stringified exception, etc.)
*/ */
public HMsg(byte msg) { public HMsg(final HMsg.Type type, final HRegionInfo hri, final Text msg) {
this.msg = msg; if (type == null) {
this.info = new HRegionInfo(); throw new NullPointerException("Message type cannot be null");
}
this.type = type;
if (hri == null) {
throw new NullPointerException("Region cannot be null");
}
this.info = hri;
this.message = msg;
} }
/** /**
* Construct a message with the specified message code and HRegionInfo * @return Region info or null if none associated with this message type.
*
* @param msg - message code
* @param info - HRegionInfo
*/
public HMsg(byte msg, HRegionInfo info) {
this.msg = msg;
this.info = info;
}
/**
* Accessor
* @return message code
*/
public byte getMsg() {
return msg;
}
/**
* Accessor
* @return HRegionInfo
*/ */
public HRegionInfo getRegionInfo() { public HRegionInfo getRegionInfo() {
return info; return this.info;
}
public Type getType() {
return this.type;
}
/**
* @param other Message type to compare to
* @return True if we are of same message type as <code>other</code>
*/
public boolean isType(final HMsg.Type other) {
return this.type.equals(other);
}
public Text getMessage() {
return this.message;
} }
/** /**
@ -132,67 +181,37 @@ public class HMsg implements Writable {
*/ */
@Override @Override
public String toString() { public String toString() {
StringBuilder message = new StringBuilder(); StringBuilder sb = new StringBuilder();
switch(msg) { sb.append(this.type.toString());
case MSG_REGION_OPEN: // If null or empty region, don't bother printing it out.
message.append("MSG_REGION_OPEN : "); if (this.info != null && this.info.getRegionName().length > 0) {
break; sb.append(": ");
sb.append(this.info.getRegionNameAsString());
case MSG_REGION_CLOSE:
message.append("MSG_REGION_CLOSE : ");
break;
case MSG_CALL_SERVER_STARTUP:
message.append("MSG_CALL_SERVER_STARTUP : ");
break;
case MSG_REGIONSERVER_STOP:
message.append("MSG_REGIONSERVER_STOP : ");
break;
case MSG_REGION_CLOSE_WITHOUT_REPORT:
message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : ");
break;
case MSG_REGIONSERVER_QUIESCE:
message.append("MSG_REGIONSERVER_QUIESCE : ");
break;
case MSG_REPORT_PROCESS_OPEN:
message.append("MSG_REPORT_PROCESS_OPEN : ");
break;
case MSG_REPORT_OPEN:
message.append("MSG_REPORT_OPEN : ");
break;
case MSG_REPORT_CLOSE:
message.append("MSG_REPORT_CLOSE : ");
break;
case MSG_REPORT_SPLIT:
message.append("MSG_REGION_SPLIT : ");
break;
case MSG_REPORT_EXITING:
message.append("MSG_REPORT_EXITING : ");
break;
case MSG_REPORT_QUIESCED:
message.append("MSG_REPORT_QUIESCED : ");
break;
default:
message.append("unknown message code (");
message.append(msg);
message.append(") : ");
break;
} }
message.append(info == null ? "null": info.getRegionNameAsString()); if (this.message != null && this.message.getLength() > 0) {
return message.toString(); sb.append(": " + this.message);
}
return sb.toString();
} }
////////////////////////////////////////////////////////////////////////////// @Override
public boolean equals(Object obj) {
HMsg that = (HMsg)obj;
return this.type.equals(that.type) &&
(this.info != null)? this.info.equals(that.info):
that.info == null;
}
@Override
public int hashCode() {
int result = this.type.hashCode();
if (this.info != null) {
result ^= this.info.hashCode();
}
return result;
}
// ////////////////////////////////////////////////////////////////////////////
// Writable // Writable
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -200,15 +219,29 @@ public class HMsg implements Writable {
* {@inheritDoc} * {@inheritDoc}
*/ */
public void write(DataOutput out) throws IOException { public void write(DataOutput out) throws IOException {
out.writeByte(msg); out.writeInt(this.type.ordinal());
info.write(out); this.info.write(out);
if (this.message == null || this.message.getLength() == 0) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
this.message.write(out);
}
} }
/** /**
* {@inheritDoc} * {@inheritDoc}
*/ */
public void readFields(DataInput in) throws IOException { public void readFields(DataInput in) throws IOException {
this.msg = in.readByte(); int ordinal = in.readInt();
this.type = HMsg.Type.values()[ordinal];
this.info.readFields(in); this.info.readFields(in);
boolean hasMessage = in.readBoolean();
if (hasMessage) {
if (this.message == null) {
this.message = new Text();
}
this.message.readFields(in);
}
} }
} }

View File

@ -352,6 +352,9 @@ public class HRegionInfo implements WritableComparable {
*/ */
public int compareTo(Object o) { public int compareTo(Object o) {
HRegionInfo other = (HRegionInfo) o; HRegionInfo other = (HRegionInfo) o;
if (other == null) {
return 1;
}
// Are regions of same table? // Are regions of same table?
int result = this.tableDesc.compareTo(other.tableDesc); int result = this.tableDesc.compareTo(other.tableDesc);

View File

@ -178,7 +178,11 @@ public class HServerAddress implements WritableComparable {
* {@inheritDoc} * {@inheritDoc}
*/ */
public int compareTo(Object o) { public int compareTo(Object o) {
HServerAddress other = (HServerAddress) o; HServerAddress that = (HServerAddress)o;
return this.toString().compareTo(other.toString()); // Addresses as Strings may not compare though address is for the one
// server with only difference being that one address has hostname
// resolved whereas other only has IP.
if (this.address.equals(that.address)) return 0;
return this.toString().compareTo(that.toString());
} }
} }

View File

@ -23,7 +23,7 @@ import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable;
/** /**
@ -33,7 +33,7 @@ import org.apache.hadoop.io.Writable;
* In the future it will contain information about the source machine and * In the future it will contain information about the source machine and
* load statistics. * load statistics.
*/ */
public class HServerInfo implements Writable { public class HServerInfo implements WritableComparable {
private HServerAddress serverAddress; private HServerAddress serverAddress;
private long startCode; private long startCode;
private HServerLoad load; private HServerLoad load;
@ -116,20 +116,7 @@ public class HServerInfo implements Writable {
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (!(obj instanceof HServerInfo)) { return compareTo(obj) == 0;
return false;
}
HServerInfo that = (HServerInfo)obj;
if (!this.serverAddress.equals(that.serverAddress)) {
return false;
}
if (this.infoPort != that.infoPort) {
return false;
}
if (this.startCode != that.startCode) {
return false;
}
return true;
} }
@Override @Override
@ -155,4 +142,20 @@ public class HServerInfo implements Writable {
this.load.write(out); this.load.write(out);
out.writeInt(this.infoPort); out.writeInt(this.infoPort);
} }
public int compareTo(Object o) {
HServerInfo that = (HServerInfo)o;
int result = getServerAddress().compareTo(that.getServerAddress());
if (result != 0) {
return result;
}
if (this.infoPort != that.infoPort) {
return this.infoPort - that.infoPort;
}
if (getStartCode() == that.getStartCode()) {
return 0;
}
// Startcodes are timestamps.
return (int)(getStartCode() - that.getStartCode());
}
} }

View File

@ -32,11 +32,14 @@ import org.apache.hadoop.hbase.HRegionInfo;
* goings-on and to obtain data-handling instructions from the HMaster. * goings-on and to obtain data-handling instructions from the HMaster.
*/ */
public interface HMasterRegionInterface extends VersionedProtocol { public interface HMasterRegionInterface extends VersionedProtocol {
/** Interface version number. /**
* Interface version number.
* Version 2 was when the regionServerStartup was changed to return a * Version 2 was when the regionServerStartup was changed to return a
* MapWritable instead of a HbaseMapWritable. * MapWritable instead of a HbaseMapWritable as part of HBASE-82 changes.
* Version 3 was when HMsg was refactored so it could carry optional
* messages (HBASE-504).
*/ */
public static final long versionID = 2L; public static final long versionID = 3L;
/** /**
* Called when a region server first starts * Called when a region server first starts

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
/** /**
* Class to manage assigning regions to servers, state of root and meta, etc. * Class to manage assigning regions to servers, state of root and meta, etc.
@ -72,6 +73,8 @@ class RegionManager implements HConstants {
Collections.synchronizedSortedMap(new TreeMap<byte [], Collections.synchronizedSortedMap(new TreeMap<byte [],
MetaRegion>(Bytes.BYTES_COMPARATOR)); MetaRegion>(Bytes.BYTES_COMPARATOR));
private static final Text OVERLOADED = new Text("Overloaded");
/** /**
* The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that * The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that
* indicates the last time we *tried* to assign the region to a RegionServer. * indicates the last time we *tried* to assign the region to a RegionServer.
@ -255,7 +258,7 @@ class RegionManager implements HConstants {
Bytes.toString(regionInfo.getRegionName())+ Bytes.toString(regionInfo.getRegionName())+
" to server " + serverName); " to server " + serverName);
unassignedRegions.put(regionInfo, Long.valueOf(now)); unassignedRegions.put(regionInfo, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, regionInfo));
if (--nregions <= 0) { if (--nregions <= 0) {
break; break;
} }
@ -380,7 +383,7 @@ class RegionManager implements HConstants {
Bytes.toString(regionInfo.getRegionName()) + Bytes.toString(regionInfo.getRegionName()) +
" to the only server " + serverName); " to the only server " + serverName);
unassignedRegions.put(regionInfo, Long.valueOf(now)); unassignedRegions.put(regionInfo, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, regionInfo));
} }
} }
@ -418,7 +421,8 @@ class RegionManager implements HConstants {
LOG.debug("Going to close region " + currentRegion.getRegionName()); LOG.debug("Going to close region " + currentRegion.getRegionName());
// make a message to close the region // make a message to close the region
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, currentRegion)); returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, currentRegion,
OVERLOADED));
// mark the region as closing // mark the region as closing
setClosing(currentRegion.getRegionName()); setClosing(currentRegion.getRegionName());
// increment the count of regions we've marked // increment the count of regions we've marked

View File

@ -42,13 +42,15 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Leases; import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.LeaseListener; import org.apache.hadoop.hbase.LeaseListener;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.io.Text;
/** /**
* The ServerManager class manages info about region servers - HServerInfo, * The ServerManager class manages info about region servers - HServerInfo,
* load numbers, dying servers, etc. * load numbers, dying servers, etc.
*/ */
class ServerManager implements HConstants { class ServerManager implements HConstants {
static final Log LOG = LogFactory.getLog(ServerManager.class.getName()); private static final Log LOG =
LogFactory.getLog(ServerManager.class.getName());
private final AtomicInteger quiescedServers = new AtomicInteger(0); private final AtomicInteger quiescedServers = new AtomicInteger(0);
@ -68,8 +70,7 @@ class ServerManager implements HConstants {
final Map<String, HServerLoad> serversToLoad = final Map<String, HServerLoad> serversToLoad =
new ConcurrentHashMap<String, HServerLoad>(); new ConcurrentHashMap<String, HServerLoad>();
HMaster master; private HMaster master;
private final Leases serverLeases; private final Leases serverLeases;
/** /**
@ -83,15 +84,13 @@ class ServerManager implements HConstants {
/** /**
* Let the server manager know a new regionserver has come online * Let the server manager know a new regionserver has come online
*
* @param serverInfo * @param serverInfo
*/ */
public void regionServerStartup(HServerInfo serverInfo) { public void regionServerStartup(HServerInfo serverInfo) {
String s = serverInfo.getServerAddress().toString().trim(); String s = serverInfo.getServerAddress().toString().trim();
LOG.info("received start message from: " + s); LOG.info("Received start message from: " + s);
// Do the lease check up here. There might already be one out on this // Do the lease check up here. There might already be one out on this
// server expecially if it just shutdown and came back up near-immediately // server expecially if it just shutdown and came back up near-immediately.
// after.
if (!master.closed.get()) { if (!master.closed.get()) {
try { try {
serverLeases.createLease(s, new ServerExpirer(s)); serverLeases.createLease(s, new ServerExpirer(s));
@ -152,15 +151,15 @@ class ServerManager implements HConstants {
* *
* @throws IOException * @throws IOException
*/ */
public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[], public HMsg [] regionServerReport(final HServerInfo serverInfo,
HRegionInfo[] mostLoadedRegions) final HMsg msgs[], final HRegionInfo[] mostLoadedRegions)
throws IOException { throws IOException {
String serverName = serverInfo.getServerAddress().toString().trim(); String serverName = serverInfo.getServerAddress().toString().trim();
if (msgs.length > 0) { if (msgs.length > 0) {
if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { if (msgs[0].isType(HMsg.Type.MSG_REPORT_EXITING)) {
processRegionServerExit(serverName, msgs); processRegionServerExit(serverName, msgs);
return new HMsg[0]; return HMsg.EMPTY_HMSG_ARRAY;
} else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) { } else if (msgs[0].isType(HMsg.Type.MSG_REPORT_QUIESCED)) {
LOG.info("Region server " + serverName + " quiesced"); LOG.info("Region server " + serverName + " quiesced");
quiescedServers.incrementAndGet(); quiescedServers.incrementAndGet();
} }
@ -175,13 +174,14 @@ class ServerManager implements HConstants {
} }
if (!master.closed.get()) { if (!master.closed.get()) {
if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) { if (msgs.length > 0 &&
msgs[0].isType(HMsg.Type.MSG_REPORT_QUIESCED)) {
// Server is already quiesced, but we aren't ready to shut down // Server is already quiesced, but we aren't ready to shut down
// return empty response // return empty response
return new HMsg[0]; return HMsg.EMPTY_HMSG_ARRAY;
} }
// Tell the server to stop serving any user regions // Tell the server to stop serving any user regions
return new HMsg [] {new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)}; return new HMsg [] {HMsg.REGIONSERVER_QUIESCE};
} }
} }
@ -189,7 +189,7 @@ class ServerManager implements HConstants {
// Tell server to shut down if we are shutting down. This should // Tell server to shut down if we are shutting down. This should
// happen after check of MSG_REPORT_EXITING above, since region server // happen after check of MSG_REPORT_EXITING above, since region server
// will send us one of these messages after it gets MSG_REGIONSERVER_STOP // will send us one of these messages after it gets MSG_REGIONSERVER_STOP
return new HMsg [] {new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; return new HMsg [] {HMsg.REGIONSERVER_STOP};
} }
HServerInfo storedInfo = serversToServerInfo.get(serverName); HServerInfo storedInfo = serversToServerInfo.get(serverName);
@ -200,7 +200,7 @@ class ServerManager implements HConstants {
// The HBaseMaster may have been restarted. // The HBaseMaster may have been restarted.
// Tell the RegionServer to start over and call regionServerStartup() // Tell the RegionServer to start over and call regionServerStartup()
return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)}; return new HMsg[]{HMsg.CALL_SERVER_STARTUP};
} else if (storedInfo.getStartCode() != serverInfo.getStartCode()) { } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) {
// This state is reachable if: // This state is reachable if:
// //
@ -220,7 +220,7 @@ class ServerManager implements HConstants {
serversToServerInfo.notifyAll(); serversToServerInfo.notifyAll();
} }
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; return new HMsg[]{HMsg.REGIONSERVER_STOP};
} else { } else {
return processRegionServerAllsWell(serverName, serverInfo, return processRegionServerAllsWell(serverName, serverInfo,
mostLoadedRegions, msgs); mostLoadedRegions, msgs);
@ -234,11 +234,6 @@ class ServerManager implements HConstants {
// HRegionServer is shutting down. Cancel the server's lease. // HRegionServer is shutting down. Cancel the server's lease.
// Note that canceling the server's lease takes care of updating // Note that canceling the server's lease takes care of updating
// serversToServerInfo, etc. // serversToServerInfo, etc.
if (LOG.isDebugEnabled()) {
LOG.debug("Region server " + serverName +
": MSG_REPORT_EXITING -- cancelling lease");
}
if (cancelLease(serverName)) { if (cancelLease(serverName)) {
// Only process the exit message if the server still has a lease. // Only process the exit message if the server still has a lease.
// Otherwise we could end up processing the server exit twice. // Otherwise we could end up processing the server exit twice.
@ -248,13 +243,13 @@ class ServerManager implements HConstants {
// (if we are not shutting down). // (if we are not shutting down).
if (!master.closed.get()) { if (!master.closed.get()) {
for (int i = 1; i < msgs.length; i++) { for (int i = 1; i < msgs.length; i++) {
LOG.info("Processing " + msgs[i] + " from " + serverName);
HRegionInfo info = msgs[i].getRegionInfo(); HRegionInfo info = msgs[i].getRegionInfo();
if (info.isRootRegion()) { if (info.isRootRegion()) {
master.regionManager.unassignRootRegion(); master.regionManager.unassignRootRegion();
} else if (info.isMetaTable()) { } else if (info.isMetaTable()) {
master.regionManager.offlineMetaRegion(info.getStartKey()); master.regionManager.offlineMetaRegion(info.getStartKey());
} }
if (!master.regionManager.isMarkedToClose( if (!master.regionManager.isMarkedToClose(
serverName, info.getRegionName())) { serverName, info.getRegionName())) {
master.regionManager.setUnassigned(info); master.regionManager.setUnassigned(info);
@ -262,10 +257,8 @@ class ServerManager implements HConstants {
} }
} }
} }
// We don't need to return anything to the server because it isn't // We don't need to return anything to the server because it isn't
// going to do any more work. // going to do any more work.
/* return new HMsg[0];*/
} finally { } finally {
serversToServerInfo.notifyAll(); serversToServerInfo.notifyAll();
} }
@ -328,40 +321,37 @@ class ServerManager implements HConstants {
// Get reports on what the RegionServer did. // Get reports on what the RegionServer did.
for (int i = 0; i < incomingMsgs.length; i++) { for (int i = 0; i < incomingMsgs.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received " + incomingMsgs[i] + " from " + serverName);
}
HRegionInfo region = incomingMsgs[i].getRegionInfo(); HRegionInfo region = incomingMsgs[i].getRegionInfo();
LOG.info("Received " + incomingMsgs[i] + " from " + serverName);
switch (incomingMsgs[i].getMsg()) { switch (incomingMsgs[i].getType()) {
case HMsg.MSG_REPORT_PROCESS_OPEN: case MSG_REPORT_PROCESS_OPEN:
master.regionManager.updateAssignmentDeadline(region); master.regionManager.updateAssignmentDeadline(region);
break; break;
case HMsg.MSG_REPORT_OPEN: case MSG_REPORT_OPEN:
processRegionOpen(serverName, serverInfo, region, returnMsgs); processRegionOpen(serverName, serverInfo, region, returnMsgs);
break; break;
case HMsg.MSG_REPORT_CLOSE: case MSG_REPORT_CLOSE:
processRegionClose(serverInfo, region); processRegionClose(serverInfo, region);
break; break;
case HMsg.MSG_REPORT_SPLIT: case MSG_REPORT_SPLIT:
processSplitRegion(serverName, serverInfo, region, incomingMsgs[++i], processSplitRegion(serverName, serverInfo, region, incomingMsgs[++i],
incomingMsgs[++i], returnMsgs); incomingMsgs[++i], returnMsgs);
break; break;
default: default:
throw new IOException( throw new IOException(
"Impossible state during msg processing. Instruction: " + "Impossible state during message processing. Instruction: " +
incomingMsgs[i].getMsg()); incomingMsgs[i].getType());
} }
} }
// Tell the region server to close regions that we have marked for closing. // Tell the region server to close regions that we have marked for closing.
if (regionsToKill != null) { if (regionsToKill != null) {
for (HRegionInfo i: regionsToKill.values()) { for (HRegionInfo i: regionsToKill.values()) {
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i)); returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, i));
// Transition the region from toClose to closing state // Transition the region from toClose to closing state
master.regionManager.setClosing(i.getRegionName()); master.regionManager.setClosing(i.getRegionName());
} }
@ -391,9 +381,6 @@ class ServerManager implements HConstants {
HRegionInfo newRegionB = splitB.getRegionInfo(); HRegionInfo newRegionB = splitB.getRegionInfo();
master.regionManager.setUnassigned(newRegionB); master.regionManager.setUnassigned(newRegionB);
LOG.info("Region " + region.getRegionName() + " split; new regions: " +
newRegionA.getRegionName() + ", " + newRegionB.getRegionName());
if (region.isMetaTable()) { if (region.isMetaTable()) {
// A meta region has split. // A meta region has split.
master.regionManager.offlineMetaRegion(region.getStartKey()); master.regionManager.offlineMetaRegion(region.getStartKey());
@ -441,7 +428,8 @@ class ServerManager implements HConstants {
// Ask the server to shut it down, but don't report it as closed. // Ask the server to shut it down, but don't report it as closed.
// Otherwise the HMaster will think the Region was closed on purpose, // Otherwise the HMaster will think the Region was closed on purpose,
// and then try to reopen it elsewhere; that's not what we want. // and then try to reopen it elsewhere; that's not what we want.
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE_WITHOUT_REPORT,
region, new Text("Duplicate assignment")));
} else { } else {
// it was assigned, and it's not a duplicate assignment, so take it out // it was assigned, and it's not a duplicate assignment, so take it out
// of the unassigned list. // of the unassigned list.
@ -467,9 +455,6 @@ class ServerManager implements HConstants {
} }
private void processRegionClose(HServerInfo serverInfo, HRegionInfo region) { private void processRegionClose(HServerInfo serverInfo, HRegionInfo region) {
LOG.info(serverInfo.getServerAddress().toString() + " no longer serving " +
region);
if (region.isRootRegion()) { if (region.isRootRegion()) {
// Root region // Root region
if (region.isOffline()) { if (region.isOffline()) {

View File

@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.DNS;
@ -311,10 +312,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
for(int i = 0; for(int i = 0;
!restart && !stopRequested.get() && i < msgs.length; !restart && !stopRequested.get() && i < msgs.length;
i++) { i++) {
LOG.info(msgs[i].toString()); LOG.info(msgs[i].toString());
switch(msgs[i].getMsg()) { switch(msgs[i].getType()) {
case HMsg.MSG_CALL_SERVER_STARTUP: case MSG_CALL_SERVER_STARTUP:
// We the MSG_CALL_SERVER_STARTUP on startup but we can also // We the MSG_CALL_SERVER_STARTUP on startup but we can also
// get it when the master is panicing because for instance // get it when the master is panicing because for instance
// the HDFS has been yanked out from under it. Be wary of // the HDFS has been yanked out from under it. Be wary of
@ -344,11 +344,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} }
break; break;
case HMsg.MSG_REGIONSERVER_STOP: case MSG_REGIONSERVER_STOP:
stopRequested.set(true); stopRequested.set(true);
break; break;
case HMsg.MSG_REGIONSERVER_QUIESCE: case MSG_REGIONSERVER_QUIESCE:
if (!quiesceRequested) { if (!quiesceRequested) {
try { try {
toDo.put(new ToDoEntry(msgs[i])); toDo.put(new ToDoEntry(msgs[i]));
@ -445,11 +445,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} }
try { try {
HMsg[] exitMsg = new HMsg[closedRegions.size() + 1]; HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING); exitMsg[0] = HMsg.REPORT_EXITING;
// Tell the master what regions we are/were serving // Tell the master what regions we are/were serving
int i = 1; int i = 1;
for (HRegion region: closedRegions) { for (HRegion region: closedRegions) {
exitMsg[i++] = new HMsg(HMsg.MSG_REPORT_CLOSE, exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
region.getRegionInfo()); region.getRegionInfo());
} }
@ -608,7 +608,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// This iterator is 'safe'. We are guaranteed a view on state of the // This iterator is 'safe'. We are guaranteed a view on state of the
// queue at time iterator was taken out. Apparently goes from oldest. // queue at time iterator was taken out. Apparently goes from oldest.
for (ToDoEntry e: this.toDo) { for (ToDoEntry e: this.toDo) {
if (e.msg.getMsg() == HMsg.MSG_REGION_OPEN) { if (e.msg.isType(HMsg.Type.MSG_REGION_OPEN)) {
addProcessingMessage(e.msg.getRegionInfo()); addProcessingMessage(e.msg.getRegionInfo());
} }
} }
@ -710,16 +710,22 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return result; return result;
} }
/** Add to the outbound message buffer */ /* Add to the outbound message buffer */
private void reportOpen(HRegionInfo region) { private void reportOpen(HRegionInfo region) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, region)); outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
} }
/** Add to the outbound message buffer */ /* Add to the outbound message buffer */
private void reportClose(HRegionInfo region) { private void reportClose(HRegionInfo region) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_CLOSE, region)); reportClose(region, null);
} }
/* Add to the outbound message buffer */
private void reportClose(final HRegionInfo region, final Text message) {
outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
}
/** /**
* Add to the outbound message buffer * Add to the outbound message buffer
* *
@ -733,18 +739,24 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
HRegionInfo newRegionB) { HRegionInfo newRegionB) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_SPLIT, oldRegion)); outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion,
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, newRegionA)); new Text(oldRegion.getRegionNameAsString() + " split; daughters: " +
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, newRegionB)); newRegionA.getRegionNameAsString() + ", " +
newRegionB.getRegionNameAsString())));
outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA));
outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB));
} }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// HMaster-given operations // HMaster-given operations
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
/*
* Data structure to hold a HMsg and retries count.
*/
private static class ToDoEntry { private static class ToDoEntry {
int tries; private int tries;
HMsg msg; private final HMsg msg;
ToDoEntry(HMsg msg) { ToDoEntry(HMsg msg) {
this.tries = 0; this.tries = 0;
this.msg = msg; this.msg = msg;
@ -774,23 +786,23 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
continue; continue;
} }
LOG.info(e.msg); LOG.info(e.msg);
switch(e.msg.getMsg()) { switch(e.msg.getType()) {
case HMsg.MSG_REGIONSERVER_QUIESCE: case MSG_REGIONSERVER_QUIESCE:
closeUserRegions(); closeUserRegions();
break; break;
case HMsg.MSG_REGION_OPEN: case MSG_REGION_OPEN:
// Open a region // Open a region
openRegion(e.msg.getRegionInfo()); openRegion(e.msg.getRegionInfo());
break; break;
case HMsg.MSG_REGION_CLOSE: case MSG_REGION_CLOSE:
// Close a region // Close a region
closeRegion(e.msg.getRegionInfo(), true); closeRegion(e.msg.getRegionInfo(), true);
break; break;
case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: case MSG_REGION_CLOSE_WITHOUT_REPORT:
// Close a region, don't reply // Close a region, don't reply
closeRegion(e.msg.getRegionInfo(), false); closeRegion(e.msg.getRegionInfo(), false);
break; break;
@ -854,7 +866,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// TODO: add an extra field in HRegionInfo to indicate that there is // TODO: add an extra field in HRegionInfo to indicate that there is
// an error. We can't do that now because that would be an incompatible // an error. We can't do that now because that would be an incompatible
// change that would require a migration // change that would require a migration
reportClose(regionInfo); reportClose(regionInfo, new Text(StringUtils.stringifyException(e)));
return; return;
} }
this.lock.writeLock().lock(); this.lock.writeLock().lock();
@ -876,7 +888,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @param hri Region to add the message for * @param hri Region to add the message for
*/ */
protected void addProcessingMessage(final HRegionInfo hri) { protected void addProcessingMessage(final HRegionInfo hri) {
getOutboundMsgs().add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN, hri)); getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri));
} }
void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
@ -889,7 +901,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.lock.writeLock().unlock(); this.lock.writeLock().unlock();
} }
if(region != null) { if (region != null) {
region.close(); region.close();
if(reportWhenCompleted) { if(reportWhenCompleted) {
reportClose(hri); reportClose(hri);
@ -954,9 +966,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} }
this.quiesced.set(true); this.quiesced.set(true);
if (onlineRegions.size() == 0) { if (onlineRegions.size() == 0) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_EXITING)); outboundMsgs.add(HMsg.REPORT_EXITING);
} else { } else {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_QUIESCED)); outboundMsgs.add(HMsg.REPORT_QUIESCED);
} }
} }

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
/** /**
* Test HBase Writables serializations * Test HBase Writables serializations
@ -42,10 +43,17 @@ public class TestSerialization extends HBaseTestCase {
} }
public void testHMsg() throws Exception { public void testHMsg() throws Exception {
HMsg m = new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE); HMsg m = HMsg.REGIONSERVER_QUIESCE;
byte [] mb = Writables.getBytes(m); byte [] mb = Writables.getBytes(m);
HMsg deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg()); HMsg deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg());
assertTrue(m.getMsg() == deserializedHMsg.getMsg()); assertTrue(m.equals(deserializedHMsg));
m = new HMsg(HMsg.Type.MSG_REGIONSERVER_QUIESCE,
new HRegionInfo(new HTableDescriptor(getName()),
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
new Text("Some message"));
mb = Writables.getBytes(m);
deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg());
assertTrue(m.equals(deserializedHMsg));
} }
public void testTableDescriptor() throws Exception { public void testTableDescriptor() throws Exception {