HADOOP-2295 Fix assigning a region to multiple servers

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@599578 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 2007-11-29 21:10:03 +00:00
parent 6f14e60b7f
commit b8291d673e
8 changed files with 108 additions and 66 deletions

View File

@ -39,6 +39,7 @@ Trunk (unreleased changes)
may not restart)
HADOOP-2253 getRow can return HBASE::DELETEVAL cells
(Bryan Duxbury via Stack)
HADOOP-2295 Fix assigning a region to multiple servers
IMPROVEMENTS
HADOOP-2401 Add convenience put method that takes writable

View File

@ -1136,12 +1136,16 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// Join up with all threads
try {
if (rootScannerThread.isAlive()) {
rootScannerThread.join(); // Wait for the root scanner to finish.
}
} catch (Exception iex) {
LOG.warn("root scanner", iex);
}
try {
if (metaScannerThread.isAlive()) {
metaScannerThread.join(); // Wait for meta scanner to finish.
}
} catch(Exception iex) {
LOG.warn("meta scanner", iex);
}
@ -1460,10 +1464,25 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// Get reports on what the RegionServer did.
for (int i = 0; i < incomingMsgs.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received " + incomingMsgs[i].toString() + "from " +
serverName);
}
HRegionInfo region = incomingMsgs[i].getRegionInfo();
switch (incomingMsgs[i].getMsg()) {
case HMsg.MSG_REPORT_PROCESS_OPEN:
synchronized (this.assignAttempts) {
// Region server has acknowledged request to open region.
// Extend region open time by 1/2 max region open time.
assignAttempts.put(region.getRegionName(),
Long.valueOf(assignAttempts.get(
region.getRegionName()).longValue() +
(this.maxRegionOpenTime / 2)));
}
break;
case HMsg.MSG_REPORT_OPEN:
HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName());
@ -1484,9 +1503,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
} else {
LOG.info(info.getServerAddress().toString() + " serving " +
region.getRegionName());
if (region.getRegionName().compareTo(
HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
// Store the Root Region location (in memory)
@ -1495,21 +1512,23 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
new HServerAddress(info.getServerAddress()));
this.rootRegionLocation.notifyAll();
}
} else {
// Note that the table has been assigned and is waiting for the meta
// table to be updated.
pendingRegions.add(region.getRegionName());
// Queue up an update to note the region location.
try {
msgQueue.put(new ProcessRegionOpen(info, region));
} catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
}
}
// Remove from unassigned list so we don't assign it to someone else
this.unassignedRegions.remove(region.getRegionName());
this.assignAttempts.remove(region.getRegionName());
}
break;
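
Taken together, the HMaster changes above add two pieces of bookkeeping: an acknowledgement from the region server pushes the region's open deadline out by half the maximum open time, and a completed open removes the region from the unassigned map whether or not it is the root region. A minimal standalone sketch of that pattern follows; only the field names are taken from the diff, while the map types, key type, and method shapes are assumptions.

import java.util.HashMap;
import java.util.Map;

// Sketch only: deadline bookkeeping the master performs per region name.
// Field names mirror the hunk above; everything else is a simplifying assumption.
class RegionAssignmentSketch {
  private final Map<String, Long> assignAttempts = new HashMap<>();      // region name -> open deadline baseline
  private final Map<String, Object> unassignedRegions = new HashMap<>(); // region name -> region descriptor
  private final long maxRegionOpenTime = 60 * 1000;                      // assumed value, in milliseconds

  // MSG_REPORT_PROCESS_OPEN: the region server acknowledged the open request,
  // so extend the deadline by half the maximum open time instead of letting
  // the region time out and be handed to a second server.
  synchronized void openAcknowledged(String regionName) {
    Long t = assignAttempts.get(regionName);
    if (t != null) {
      assignAttempts.put(regionName, t + maxRegionOpenTime / 2);
    }
  }

  // MSG_REPORT_OPEN: the open completed. Removing the region for every region,
  // not only the root region, is the heart of the HADOOP-2295 fix: anything
  // left in unassignedRegions is eligible to be assigned again.
  synchronized void openCompleted(String regionName) {
    unassignedRegions.remove(regionName);
    assignAttempts.remove(regionName);
  }
}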

View File

@ -53,6 +53,9 @@ public class HMsg implements Writable {
/** region server is no longer serving the specified region */
public static final byte MSG_REPORT_CLOSE = 101;
/** region server is processing open request */
public static final byte MSG_REPORT_PROCESS_OPEN = 102;
/**
* region server split the region associated with this message.
@ -142,6 +145,10 @@ public class HMsg implements Writable {
message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : ");
break;
case MSG_REPORT_PROCESS_OPEN:
message.append("MSG_REPORT_PROCESS_OPEN : ");
break;
case MSG_REPORT_OPEN:
message.append("MSG_REPORT_OPEN : ");
break;

View File

@ -742,6 +742,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throw new RuntimeException("Putting into msgQueue was " +
"interrupted.", e);
}
if (msgs[i].getMsg() == HMsg.MSG_REGION_OPEN) {
outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN,
msgs[i].getRegionInfo()));
}
}
}
}
@ -982,11 +986,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* Presumption is that all closes and stops have already been called.
*/
void join() {
join(this.logRoller);
join(this.cacheFlusher);
join(this.compactor);
join(this.splitter);
join(this.workerThread);
}
private void join(final Thread t) {
@ -1161,8 +1165,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} finally {
this.lock.writeLock().unlock();
}
reportOpen(region);
}
void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
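
The region server's half of the handshake is the MSG_REPORT_PROCESS_OPEN acknowledgement queued above as soon as a MSG_REGION_OPEN arrives. Below is a hedged sketch of that path using stand-in message and queue types; only the constant 102 and the message names come from the diff, and MSG_REGION_OPEN's value is assumed.

import java.util.ArrayList;
import java.util.List;

// Sketch of the acknowledgement path; Msg and the outbound list are stand-ins
// for the real HMsg and outboundMsgs.
class OpenAckSketch {
  static final byte MSG_REGION_OPEN = 2;           // assumed value
  static final byte MSG_REPORT_PROCESS_OPEN = 102; // value shown in the HMsg hunk

  static class Msg {
    final byte type;
    final String regionName;
    Msg(byte type, String regionName) { this.type = type; this.regionName = regionName; }
  }

  private final List<Msg> outboundMsgs = new ArrayList<>();

  // For every MSG_REGION_OPEN from the master, queue the acknowledgement right
  // away, before the (possibly slow) open itself runs, so the master extends
  // the region's deadline instead of reassigning it.
  void onMasterMessage(Msg incoming) {
    if (incoming.type == MSG_REGION_OPEN) {
      outboundMsgs.add(new Msg(MSG_REPORT_PROCESS_OPEN, incoming.regionName));
    }
  }

  List<Msg> pendingReplies() {
    return outboundMsgs;
  }
}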

View File

@ -108,11 +108,13 @@ public class Leases {
public void close() {
LOG.info(Thread.currentThread().getName() + " closing leases");
this.stop.set(true);
while (this.leaseMonitorThread.isAlive()) {
try {
this.leaseMonitorThread.interrupt();
this.leaseMonitorThread.join();
} catch (InterruptedException iex) {
// Ignore
}
}
synchronized(leases) {
synchronized(sortedLeases) {
@ -211,10 +213,16 @@ public class Leases {
* Its a daemon thread.
*/
class LeaseMonitor extends Chore {
/**
* @param p how often the lease monitor wakes up to check for expired leases
* @param s flag that, once set true, tells the monitor to stop
*/
public LeaseMonitor(int p, AtomicBoolean s) {
super(p, s);
}
/** {@inheritDoc} */
@Override
protected void chore() {
synchronized(leases) {
synchronized(sortedLeases) {
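
The loop added to close() above, and repeated below in LocalHBaseCluster, is the interrupt-and-join-until-dead shutdown idiom: a single join() can return early when the calling thread is interrupted, so the caller keeps interrupting and joining until isAlive() reports false. A self-contained sketch of that idiom follows; the worker body is a placeholder, not the real lease-expiry chore.

import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the interrupt-and-join-until-dead shutdown idiom used in the diff.
class ShutdownSketch {
  private final AtomicBoolean stop = new AtomicBoolean(false);
  private final Thread worker = new Thread(() -> {
    while (!stop.get()) {
      try {
        Thread.sleep(100);          // stand-in for the periodic chore work
      } catch (InterruptedException e) {
        // interrupted: loop around and re-check the stop flag
      }
    }
  }, "worker");

  void start() { worker.start(); }

  void close() {
    stop.set(true);
    // join() alone can return early if this thread is interrupted,
    // so retry until the worker has really exited.
    while (worker.isAlive()) {
      try {
        worker.interrupt();
        worker.join();
      } catch (InterruptedException e) {
        // ignore and try again
      }
    }
  }

  public static void main(String[] args) {
    ShutdownSketch s = new ShutdownSketch();
    s.start();
    s.close();
  }
}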

View File

@ -53,7 +53,9 @@ public class LocalHBaseCluster implements HConstants {
private final HMaster master;
private final List<RegionServerThread> regionThreads;
private final static int DEFAULT_NO = 1;
/** local mode */
public static final String LOCAL = "local";
/** 'local:' */
public static final String LOCAL_COLON = LOCAL + ":";
private final HBaseConfiguration conf;
@ -146,12 +148,14 @@ public class LocalHBaseCluster implements HConstants {
public String waitOnRegionServer(int serverNumber) {
RegionServerThread regionServerThread =
this.regionThreads.remove(serverNumber);
while (regionServerThread.isAlive()) {
try {
LOG.info("Waiting on " +
regionServerThread.getRegionServer().serverInfo.toString());
regionServerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return regionServerThread.getName();
}
@ -217,10 +221,12 @@ public class LocalHBaseCluster implements HConstants {
}
}
if (this.master != null) {
while (this.master.isAlive()) {
try {
this.master.join();
} catch(InterruptedException e) {
// continue
}
}
}
LOG.info("Shutdown " +

View File

@ -22,19 +22,10 @@ package org.apache.hadoop.hbase;
import junit.framework.TestSuite;
import junit.textui.TestRunner;
import java.io.PrintWriter;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Test ability of HBase to handle DFS failure
*/
public class DFSAbort extends HBaseClusterTestCase {
private static final Log LOG =
LogFactory.getLog(DFSAbort.class.getName());
/** constructor */
public DFSAbort() {
super();
@ -66,8 +57,6 @@ public class DFSAbort extends HBaseClusterTestCase {
// By now the Mini DFS is running, Mini HBase is running and we have
// created a table. Now let's yank the rug out from HBase
cluster.getDFSCluster().shutdown();
// Now wait for Mini HBase Cluster to shut down
// cluster.join();
threadDumpingJoin();
} catch (Exception e) {
e.printStackTrace();

View File

@ -127,33 +127,41 @@ public class TestLogRolling extends HBaseTestCase {
this.server = cluster.getRegionThreads().get(0).getRegionServer();
this.log = server.getLog();
// When the META table can be opened, the region servers are running
@SuppressWarnings("unused")
HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
try {
// Create the test table and open it
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc);
HTable table = new HTable(conf, new Text(tableName));
try {
for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls
long lockid =
table.startUpdate(new Text("row" + String.format("%1$04d", i)));
table.put(lockid, HConstants.COLUMN_FAMILY, value);
table.commit(lockid);
if (i % 256 == 0) {
// After every 256 writes sleep to let the log roller run
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
// continue
}
}
}
} finally {
table.close();
}
} finally {
meta.close();
}
}
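
The reshaped test above is the usual nested try/finally for two resources that must both be closed; a generic sketch with placeholder resources rather than HBase classes:

// Sketch of the nested close pattern the test adopts above.
// The Resource class is a placeholder, not an HBase type.
class NestedCloseSketch {
  static class Resource implements AutoCloseable {
    private final String name;
    Resource(String name) { this.name = name; }
    public void close() { System.out.println("closed " + name); }
  }

  public static void main(String[] args) {
    Resource meta = new Resource("meta");
    try {
      Resource table = new Resource("table");
      try {
        // ... writes that may throw go here ...
      } finally {
        table.close();      // inner resource closed first
      }
    } finally {
      meta.close();         // outer resource closed even if the inner block threw
    }
  }
}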