From af9891965e296f80f97ae4762d4a108b8e7fe95d Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 11 Feb 2010 05:51:42 +0000 Subject: [PATCH] HBASE-2190 HRS should report to master when HMsg are available git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@908849 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + src/java/org/apache/hadoop/hbase/HMsg.java | 56 ++++++++++- .../hadoop/hbase/master/ServerManager.java | 14 ++- .../hbase/regionserver/HRegionServer.java | 93 +++++++++++-------- .../org/apache/hadoop/hbase/TestHMsg.java | 31 ++++++- 5 files changed, 148 insertions(+), 47 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 69b30645303..fa9f4282cbc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -354,6 +354,7 @@ Release 0.21.0 - Unreleased even when it's not specifically added as input on the sc (Ferdy via Stack) HBASE-2189 HCM trashes meta cache even when not needed + HBASE-2190 HRS should report to master when HMsg are available NEW FEATURES HBASE-1961 HBase EC2 scripts diff --git a/src/java/org/apache/hadoop/hbase/HMsg.java b/src/java/org/apache/hadoop/hbase/HMsg.java index 4aad3ece4da..852140bedba 100644 --- a/src/java/org/apache/hadoop/hbase/HMsg.java +++ b/src/java/org/apache/hadoop/hbase/HMsg.java @@ -91,6 +91,7 @@ public class HMsg implements Writable { * * Note that this message is immediately followed by two MSG_REPORT_OPEN * messages, one for each of the new regions resulting from the split + * @deprecated See MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS */ MSG_REPORT_SPLIT, @@ -116,11 +117,21 @@ public class HMsg implements Writable { * Run Major Compaction */ MSG_REGION_MAJOR_COMPACT, + + /** + * Region server split the region associated with this message. + * + * Its like MSG_REPORT_SPLIT only it carries the daughters in the message + * rather than send them individually in MSG_REPORT_OPEN messages. + */ + MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, } private Type type = null; private HRegionInfo info = null; private byte[] message = null; + private HRegionInfo daughterA = null; + private HRegionInfo daughterB = null; /** Default constructor. Used during deserialization */ public HMsg() { @@ -153,6 +164,21 @@ public class HMsg implements Writable { * @param msg Optional message (Stringified exception, etc.) */ public HMsg(final HMsg.Type type, final HRegionInfo hri, final byte[] msg) { + this(type, hri, null, null, msg); + } + + /** + * Construct a message with the specified message and HRegionInfo + * + * @param type Message type + * @param hri Region to which message type applies. Cannot be + * null. If no info associated, used other Constructor. + * @param daughterA + * @param daughterB + * @param msg Optional message (Stringified exception, etc.) + */ + public HMsg(final HMsg.Type type, final HRegionInfo hri, + final HRegionInfo daughterA, final HRegionInfo daughterB, final byte[] msg) { if (type == null) { throw new NullPointerException("Message type cannot be null"); } @@ -162,6 +188,8 @@ public class HMsg implements Writable { } this.info = hri; this.message = msg; + this.daughterA = daughterA; + this.daughterB = daughterB; } /** @@ -189,6 +217,22 @@ public class HMsg implements Writable { return this.message; } + /** + * @return First daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else + * null + */ + public HRegionInfo getDaughterA() { + return this.daughterA; + } + + /** + * @return Second daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else + * null + */ + public HRegionInfo getDaughterB() { + return this.daughterB; + } + /** * @see java.lang.Object#toString() */ @@ -255,6 +299,10 @@ public class HMsg implements Writable { out.writeBoolean(true); Bytes.writeByteArray(out, this.message); } + if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) { + this.daughterA.write(out); + this.daughterB.write(out); + } } /** @@ -268,5 +316,11 @@ public class HMsg implements Writable { if (hasMessage) { this.message = Bytes.readByteArray(in); } + if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) { + this.daughterA = new HRegionInfo(); + this.daughterB = new HRegionInfo(); + this.daughterA.readFields(in); + this.daughterB.readFields(in); + } } -} +} \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/java/org/apache/hadoop/hbase/master/ServerManager.java index ab5cc15fca2..0404e6ae656 100644 --- a/src/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -436,7 +436,13 @@ public class ServerManager implements HConstants { break; case MSG_REPORT_SPLIT: - processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i]); + processSplitRegion(region, incomingMsgs[++i].getRegionInfo(), + incomingMsgs[++i].getRegionInfo()); + break; + + case MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS: + processSplitRegion(region, incomingMsgs[i].getDaughterA(), + incomingMsgs[i].getDaughterB()); break; default: @@ -477,14 +483,14 @@ public class ServerManager implements HConstants { * @param splitB * @param returnMsgs */ - private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB) { + private void processSplitRegion(HRegionInfo region, HRegionInfo a, HRegionInfo b) { synchronized (master.getRegionManager()) { // Cancel any actions pending for the affected region. // This prevents the master from sending a SPLIT message if the table // has already split by the region server. this.master.getRegionManager().endActions(region.getRegionName()); - assignSplitDaughter(splitA.getRegionInfo()); - assignSplitDaughter(splitB.getRegionInfo()); + assignSplitDaughter(a); + assignSplitDaughter(b); if (region.isMetaTable()) { // A meta region has split. this.master.getRegionManager().offlineMetaRegion(region.getStartKey()); diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bbdc5ba967c..62599f9aeb3 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -116,6 +116,7 @@ public class HRegionServer implements HConstants, HRegionInterface, static final Log LOG = LogFactory.getLog(HRegionServer.class); private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING); private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED); + private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {}; // Set when a report to the master comes back with a message asking us to // shutdown. Also set by call to stop when debugging or running unit tests @@ -149,8 +150,8 @@ public class HRegionServer implements HConstants, HRegionInterface, new ConcurrentHashMap(); protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - private final List outboundMsgs = - Collections.synchronizedList(new ArrayList()); + private final LinkedBlockingQueue outboundMsgs = + new LinkedBlockingQueue(); final int numRetries; protected final int threadWakeFrequency; @@ -426,7 +427,7 @@ public class HRegionServer implements HConstants, HRegionInterface, LOG.warn("No response from master on reportForDuty. Sleeping and " + "then trying again."); } - HMsg outboundArray[] = null; + List outboundMessages = new ArrayList(); long lastMsg = 0; // Now ask master what it wants us to do and tell it what we have done for (int tries = 0; !stopRequested.get() && isHealthy();) { @@ -442,10 +443,10 @@ public class HRegionServer implements HConstants, HRegionInterface, } } long now = System.currentTimeMillis(); - // Send messages to the master IF this.msgInterval has elapsed OR if - // we have something to tell (and we didn't just fail sending master). - if ((now - lastMsg) >= msgInterval || - ((outboundArray == null || outboundArray.length == 0) && !this.outboundMsgs.isEmpty())) { + // Drop into the send loop if msgInterval has elapsed or if something + // to send. If we fail talking to the master, then we'll sleep below + // on poll of the outboundMsgs blockingqueue. + if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) { try { doMetrics(); MemoryUsage memory = @@ -458,11 +459,13 @@ public class HRegionServer implements HConstants, HRegionInterface, } this.serverInfo.setLoad(hsl); this.requestCount.set(0); - outboundArray = getOutboundMsgs(outboundArray); - HMsg msgs[] = hbaseMaster.regionServerReport( - serverInfo, outboundArray, getMostLoadedRegions()); + addOutboundMsgs(outboundMessages); + HMsg msgs[] = this.hbaseMaster.regionServerReport( + serverInfo, outboundMessages.toArray(EMPTY_HMSG_ARRAY), + getMostLoadedRegions()); lastMsg = System.currentTimeMillis(); - outboundArray = updateOutboundMsgs(outboundArray); + updateOutboundMsgs(outboundMessages); + outboundMessages.clear(); if (this.quiesced.get() && onlineRegions.size() == 0) { // We've just told the master we're exiting because we aren't // serving any regions. So set the stop bit and exit. @@ -565,9 +568,13 @@ public class HRegionServer implements HConstants, HRegionInterface, lastMsg = System.currentTimeMillis(); } } - // Do some housekeeping before going to sleep + now = System.currentTimeMillis(); + HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)), + TimeUnit.MILLISECONDS); + // If we got something, add it to list of things to send. + if (msg != null) outboundMessages.add(msg); + // Do some housekeeping before going back around housekeeping(); - sleeper.sleep(lastMsg); } // for } catch (Throwable t) { if (!checkOOME(t)) { @@ -655,31 +662,39 @@ public class HRegionServer implements HConstants, HRegionInterface, } /* - * @param msgs Current outboundMsgs array - * @return Messages to send or returns current outboundMsgs if it already had - * content to send. + * Add to the passed msgs messages to pass to the master. + * @param msgs Current outboundMsgs array; we'll add messages to this List. */ - private HMsg [] getOutboundMsgs(final HMsg [] msgs) { - // If passed msgs are not null, means we haven't passed them to master yet. - if (msgs != null) return msgs; - synchronized(this.outboundMsgs) { - return this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]); + private void addOutboundMsgs(final List msgs) { + if (msgs.isEmpty()) { + this.outboundMsgs.drainTo(msgs); + return; + } + OUTER: for (HMsg m: this.outboundMsgs) { + for (HMsg mm: msgs) { + // Be careful don't add duplicates. + if (mm.equals(m)) { + continue OUTER; + } + } + msgs.add(m); } } /* + * Remove from this.outboundMsgs those messsages we sent the master. * @param msgs Messages we sent the master. - * @return Null */ - private HMsg [] updateOutboundMsgs(final HMsg [] msgs) { - if (msgs == null) return null; - synchronized(this.outboundMsgs) { - for (HMsg m: msgs) { - int index = this.outboundMsgs.indexOf(m); - if (index != -1) this.outboundMsgs.remove(index); + private void updateOutboundMsgs(final List msgs) { + if (msgs.isEmpty()) return; + for (HMsg m: this.outboundMsgs) { + for (HMsg mm: msgs) { + if (mm.equals(m)) { + this.outboundMsgs.remove(m); + break; + } } } - return null; } /* @@ -1120,8 +1135,7 @@ public class HRegionServer implements HConstants, HRegionInterface, } /* - * Run some housekeeping tasks before we go into 'hibernation' sleeping at - * the end of the main HRegionServer run loop. + * Run some housekeeping tasks. */ private void housekeeping() { // If the todo list has > 0 messages, iterate looking for open region @@ -1267,7 +1281,7 @@ public class HRegionServer implements HConstants, HRegionInterface, /* Add to the outbound message buffer */ private void reportOpen(HRegionInfo region) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); } /* Add to the outbound message buffer */ @@ -1277,7 +1291,7 @@ public class HRegionServer implements HConstants, HRegionInterface, /* Add to the outbound message buffer */ private void reportClose(final HRegionInfo region, final byte[] message) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); } /** @@ -1292,12 +1306,11 @@ public class HRegionServer implements HConstants, HRegionInterface, */ void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, HRegionInfo newRegionB) { - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, - ("Daughters; " + - newRegionA.getRegionNameAsString() + ", " + - newRegionB.getRegionNameAsString()).getBytes())); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); - outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); + this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, + oldRegion, newRegionA, newRegionB, + Bytes.toBytes("Daughters; " + + newRegionA.getRegionNameAsString() + ", " + + newRegionB.getRegionNameAsString()))); } ////////////////////////////////////////////////////////////////////////////// @@ -2240,7 +2253,7 @@ public class HRegionServer implements HConstants, HRegionInterface, /** * @return Queue to which you can add outbound messages. */ - protected List getOutboundMsgs() { + protected LinkedBlockingQueue getOutboundMsgs() { return this.outboundMsgs; } diff --git a/src/test/org/apache/hadoop/hbase/TestHMsg.java b/src/test/org/apache/hadoop/hbase/TestHMsg.java index ec68801e540..ee3be204bdf 100644 --- a/src/test/org/apache/hadoop/hbase/TestHMsg.java +++ b/src/test/org/apache/hadoop/hbase/TestHMsg.java @@ -19,13 +19,15 @@ */ package org.apache.hadoop.hbase; +import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hbase.util.Bytes; - import junit.framework.TestCase; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Writables; + public class TestHMsg extends TestCase { public void testList() { List msgs = new ArrayList(); @@ -52,4 +54,29 @@ public class TestHMsg extends TestCase { new HRegionInfo(new HTableDescriptor(Bytes.toBytes("test")), b, b)); assertNotSame(-1, msgs.indexOf(hmsg)); } + + public void testSerialization() throws IOException { + // Check out new HMsg that carries two daughter split regions. + byte [] abytes = Bytes.toBytes("a"); + byte [] bbytes = Bytes.toBytes("b"); + byte [] parentbytes = Bytes.toBytes("parent"); + HRegionInfo parent = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("parent")), + parentbytes, parentbytes); + // Assert simple HMsg serializes + HMsg hmsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, parent); + byte [] bytes = Writables.getBytes(hmsg); + HMsg close = (HMsg)Writables.getWritable(bytes, new HMsg()); + assertTrue(close.equals(hmsg)); + // Assert split serializes + HRegionInfo daughtera = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("a")), abytes, abytes); + HRegionInfo daughterb = + new HRegionInfo(new HTableDescriptor(Bytes.toBytes("b")), bbytes, bbytes); + HMsg splithmsg = new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, + parent, daughtera, daughterb, Bytes.toBytes("split")); + bytes = Writables.getBytes(splithmsg); + hmsg = (HMsg)Writables.getWritable(bytes, new HMsg()); + assertTrue(splithmsg.equals(hmsg)); + } }