HBASE-2190 HRS should report to master when HMsg are available

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@908849 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-02-11 05:51:42 +00:00
parent eeabb75152
commit af9891965e
5 changed files with 148 additions and 47 deletions

View File

@ -354,6 +354,7 @@ Release 0.21.0 - Unreleased
even when it's not specifically added as input on the sc
(Ferdy via Stack)
HBASE-2189 HCM trashes meta cache even when not needed
HBASE-2190 HRS should report to master when HMsg are available
NEW FEATURES
HBASE-1961 HBase EC2 scripts

View File

@ -91,6 +91,7 @@ public class HMsg implements Writable {
*
* Note that this message is immediately followed by two MSG_REPORT_OPEN
* messages, one for each of the new regions resulting from the split
* @deprecated See MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS
*/
MSG_REPORT_SPLIT,
@ -116,11 +117,21 @@ public class HMsg implements Writable {
* Run Major Compaction
*/
MSG_REGION_MAJOR_COMPACT,
/**
* Region server split the region associated with this message.
*
* Its like MSG_REPORT_SPLIT only it carries the daughters in the message
* rather than send them individually in MSG_REPORT_OPEN messages.
*/
MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS,
}
private Type type = null;
private HRegionInfo info = null;
private byte[] message = null;
private HRegionInfo daughterA = null;
private HRegionInfo daughterB = null;
/** Default constructor. Used during deserialization */
public HMsg() {
@ -153,6 +164,21 @@ public class HMsg implements Writable {
* @param msg Optional message (Stringified exception, etc.)
*/
public HMsg(final HMsg.Type type, final HRegionInfo hri, final byte[] msg) {
this(type, hri, null, null, msg);
}
/**
* Construct a message with the specified message and HRegionInfo
*
* @param type Message type
* @param hri Region to which message <code>type</code> applies. Cannot be
* null. If no info associated, used other Constructor.
* @param daughterA
* @param daughterB
* @param msg Optional message (Stringified exception, etc.)
*/
public HMsg(final HMsg.Type type, final HRegionInfo hri,
final HRegionInfo daughterA, final HRegionInfo daughterB, final byte[] msg) {
if (type == null) {
throw new NullPointerException("Message type cannot be null");
}
@ -162,6 +188,8 @@ public class HMsg implements Writable {
}
this.info = hri;
this.message = msg;
this.daughterA = daughterA;
this.daughterB = daughterB;
}
/**
@ -189,6 +217,22 @@ public class HMsg implements Writable {
return this.message;
}
/**
* @return First daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else
* null
*/
public HRegionInfo getDaughterA() {
return this.daughterA;
}
/**
* @return Second daughter if Type is MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS else
* null
*/
public HRegionInfo getDaughterB() {
return this.daughterB;
}
/**
* @see java.lang.Object#toString()
*/
@ -255,6 +299,10 @@ public class HMsg implements Writable {
out.writeBoolean(true);
Bytes.writeByteArray(out, this.message);
}
if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) {
this.daughterA.write(out);
this.daughterB.write(out);
}
}
/**
@ -268,5 +316,11 @@ public class HMsg implements Writable {
if (hasMessage) {
this.message = Bytes.readByteArray(in);
}
if (this.type.equals(Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS)) {
this.daughterA = new HRegionInfo();
this.daughterB = new HRegionInfo();
this.daughterA.readFields(in);
this.daughterB.readFields(in);
}
}
}

View File

@ -436,7 +436,13 @@ public class ServerManager implements HConstants {
break;
case MSG_REPORT_SPLIT:
processSplitRegion(region, incomingMsgs[++i], incomingMsgs[++i]);
processSplitRegion(region, incomingMsgs[++i].getRegionInfo(),
incomingMsgs[++i].getRegionInfo());
break;
case MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS:
processSplitRegion(region, incomingMsgs[i].getDaughterA(),
incomingMsgs[i].getDaughterB());
break;
default:
@ -477,14 +483,14 @@ public class ServerManager implements HConstants {
* @param splitB
* @param returnMsgs
*/
private void processSplitRegion(HRegionInfo region, HMsg splitA, HMsg splitB) {
private void processSplitRegion(HRegionInfo region, HRegionInfo a, HRegionInfo b) {
synchronized (master.getRegionManager()) {
// Cancel any actions pending for the affected region.
// This prevents the master from sending a SPLIT message if the table
// has already split by the region server.
this.master.getRegionManager().endActions(region.getRegionName());
assignSplitDaughter(splitA.getRegionInfo());
assignSplitDaughter(splitB.getRegionInfo());
assignSplitDaughter(a);
assignSplitDaughter(b);
if (region.isMetaTable()) {
// A meta region has split.
this.master.getRegionManager().offlineMetaRegion(region.getStartKey());

View File

@ -116,6 +116,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
static final Log LOG = LogFactory.getLog(HRegionServer.class);
private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
@ -149,8 +150,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
new ConcurrentHashMap<Integer, HRegion>();
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final List<HMsg> outboundMsgs =
Collections.synchronizedList(new ArrayList<HMsg>());
private final LinkedBlockingQueue<HMsg> outboundMsgs =
new LinkedBlockingQueue<HMsg>();
final int numRetries;
protected final int threadWakeFrequency;
@ -426,7 +427,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
LOG.warn("No response from master on reportForDuty. Sleeping and " +
"then trying again.");
}
HMsg outboundArray[] = null;
List<HMsg> outboundMessages = new ArrayList<HMsg>();
long lastMsg = 0;
// Now ask master what it wants us to do and tell it what we have done
for (int tries = 0; !stopRequested.get() && isHealthy();) {
@ -442,10 +443,10 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
}
long now = System.currentTimeMillis();
// Send messages to the master IF this.msgInterval has elapsed OR if
// we have something to tell (and we didn't just fail sending master).
if ((now - lastMsg) >= msgInterval ||
((outboundArray == null || outboundArray.length == 0) && !this.outboundMsgs.isEmpty())) {
// Drop into the send loop if msgInterval has elapsed or if something
// to send. If we fail talking to the master, then we'll sleep below
// on poll of the outboundMsgs blockingqueue.
if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) {
try {
doMetrics();
MemoryUsage memory =
@ -458,11 +459,13 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
this.serverInfo.setLoad(hsl);
this.requestCount.set(0);
outboundArray = getOutboundMsgs(outboundArray);
HMsg msgs[] = hbaseMaster.regionServerReport(
serverInfo, outboundArray, getMostLoadedRegions());
addOutboundMsgs(outboundMessages);
HMsg msgs[] = this.hbaseMaster.regionServerReport(
serverInfo, outboundMessages.toArray(EMPTY_HMSG_ARRAY),
getMostLoadedRegions());
lastMsg = System.currentTimeMillis();
outboundArray = updateOutboundMsgs(outboundArray);
updateOutboundMsgs(outboundMessages);
outboundMessages.clear();
if (this.quiesced.get() && onlineRegions.size() == 0) {
// We've just told the master we're exiting because we aren't
// serving any regions. So set the stop bit and exit.
@ -565,9 +568,13 @@ public class HRegionServer implements HConstants, HRegionInterface,
lastMsg = System.currentTimeMillis();
}
}
// Do some housekeeping before going to sleep
now = System.currentTimeMillis();
HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)),
TimeUnit.MILLISECONDS);
// If we got something, add it to list of things to send.
if (msg != null) outboundMessages.add(msg);
// Do some housekeeping before going back around
housekeeping();
sleeper.sleep(lastMsg);
} // for
} catch (Throwable t) {
if (!checkOOME(t)) {
@ -655,31 +662,39 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
/*
* @param msgs Current outboundMsgs array
* @return Messages to send or returns current outboundMsgs if it already had
* content to send.
* Add to the passed <code>msgs</code> messages to pass to the master.
* @param msgs Current outboundMsgs array; we'll add messages to this List.
*/
private HMsg [] getOutboundMsgs(final HMsg [] msgs) {
// If passed msgs are not null, means we haven't passed them to master yet.
if (msgs != null) return msgs;
synchronized(this.outboundMsgs) {
return this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
private void addOutboundMsgs(final List<HMsg> msgs) {
if (msgs.isEmpty()) {
this.outboundMsgs.drainTo(msgs);
return;
}
OUTER: for (HMsg m: this.outboundMsgs) {
for (HMsg mm: msgs) {
// Be careful don't add duplicates.
if (mm.equals(m)) {
continue OUTER;
}
}
msgs.add(m);
}
}
/*
* Remove from this.outboundMsgs those messsages we sent the master.
* @param msgs Messages we sent the master.
* @return Null
*/
private HMsg [] updateOutboundMsgs(final HMsg [] msgs) {
if (msgs == null) return null;
synchronized(this.outboundMsgs) {
for (HMsg m: msgs) {
int index = this.outboundMsgs.indexOf(m);
if (index != -1) this.outboundMsgs.remove(index);
private void updateOutboundMsgs(final List<HMsg> msgs) {
if (msgs.isEmpty()) return;
for (HMsg m: this.outboundMsgs) {
for (HMsg mm: msgs) {
if (mm.equals(m)) {
this.outboundMsgs.remove(m);
break;
}
}
}
return null;
}
/*
@ -1120,8 +1135,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
/*
* Run some housekeeping tasks before we go into 'hibernation' sleeping at
* the end of the main HRegionServer run loop.
* Run some housekeeping tasks.
*/
private void housekeeping() {
// If the todo list has > 0 messages, iterate looking for open region
@ -1267,7 +1281,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
/* Add to the outbound message buffer */
private void reportOpen(HRegionInfo region) {
outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region));
}
/* Add to the outbound message buffer */
@ -1277,7 +1291,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
/* Add to the outbound message buffer */
private void reportClose(final HRegionInfo region, final byte[] message) {
outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message));
}
/**
@ -1292,12 +1306,11 @@ public class HRegionServer implements HConstants, HRegionInterface,
*/
void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
HRegionInfo newRegionB) {
outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion,
("Daughters; " +
this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS,
oldRegion, newRegionA, newRegionB,
Bytes.toBytes("Daughters; " +
newRegionA.getRegionNameAsString() + ", " +
newRegionB.getRegionNameAsString()).getBytes()));
outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA));
outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB));
newRegionB.getRegionNameAsString())));
}
//////////////////////////////////////////////////////////////////////////////
@ -2240,7 +2253,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
/**
* @return Queue to which you can add outbound messages.
*/
protected List<HMsg> getOutboundMsgs() {
protected LinkedBlockingQueue<HMsg> getOutboundMsgs() {
return this.outboundMsgs;
}

View File

@ -19,13 +19,15 @@
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import junit.framework.TestCase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
public class TestHMsg extends TestCase {
public void testList() {
List<HMsg> msgs = new ArrayList<HMsg>();
@ -52,4 +54,29 @@ public class TestHMsg extends TestCase {
new HRegionInfo(new HTableDescriptor(Bytes.toBytes("test")), b, b));
assertNotSame(-1, msgs.indexOf(hmsg));
}
public void testSerialization() throws IOException {
// Check out new HMsg that carries two daughter split regions.
byte [] abytes = Bytes.toBytes("a");
byte [] bbytes = Bytes.toBytes("b");
byte [] parentbytes = Bytes.toBytes("parent");
HRegionInfo parent =
new HRegionInfo(new HTableDescriptor(Bytes.toBytes("parent")),
parentbytes, parentbytes);
// Assert simple HMsg serializes
HMsg hmsg = new HMsg(HMsg.Type.MSG_REGION_CLOSE, parent);
byte [] bytes = Writables.getBytes(hmsg);
HMsg close = (HMsg)Writables.getWritable(bytes, new HMsg());
assertTrue(close.equals(hmsg));
// Assert split serializes
HRegionInfo daughtera =
new HRegionInfo(new HTableDescriptor(Bytes.toBytes("a")), abytes, abytes);
HRegionInfo daughterb =
new HRegionInfo(new HTableDescriptor(Bytes.toBytes("b")), bbytes, bbytes);
HMsg splithmsg = new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS,
parent, daughtera, daughterb, Bytes.toBytes("split"));
bytes = Writables.getBytes(splithmsg);
hmsg = (HMsg)Writables.getWritable(bytes, new HMsg());
assertTrue(splithmsg.equals(hmsg));
}
}