From b8291d673e065fdc24d82aca9e1e4e110e8d81c2 Mon Sep 17 00:00:00 2001 From: Jim Kellerman Date: Thu, 29 Nov 2007 21:10:03 +0000 Subject: [PATCH] HADOOP-2295 Fix assigning a region to multiple servers git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@599578 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + src/java/org/apache/hadoop/hbase/HMaster.java | 53 +++++++++++++------ src/java/org/apache/hadoop/hbase/HMsg.java | 7 +++ .../apache/hadoop/hbase/HRegionServer.java | 8 ++- src/java/org/apache/hadoop/hbase/Leases.java | 18 +++++-- .../hadoop/hbase/LocalHBaseCluster.java | 26 +++++---- .../org/apache/hadoop/hbase/DFSAbort.java | 11 ---- .../apache/hadoop/hbase/TestLogRolling.java | 50 +++++++++-------- 8 files changed, 108 insertions(+), 66 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 3c81729b59a..476a99522c1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -39,6 +39,7 @@ Trunk (unreleased changes) may not restart) HADOOP-2253 getRow can return HBASE::DELETEVAL cells (Bryan Duxbury via Stack) + HADOOP-2295 Fix assigning a region to multiple servers IMPROVEMENTS HADOOP-2401 Add convenience put method that takes writable diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index d2e930d90b7..d5424d36a54 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -1136,12 +1136,16 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, // Join up with all threads try { - rootScannerThread.join(); // Wait for the root scanner to finish. + if (rootScannerThread.isAlive()) { + rootScannerThread.join(); // Wait for the root scanner to finish. + } } catch (Exception iex) { LOG.warn("root scanner", iex); } try { - metaScannerThread.join(); // Wait for meta scanner to finish. + if (metaScannerThread.isAlive()) { + metaScannerThread.join(); // Wait for meta scanner to finish. + } } catch(Exception iex) { LOG.warn("meta scanner", iex); } @@ -1460,10 +1464,25 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, // Get reports on what the RegionServer did. for (int i = 0; i < incomingMsgs.length; i++) { + if (LOG.isDebugEnabled()) { + LOG.debug("Received " + incomingMsgs[i].toString() + "from " + + serverName); + } HRegionInfo region = incomingMsgs[i].getRegionInfo(); switch (incomingMsgs[i].getMsg()) { + case HMsg.MSG_REPORT_PROCESS_OPEN: + synchronized (this.assignAttempts) { + // Region server has acknowledged request to open region. + // Extend region open time by 1/2 max region open time. + assignAttempts.put(region.getRegionName(), + Long.valueOf(assignAttempts.get( + region.getRegionName()).longValue() + + (this.maxRegionOpenTime / 2))); + } + break; + case HMsg.MSG_REPORT_OPEN: HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName()); @@ -1484,9 +1503,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, } else { LOG.info(info.getServerAddress().toString() + " serving " + region.getRegionName()); - // Remove from unassigned list so we don't assign it to someone else - this.unassignedRegions.remove(region.getRegionName()); - this.assignAttempts.remove(region.getRegionName()); + if (region.getRegionName().compareTo( HRegionInfo.rootRegionInfo.getRegionName()) == 0) { // Store the Root Region location (in memory) @@ -1495,21 +1512,23 @@ public class HMaster extends Thread implements HConstants, HMasterInterface, new HServerAddress(info.getServerAddress())); this.rootRegionLocation.notifyAll(); } - break; - } + } else { + // Note that the table has been assigned and is waiting for the meta + // table to be updated. - // Note that the table has been assigned and is waiting for the meta - // table to be updated. + pendingRegions.add(region.getRegionName()); - pendingRegions.add(region.getRegionName()); + // Queue up an update to note the region location. - // Queue up an update to note the region location. - - try { - msgQueue.put(new ProcessRegionOpen(info, region)); - } catch (InterruptedException e) { - throw new RuntimeException("Putting into msgQueue was interrupted.", e); - } + try { + msgQueue.put(new ProcessRegionOpen(info, region)); + } catch (InterruptedException e) { + throw new RuntimeException("Putting into msgQueue was interrupted.", e); + } + } + // Remove from unassigned list so we don't assign it to someone else + this.unassignedRegions.remove(region.getRegionName()); + this.assignAttempts.remove(region.getRegionName()); } break; diff --git a/src/java/org/apache/hadoop/hbase/HMsg.java b/src/java/org/apache/hadoop/hbase/HMsg.java index 21e118f1f7b..488ff8f5ef9 100644 --- a/src/java/org/apache/hadoop/hbase/HMsg.java +++ b/src/java/org/apache/hadoop/hbase/HMsg.java @@ -53,6 +53,9 @@ public class HMsg implements Writable { /** region server is no longer serving the specified region */ public static final byte MSG_REPORT_CLOSE = 101; + + /** region server is processing open request */ + public static final byte MSG_REPORT_PROCESS_OPEN = 102; /** * region server split the region associated with this message. @@ -142,6 +145,10 @@ public class HMsg implements Writable { message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : "); break; + case MSG_REPORT_PROCESS_OPEN: + message.append("MSG_REPORT_PROCESS_OPEN : "); + break; + case MSG_REPORT_OPEN: message.append("MSG_REPORT_OPEN : "); break; diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index db11f31bc44..74c3c7e2176 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -742,6 +742,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { throw new RuntimeException("Putting into msgQueue was " + "interrupted.", e); } + if (msgs[i].getMsg() == HMsg.MSG_REGION_OPEN) { + outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_PROCESS_OPEN, + msgs[i].getRegionInfo())); + } } } } @@ -982,11 +986,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { * Presumption is that all closes and stops have already been called. */ void join() { - join(this.workerThread); join(this.logRoller); join(this.cacheFlusher); join(this.compactor); join(this.splitter); + join(this.workerThread); } private void join(final Thread t) { @@ -1161,8 +1165,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { } finally { this.lock.writeLock().unlock(); } + reportOpen(region); } - reportOpen(region); } void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) diff --git a/src/java/org/apache/hadoop/hbase/Leases.java b/src/java/org/apache/hadoop/hbase/Leases.java index c3219d4ce75..57d28b2fac9 100644 --- a/src/java/org/apache/hadoop/hbase/Leases.java +++ b/src/java/org/apache/hadoop/hbase/Leases.java @@ -108,11 +108,13 @@ public class Leases { public void close() { LOG.info(Thread.currentThread().getName() + " closing leases"); this.stop.set(true); - try { - this.leaseMonitorThread.interrupt(); - this.leaseMonitorThread.join(); - } catch (InterruptedException iex) { - // Ignore + while (this.leaseMonitorThread.isAlive()) { + try { + this.leaseMonitorThread.interrupt(); + this.leaseMonitorThread.join(); + } catch (InterruptedException iex) { + // Ignore + } } synchronized(leases) { synchronized(sortedLeases) { @@ -211,10 +213,16 @@ public class Leases { * Its a daemon thread. */ class LeaseMonitor extends Chore { + /** + * @param p + * @param s + */ public LeaseMonitor(int p, AtomicBoolean s) { super(p, s); } + /** {@inheritDoc} */ + @Override protected void chore() { synchronized(leases) { synchronized(sortedLeases) { diff --git a/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 8a2c50f68c5..151f5420062 100644 --- a/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/src/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -53,7 +53,9 @@ public class LocalHBaseCluster implements HConstants { private final HMaster master; private final List regionThreads; private final static int DEFAULT_NO = 1; + /** local mode */ public static final String LOCAL = "local"; + /** 'local:' */ public static final String LOCAL_COLON = LOCAL + ":"; private final HBaseConfiguration conf; @@ -146,12 +148,14 @@ public class LocalHBaseCluster implements HConstants { public String waitOnRegionServer(int serverNumber) { RegionServerThread regionServerThread = this.regionThreads.remove(serverNumber); - try { - LOG.info("Waiting on " + - regionServerThread.getRegionServer().serverInfo.toString()); - regionServerThread.join(); - } catch (InterruptedException e) { - e.printStackTrace(); + while (regionServerThread.isAlive()) { + try { + LOG.info("Waiting on " + + regionServerThread.getRegionServer().serverInfo.toString()); + regionServerThread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } return regionServerThread.getName(); } @@ -217,10 +221,12 @@ public class LocalHBaseCluster implements HConstants { } } if (this.master != null) { - try { - this.master.join(); - } catch(InterruptedException e) { - // continue + while (this.master.isAlive()) { + try { + this.master.join(); + } catch(InterruptedException e) { + // continue + } } } LOG.info("Shutdown " + diff --git a/src/test/org/apache/hadoop/hbase/DFSAbort.java b/src/test/org/apache/hadoop/hbase/DFSAbort.java index a9c553e3132..4a30a75ac40 100644 --- a/src/test/org/apache/hadoop/hbase/DFSAbort.java +++ b/src/test/org/apache/hadoop/hbase/DFSAbort.java @@ -22,19 +22,10 @@ package org.apache.hadoop.hbase; import junit.framework.TestSuite; import junit.textui.TestRunner; -import java.io.PrintWriter; -import org.apache.hadoop.util.ReflectionUtils; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - /** * Test ability of HBase to handle DFS failure */ public class DFSAbort extends HBaseClusterTestCase { - private static final Log LOG = - LogFactory.getLog(DFSAbort.class.getName()); - /** constructor */ public DFSAbort() { super(); @@ -66,8 +57,6 @@ public class DFSAbort extends HBaseClusterTestCase { // By now the Mini DFS is running, Mini HBase is running and we have // created a table. Now let's yank the rug out from HBase cluster.getDFSCluster().shutdown(); - // Now wait for Mini HBase Cluster to shut down -// cluster.join(); threadDumpingJoin(); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/org/apache/hadoop/hbase/TestLogRolling.java b/src/test/org/apache/hadoop/hbase/TestLogRolling.java index ce7dd68dbc8..e382cdbf32e 100644 --- a/src/test/org/apache/hadoop/hbase/TestLogRolling.java +++ b/src/test/org/apache/hadoop/hbase/TestLogRolling.java @@ -127,33 +127,41 @@ public class TestLogRolling extends HBaseTestCase { this.server = cluster.getRegionThreads().get(0).getRegionServer(); this.log = server.getLog(); - + // When the META table can be opened, the region servers are running - @SuppressWarnings("unused") HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); - // Create the test table and open it - HTableDescriptor desc = new HTableDescriptor(tableName); - desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); - HBaseAdmin admin = new HBaseAdmin(conf); - admin.createTable(desc); - HTable table = new HTable(conf, new Text(tableName)); + try { - for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls - long lockid = - table.startUpdate(new Text("row" + String.format("%1$04d", i))); - table.put(lockid, HConstants.COLUMN_FAMILY, value); - table.commit(lockid); - - if (i % 256 == 0) { - // After every 256 writes sleep to let the log roller run - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - // continue + // Create the test table and open it + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + HTable table = new HTable(conf, new Text(tableName)); + + try { + for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls + long lockid = + table.startUpdate(new Text("row" + String.format("%1$04d", i))); + table.put(lockid, HConstants.COLUMN_FAMILY, value); + table.commit(lockid); + + if (i % 256 == 0) { + // After every 256 writes sleep to let the log roller run + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + // continue + } + } } + } finally { + table.close(); } + } finally { + meta.close(); } }