diff --git a/CHANGES.txt b/CHANGES.txt index 9e6d7a84d1b..44284fe6be2 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -43,6 +43,8 @@ Hbase Change Log HBASE-621 Make MAX_VERSIONS work like TTL: In scans and gets, check MAX_VERSIONs setting and return that many only rather than wait on compaction (Jean-Daniel Cryans via Stack) + HBASE-504 Allow HMsg's carry a payload: e.g. exception that happened over + on the remote side. Release 0.1.2 - 05/13/2008 diff --git a/src/java/org/apache/hadoop/hbase/HMsg.java b/src/java/org/apache/hadoop/hbase/HMsg.java index 572540e515d..6bca3f644e5 100644 --- a/src/java/org/apache/hadoop/hbase/HMsg.java +++ b/src/java/org/apache/hadoop/hbase/HMsg.java @@ -24,107 +24,156 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -/******************************************************************************* +/** * HMsg is for communicating instructions between the HMaster and the * HRegionServers. - ******************************************************************************/ + * + * Most of the time the messages are simple but some messages are accompanied + * by the region affected. HMsg may also carry optional message. + */ +@SuppressWarnings("serial") public class HMsg implements Writable { - - // Messages sent from master to region server - - /** Start serving the specified region */ - public static final byte MSG_REGION_OPEN = 1; - - /** Stop serving the specified region */ - public static final byte MSG_REGION_CLOSE = 2; - - /** Region server is unknown to master. Restart */ - public static final byte MSG_CALL_SERVER_STARTUP = 4; - - /** Master tells region server to stop */ - public static final byte MSG_REGIONSERVER_STOP = 5; - - /** Stop serving the specified region and don't report back that it's closed */ - public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6; - - /** Stop serving user regions */ - public static final byte MSG_REGIONSERVER_QUIESCE = 7; - - // Messages sent from the region server to the master - - /** region server is now serving the specified region */ - public static final byte MSG_REPORT_OPEN = 100; - - /** region server is no longer serving the specified region */ - public static final byte MSG_REPORT_CLOSE = 101; - - /** region server is processing open request */ - public static final byte MSG_REPORT_PROCESS_OPEN = 102; - /** - * region server split the region associated with this message. - * - * note that this message is immediately followed by two MSG_REPORT_OPEN - * messages, one for each of the new regions resulting from the split + * Message types sent between master and regionservers */ - public static final byte MSG_REPORT_SPLIT = 103; + public static enum Type { + MSG_NONE, + + // Message types sent from master to region server + /** Start serving the specified region */ + MSG_REGION_OPEN, + + /** Stop serving the specified region */ + MSG_REGION_CLOSE, + + /** Region server is unknown to master. Restart */ + MSG_CALL_SERVER_STARTUP, + + /** Master tells region server to stop */ + MSG_REGIONSERVER_STOP, + + /** Stop serving the specified region and don't report back that it's + * closed + */ + MSG_REGION_CLOSE_WITHOUT_REPORT, - /** - * region server is shutting down - * - * note that this message is followed by MSG_REPORT_CLOSE messages for each - * region the region server was serving, unless it was told to quiesce. - */ - public static final byte MSG_REPORT_EXITING = 104; - - /** region server has closed all user regions but is still serving meta regions */ - public static final byte MSG_REPORT_QUIESCED = 105; + /** Stop serving user regions */ + MSG_REGIONSERVER_QUIESCE, - byte msg; - HRegionInfo info; + // Message types sent from the region server to the master + /** region server is now serving the specified region */ + MSG_REPORT_OPEN, + + /** region server is no longer serving the specified region */ + MSG_REPORT_CLOSE, + + /** region server is processing open request */ + MSG_REPORT_PROCESS_OPEN, + + /** + * Region server split the region associated with this message. + * + * Note that this message is immediately followed by two MSG_REPORT_OPEN + * messages, one for each of the new regions resulting from the split + */ + MSG_REPORT_SPLIT, + + /** + * Region server is shutting down + * + * Note that this message is followed by MSG_REPORT_CLOSE messages for each + * region the region server was serving, unless it was told to quiesce. + */ + MSG_REPORT_EXITING, + + /** Region server has closed all user regions but is still serving meta + * regions + */ + MSG_REPORT_QUIESCED, + } + + private Type type = null; + private HRegionInfo info = null; + private Text message = null; + + // Some useful statics. Use these rather than create a new HMsg each time. + public static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING); + public static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED); + public static final HMsg REGIONSERVER_QUIESCE = + new HMsg(Type.MSG_REGIONSERVER_QUIESCE); + public static final HMsg REGIONSERVER_STOP = + new HMsg(Type.MSG_REGIONSERVER_STOP); + public static final HMsg CALL_SERVER_STARTUP = + new HMsg(Type.MSG_CALL_SERVER_STARTUP); + public static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg[0]; + /** Default constructor. Used during deserialization */ public HMsg() { - this.info = new HRegionInfo(); + this(Type.MSG_NONE); } /** - * Construct a message with an empty HRegionInfo - * - * @param msg - message code + * Construct a message with the specified message and HRegionInfo + * @param type Message type */ - public HMsg(byte msg) { - this.msg = msg; - this.info = new HRegionInfo(); + public HMsg(final HMsg.Type type) { + this(type, new HRegionInfo(), null); } /** - * Construct a message with the specified message code and HRegionInfo + * Construct a message with the specified message and HRegionInfo + * @param type Message type + * @param hri Region to which message type applies + */ + public HMsg(final HMsg.Type type, final HRegionInfo hri) { + this(type, hri, null); + } + + /** + * Construct a message with the specified message and HRegionInfo * - * @param msg - message code - * @param info - HRegionInfo + * @param type Message type + * @param hri Region to which message type applies. Cannot be + * null. If no info associated, used other Constructor. + * @param msg Optional message (Stringified exception, etc.) */ - public HMsg(byte msg, HRegionInfo info) { - this.msg = msg; - this.info = info; + public HMsg(final HMsg.Type type, final HRegionInfo hri, final Text msg) { + if (type == null) { + throw new NullPointerException("Message type cannot be null"); + } + this.type = type; + if (hri == null) { + throw new NullPointerException("Region cannot be null"); + } + this.info = hri; + this.message = msg; } /** - * Accessor - * @return message code - */ - public byte getMsg() { - return msg; - } - - /** - * Accessor - * @return HRegionInfo + * @return Region info or null if none associated with this message type. */ public HRegionInfo getRegionInfo() { - return info; + return this.info; + } + + public Type getType() { + return this.type; + } + + /** + * @param other Message type to compare to + * @return True if we are of same message type as other + */ + public boolean isType(final HMsg.Type other) { + return this.type.equals(other); + } + + public Text getMessage() { + return this.message; } /** @@ -132,67 +181,37 @@ public class HMsg implements Writable { */ @Override public String toString() { - StringBuilder message = new StringBuilder(); - switch(msg) { - case MSG_REGION_OPEN: - message.append("MSG_REGION_OPEN : "); - break; - - case MSG_REGION_CLOSE: - message.append("MSG_REGION_CLOSE : "); - break; - - case MSG_CALL_SERVER_STARTUP: - message.append("MSG_CALL_SERVER_STARTUP : "); - break; - - case MSG_REGIONSERVER_STOP: - message.append("MSG_REGIONSERVER_STOP : "); - break; - - case MSG_REGION_CLOSE_WITHOUT_REPORT: - message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : "); - break; - - case MSG_REGIONSERVER_QUIESCE: - message.append("MSG_REGIONSERVER_QUIESCE : "); - break; - - case MSG_REPORT_PROCESS_OPEN: - message.append("MSG_REPORT_PROCESS_OPEN : "); - break; - - case MSG_REPORT_OPEN: - message.append("MSG_REPORT_OPEN : "); - break; - - case MSG_REPORT_CLOSE: - message.append("MSG_REPORT_CLOSE : "); - break; - - case MSG_REPORT_SPLIT: - message.append("MSG_REGION_SPLIT : "); - break; - - case MSG_REPORT_EXITING: - message.append("MSG_REPORT_EXITING : "); - break; - - case MSG_REPORT_QUIESCED: - message.append("MSG_REPORT_QUIESCED : "); - break; - - default: - message.append("unknown message code ("); - message.append(msg); - message.append(") : "); - break; + StringBuilder sb = new StringBuilder(); + sb.append(this.type.toString()); + // If null or empty region, don't bother printing it out. + if (this.info != null && this.info.getRegionName().length > 0) { + sb.append(": "); + sb.append(this.info.getRegionNameAsString()); } - message.append(info == null ? "null": info.getRegionNameAsString()); - return message.toString(); + if (this.message != null && this.message.getLength() > 0) { + sb.append(": " + this.message); + } + return sb.toString(); } - ////////////////////////////////////////////////////////////////////////////// + @Override + public boolean equals(Object obj) { + HMsg that = (HMsg)obj; + return this.type.equals(that.type) && + (this.info != null)? this.info.equals(that.info): + that.info == null; + } + + @Override + public int hashCode() { + int result = this.type.hashCode(); + if (this.info != null) { + result ^= this.info.hashCode(); + } + return result; + } + + // //////////////////////////////////////////////////////////////////////////// // Writable ////////////////////////////////////////////////////////////////////////////// @@ -200,15 +219,29 @@ public class HMsg implements Writable { * {@inheritDoc} */ public void write(DataOutput out) throws IOException { - out.writeByte(msg); - info.write(out); + out.writeInt(this.type.ordinal()); + this.info.write(out); + if (this.message == null || this.message.getLength() == 0) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + this.message.write(out); + } } /** * {@inheritDoc} */ public void readFields(DataInput in) throws IOException { - this.msg = in.readByte(); + int ordinal = in.readInt(); + this.type = HMsg.Type.values()[ordinal]; this.info.readFields(in); + boolean hasMessage = in.readBoolean(); + if (hasMessage) { + if (this.message == null) { + this.message = new Text(); + } + this.message.readFields(in); + } } } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/HRegionInfo.java b/src/java/org/apache/hadoop/hbase/HRegionInfo.java index 91f96b03696..e647f15f95e 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/src/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -51,7 +51,7 @@ public class HRegionInfo implements WritableComparable { /** HRegionInfo for first meta region */ public static final HRegionInfo FIRST_META_REGIONINFO = new HRegionInfo(1L, HTableDescriptor.META_TABLEDESC); - + /** * Extracts table name prefix from a region name. * Presumes region names are ASCII characters only. @@ -352,6 +352,9 @@ public class HRegionInfo implements WritableComparable { */ public int compareTo(Object o) { HRegionInfo other = (HRegionInfo) o; + if (other == null) { + return 1; + } // Are regions of same table? int result = this.tableDesc.compareTo(other.tableDesc); diff --git a/src/java/org/apache/hadoop/hbase/HServerAddress.java b/src/java/org/apache/hadoop/hbase/HServerAddress.java index 60576e2b114..ae6fbf91532 100644 --- a/src/java/org/apache/hadoop/hbase/HServerAddress.java +++ b/src/java/org/apache/hadoop/hbase/HServerAddress.java @@ -178,7 +178,11 @@ public class HServerAddress implements WritableComparable { * {@inheritDoc} */ public int compareTo(Object o) { - HServerAddress other = (HServerAddress) o; - return this.toString().compareTo(other.toString()); + HServerAddress that = (HServerAddress)o; + // Addresses as Strings may not compare though address is for the one + // server with only difference being that one address has hostname + // resolved whereas other only has IP. + if (this.address.equals(that.address)) return 0; + return this.toString().compareTo(that.toString()); } -} +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/HServerInfo.java b/src/java/org/apache/hadoop/hbase/HServerInfo.java index 757decbd8f1..e3d3593abd1 100644 --- a/src/java/org/apache/hadoop/hbase/HServerInfo.java +++ b/src/java/org/apache/hadoop/hbase/HServerInfo.java @@ -23,7 +23,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; /** @@ -33,7 +33,7 @@ import org.apache.hadoop.io.Writable; * In the future it will contain information about the source machine and * load statistics. */ -public class HServerInfo implements Writable { +public class HServerInfo implements WritableComparable { private HServerAddress serverAddress; private long startCode; private HServerLoad load; @@ -116,20 +116,7 @@ public class HServerInfo implements Writable { @Override public boolean equals(Object obj) { - if (!(obj instanceof HServerInfo)) { - return false; - } - HServerInfo that = (HServerInfo)obj; - if (!this.serverAddress.equals(that.serverAddress)) { - return false; - } - if (this.infoPort != that.infoPort) { - return false; - } - if (this.startCode != that.startCode) { - return false; - } - return true; + return compareTo(obj) == 0; } @Override @@ -155,4 +142,20 @@ public class HServerInfo implements Writable { this.load.write(out); out.writeInt(this.infoPort); } + + public int compareTo(Object o) { + HServerInfo that = (HServerInfo)o; + int result = getServerAddress().compareTo(that.getServerAddress()); + if (result != 0) { + return result; + } + if (this.infoPort != that.infoPort) { + return this.infoPort - that.infoPort; + } + if (getStartCode() == that.getStartCode()) { + return 0; + } + // Startcodes are timestamps. + return (int)(getStartCode() - that.getStartCode()); + } } diff --git a/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java index 1e7c7ba6c05..975aa8af004 100644 --- a/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java +++ b/src/java/org/apache/hadoop/hbase/ipc/HMasterRegionInterface.java @@ -32,12 +32,15 @@ import org.apache.hadoop.hbase.HRegionInfo; * goings-on and to obtain data-handling instructions from the HMaster. */ public interface HMasterRegionInterface extends VersionedProtocol { - /** Interface version number. + /** + * Interface version number. * Version 2 was when the regionServerStartup was changed to return a - * MapWritable instead of a HbaseMapWritable. + * MapWritable instead of a HbaseMapWritable as part of HBASE-82 changes. + * Version 3 was when HMsg was refactored so it could carry optional + * messages (HBASE-504). */ - public static final long versionID = 2L; - + public static final long versionID = 3L; + /** * Called when a region server first starts * @param info @@ -46,7 +49,7 @@ public interface HMasterRegionInterface extends VersionedProtocol { * hbase rootdir, etc. */ public MapWritable regionServerStartup(HServerInfo info) throws IOException; - + /** * Called to renew lease, tell master what the region server is doing and to * receive new instructions from the master diff --git a/src/java/org/apache/hadoop/hbase/master/RegionManager.java b/src/java/org/apache/hadoop/hbase/master/RegionManager.java index bcb76ccc034..6ec94a70cc3 100644 --- a/src/java/org/apache/hadoop/hbase/master/RegionManager.java +++ b/src/java/org/apache/hadoop/hbase/master/RegionManager.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Text; /** * Class to manage assigning regions to servers, state of root and meta, etc. @@ -72,6 +73,8 @@ class RegionManager implements HConstants { Collections.synchronizedSortedMap(new TreeMap(Bytes.BYTES_COMPARATOR)); + private static final Text OVERLOADED = new Text("Overloaded"); + /** * The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that * indicates the last time we *tried* to assign the region to a RegionServer. @@ -255,7 +258,7 @@ class RegionManager implements HConstants { Bytes.toString(regionInfo.getRegionName())+ " to server " + serverName); unassignedRegions.put(regionInfo, Long.valueOf(now)); - returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); + returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, regionInfo)); if (--nregions <= 0) { break; } @@ -380,7 +383,7 @@ class RegionManager implements HConstants { Bytes.toString(regionInfo.getRegionName()) + " to the only server " + serverName); unassignedRegions.put(regionInfo, Long.valueOf(now)); - returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo)); + returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, regionInfo)); } } @@ -418,7 +421,8 @@ class RegionManager implements HConstants { LOG.debug("Going to close region " + currentRegion.getRegionName()); // make a message to close the region - returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, currentRegion)); + returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, currentRegion, + OVERLOADED)); // mark the region as closing setClosing(currentRegion.getRegionName()); // increment the count of regions we've marked diff --git a/src/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/java/org/apache/hadoop/hbase/master/ServerManager.java index 19cbb8cb876..b2740d8e46b 100644 --- a/src/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -42,13 +42,15 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Leases; import org.apache.hadoop.hbase.LeaseListener; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.io.Text; /** * The ServerManager class manages info about region servers - HServerInfo, * load numbers, dying servers, etc. */ class ServerManager implements HConstants { - static final Log LOG = LogFactory.getLog(ServerManager.class.getName()); + private static final Log LOG = + LogFactory.getLog(ServerManager.class.getName()); private final AtomicInteger quiescedServers = new AtomicInteger(0); @@ -68,10 +70,9 @@ class ServerManager implements HConstants { final Map serversToLoad = new ConcurrentHashMap(); - HMaster master; - + private HMaster master; private final Leases serverLeases; - + /** * @param master */ @@ -83,15 +84,13 @@ class ServerManager implements HConstants { /** * Let the server manager know a new regionserver has come online - * * @param serverInfo */ public void regionServerStartup(HServerInfo serverInfo) { String s = serverInfo.getServerAddress().toString().trim(); - LOG.info("received start message from: " + s); + LOG.info("Received start message from: " + s); // Do the lease check up here. There might already be one out on this - // server expecially if it just shutdown and came back up near-immediately - // after. + // server expecially if it just shutdown and came back up near-immediately. if (!master.closed.get()) { try { serverLeases.createLease(s, new ServerExpirer(s)); @@ -152,15 +151,15 @@ class ServerManager implements HConstants { * * @throws IOException */ - public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[], - HRegionInfo[] mostLoadedRegions) + public HMsg [] regionServerReport(final HServerInfo serverInfo, + final HMsg msgs[], final HRegionInfo[] mostLoadedRegions) throws IOException { String serverName = serverInfo.getServerAddress().toString().trim(); if (msgs.length > 0) { - if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) { + if (msgs[0].isType(HMsg.Type.MSG_REPORT_EXITING)) { processRegionServerExit(serverName, msgs); - return new HMsg[0]; - } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) { + return HMsg.EMPTY_HMSG_ARRAY; + } else if (msgs[0].isType(HMsg.Type.MSG_REPORT_QUIESCED)) { LOG.info("Region server " + serverName + " quiesced"); quiescedServers.incrementAndGet(); } @@ -175,13 +174,14 @@ class ServerManager implements HConstants { } if (!master.closed.get()) { - if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) { + if (msgs.length > 0 && + msgs[0].isType(HMsg.Type.MSG_REPORT_QUIESCED)) { // Server is already quiesced, but we aren't ready to shut down // return empty response - return new HMsg[0]; + return HMsg.EMPTY_HMSG_ARRAY; } // Tell the server to stop serving any user regions - return new HMsg [] {new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)}; + return new HMsg [] {HMsg.REGIONSERVER_QUIESCE}; } } @@ -189,7 +189,7 @@ class ServerManager implements HConstants { // Tell server to shut down if we are shutting down. This should // happen after check of MSG_REPORT_EXITING above, since region server // will send us one of these messages after it gets MSG_REGIONSERVER_STOP - return new HMsg [] {new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; + return new HMsg [] {HMsg.REGIONSERVER_STOP}; } HServerInfo storedInfo = serversToServerInfo.get(serverName); @@ -200,7 +200,7 @@ class ServerManager implements HConstants { // The HBaseMaster may have been restarted. // Tell the RegionServer to start over and call regionServerStartup() - return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)}; + return new HMsg[]{HMsg.CALL_SERVER_STARTUP}; } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) { // This state is reachable if: // @@ -220,7 +220,7 @@ class ServerManager implements HConstants { serversToServerInfo.notifyAll(); } - return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)}; + return new HMsg[]{HMsg.REGIONSERVER_STOP}; } else { return processRegionServerAllsWell(serverName, serverInfo, mostLoadedRegions, msgs); @@ -234,11 +234,6 @@ class ServerManager implements HConstants { // HRegionServer is shutting down. Cancel the server's lease. // Note that canceling the server's lease takes care of updating // serversToServerInfo, etc. - if (LOG.isDebugEnabled()) { - LOG.debug("Region server " + serverName + - ": MSG_REPORT_EXITING -- cancelling lease"); - } - if (cancelLease(serverName)) { // Only process the exit message if the server still has a lease. // Otherwise we could end up processing the server exit twice. @@ -248,13 +243,13 @@ class ServerManager implements HConstants { // (if we are not shutting down). if (!master.closed.get()) { for (int i = 1; i < msgs.length; i++) { + LOG.info("Processing " + msgs[i] + " from " + serverName); HRegionInfo info = msgs[i].getRegionInfo(); if (info.isRootRegion()) { master.regionManager.unassignRootRegion(); } else if (info.isMetaTable()) { master.regionManager.offlineMetaRegion(info.getStartKey()); } - if (!master.regionManager.isMarkedToClose( serverName, info.getRegionName())) { master.regionManager.setUnassigned(info); @@ -262,10 +257,8 @@ class ServerManager implements HConstants { } } } - // We don't need to return anything to the server because it isn't // going to do any more work. -/* return new HMsg[0];*/ } finally { serversToServerInfo.notifyAll(); } @@ -328,40 +321,37 @@ class ServerManager implements HConstants { // Get reports on what the RegionServer did. for (int i = 0; i < incomingMsgs.length; i++) { - if (LOG.isDebugEnabled()) { - LOG.debug("Received " + incomingMsgs[i] + " from " + serverName); - } HRegionInfo region = incomingMsgs[i].getRegionInfo(); - - switch (incomingMsgs[i].getMsg()) { - case HMsg.MSG_REPORT_PROCESS_OPEN: + LOG.info("Received " + incomingMsgs[i] + " from " + serverName); + switch (incomingMsgs[i].getType()) { + case MSG_REPORT_PROCESS_OPEN: master.regionManager.updateAssignmentDeadline(region); break; - case HMsg.MSG_REPORT_OPEN: + case MSG_REPORT_OPEN: processRegionOpen(serverName, serverInfo, region, returnMsgs); break; - case HMsg.MSG_REPORT_CLOSE: + case MSG_REPORT_CLOSE: processRegionClose(serverInfo, region); break; - case HMsg.MSG_REPORT_SPLIT: + case MSG_REPORT_SPLIT: processSplitRegion(serverName, serverInfo, region, incomingMsgs[++i], incomingMsgs[++i], returnMsgs); break; default: throw new IOException( - "Impossible state during msg processing. Instruction: " + - incomingMsgs[i].getMsg()); + "Impossible state during message processing. Instruction: " + + incomingMsgs[i].getType()); } } // Tell the region server to close regions that we have marked for closing. if (regionsToKill != null) { for (HRegionInfo i: regionsToKill.values()) { - returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i)); + returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, i)); // Transition the region from toClose to closing state master.regionManager.setClosing(i.getRegionName()); } @@ -391,9 +381,6 @@ class ServerManager implements HConstants { HRegionInfo newRegionB = splitB.getRegionInfo(); master.regionManager.setUnassigned(newRegionB); - LOG.info("Region " + region.getRegionName() + " split; new regions: " + - newRegionA.getRegionName() + ", " + newRegionB.getRegionName()); - if (region.isMetaTable()) { // A meta region has split. master.regionManager.offlineMetaRegion(region.getStartKey()); @@ -441,7 +428,8 @@ class ServerManager implements HConstants { // Ask the server to shut it down, but don't report it as closed. // Otherwise the HMaster will think the Region was closed on purpose, // and then try to reopen it elsewhere; that's not what we want. - returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); + returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE_WITHOUT_REPORT, + region, new Text("Duplicate assignment"))); } else { // it was assigned, and it's not a duplicate assignment, so take it out // of the unassigned list. @@ -467,9 +455,6 @@ class ServerManager implements HConstants { } private void processRegionClose(HServerInfo serverInfo, HRegionInfo region) { - LOG.info(serverInfo.getServerAddress().toString() + " no longer serving " + - region); - if (region.isRootRegion()) { // Root region if (region.isOffline()) { diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 5be3f454ceb..ba12407da27 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -80,6 +80,7 @@ import org.apache.hadoop.hbase.util.InfoServer; import org.apache.hadoop.hbase.util.Sleeper; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.net.DNS; @@ -311,10 +312,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { for(int i = 0; !restart && !stopRequested.get() && i < msgs.length; i++) { - LOG.info(msgs[i].toString()); - switch(msgs[i].getMsg()) { - case HMsg.MSG_CALL_SERVER_STARTUP: + switch(msgs[i].getType()) { + case MSG_CALL_SERVER_STARTUP: // We the MSG_CALL_SERVER_STARTUP on startup but we can also // get it when the master is panicing because for instance // the HDFS has been yanked out from under it. Be wary of @@ -344,11 +344,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } break; - case HMsg.MSG_REGIONSERVER_STOP: + case MSG_REGIONSERVER_STOP: stopRequested.set(true); break; - case HMsg.MSG_REGIONSERVER_QUIESCE: + case MSG_REGIONSERVER_QUIESCE: if (!quiesceRequested) { try { toDo.put(new ToDoEntry(msgs[i])); @@ -445,11 +445,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } try { HMsg[] exitMsg = new HMsg[closedRegions.size() + 1]; - exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING); + exitMsg[0] = HMsg.REPORT_EXITING; // Tell the master what regions we are/were serving int i = 1; for (HRegion region: closedRegions) { - exitMsg[i++] = new HMsg(HMsg.MSG_REPORT_CLOSE, + exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region.getRegionInfo()); } @@ -608,7 +608,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // This iterator is 'safe'. We are guaranteed a view on state of the // queue at time iterator was taken out. Apparently goes from oldest. for (ToDoEntry e: this.toDo) { - if (e.msg.getMsg() == HMsg.MSG_REGION_OPEN) { + if (e.msg.isType(HMsg.Type.MSG_REGION_OPEN)) { addProcessingMessage(e.msg.getRegionInfo()); } } @@ -710,15 +710,21 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { return result; } - /** Add to the outbound message buffer */ + /* Add to the outbound message buffer */ private void reportOpen(HRegionInfo region) { - outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, region)); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); } - /** Add to the outbound message buffer */ + /* Add to the outbound message buffer */ private void reportClose(HRegionInfo region) { - outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_CLOSE, region)); + reportClose(region, null); } + + /* Add to the outbound message buffer */ + private void reportClose(final HRegionInfo region, final Text message) { + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); + } + /** * Add to the outbound message buffer @@ -733,18 +739,24 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, HRegionInfo newRegionB) { - outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_SPLIT, oldRegion)); - outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, newRegionA)); - outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_OPEN, newRegionB)); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, + new Text(oldRegion.getRegionNameAsString() + " split; daughters: " + + newRegionA.getRegionNameAsString() + ", " + + newRegionB.getRegionNameAsString()))); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); + outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); } ////////////////////////////////////////////////////////////////////////////// // HMaster-given operations ////////////////////////////////////////////////////////////////////////////// + /* + * Data structure to hold a HMsg and retries count. + */ private static class ToDoEntry { - int tries; - HMsg msg; + private int tries; + private final HMsg msg; ToDoEntry(HMsg msg) { this.tries = 0; this.msg = msg; @@ -774,23 +786,23 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { continue; } LOG.info(e.msg); - switch(e.msg.getMsg()) { + switch(e.msg.getType()) { - case HMsg.MSG_REGIONSERVER_QUIESCE: + case MSG_REGIONSERVER_QUIESCE: closeUserRegions(); break; - case HMsg.MSG_REGION_OPEN: + case MSG_REGION_OPEN: // Open a region openRegion(e.msg.getRegionInfo()); break; - case HMsg.MSG_REGION_CLOSE: + case MSG_REGION_CLOSE: // Close a region closeRegion(e.msg.getRegionInfo(), true); break; - case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT: + case MSG_REGION_CLOSE_WITHOUT_REPORT: // Close a region, don't reply closeRegion(e.msg.getRegionInfo(), false); break; @@ -854,7 +866,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // TODO: add an extra field in HRegionInfo to indicate that there is // an error. We can't do that now because that would be an incompatible // change that would require a migration - reportClose(regionInfo); + reportClose(regionInfo, new Text(StringUtils.stringifyException(e))); return; } this.lock.writeLock().lock(); @@ -876,7 +888,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * @param hri Region to add the message for */ protected void addProcessingMessage(final HRegionInfo hri) { - getOutboundMsgs().add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN, hri)); + getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri)); } void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) @@ -889,7 +901,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { this.lock.writeLock().unlock(); } - if(region != null) { + if (region != null) { region.close(); if(reportWhenCompleted) { reportClose(hri); @@ -954,9 +966,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } this.quiesced.set(true); if (onlineRegions.size() == 0) { - outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_EXITING)); + outboundMsgs.add(HMsg.REPORT_EXITING); } else { - outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_QUIESCED)); + outboundMsgs.add(HMsg.REPORT_QUIESCED); } } diff --git a/src/test/org/apache/hadoop/hbase/TestSerialization.java b/src/test/org/apache/hadoop/hbase/TestSerialization.java index a4226caab1c..bd87b12b0c6 100644 --- a/src/test/org/apache/hadoop/hbase/TestSerialization.java +++ b/src/test/org/apache/hadoop/hbase/TestSerialization.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.io.HbaseMapWritable; import org.apache.hadoop.hbase.io.RowResult; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.io.Text; /** * Test HBase Writables serializations @@ -42,10 +43,17 @@ public class TestSerialization extends HBaseTestCase { } public void testHMsg() throws Exception { - HMsg m = new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE); + HMsg m = HMsg.REGIONSERVER_QUIESCE; byte [] mb = Writables.getBytes(m); HMsg deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg()); - assertTrue(m.getMsg() == deserializedHMsg.getMsg()); + assertTrue(m.equals(deserializedHMsg)); + m = new HMsg(HMsg.Type.MSG_REGIONSERVER_QUIESCE, + new HRegionInfo(new HTableDescriptor(getName()), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY), + new Text("Some message")); + mb = Writables.getBytes(m); + deserializedHMsg = (HMsg)Writables.getWritable(mb, new HMsg()); + assertTrue(m.equals(deserializedHMsg)); } public void testTableDescriptor() throws Exception {