From 0c7ac6795f6f37dc014341efe184998ccbeb59bc Mon Sep 17 00:00:00 2001 From: Jim Kellerman Date: Tue, 14 Aug 2007 03:37:01 +0000 Subject: [PATCH] HADOOP-1678 On region split, master should designate which host should serve daughter splits. Phase 2: Master assigns children of split region instead of HRegionServer serving both children. git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@565616 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 4 + .../org/apache/hadoop/hbase/HBaseAdmin.java | 18 +- .../hadoop/hbase/HConnectionManager.java | 19 +- .../org/apache/hadoop/hbase/HConstants.java | 5 + src/java/org/apache/hadoop/hbase/HLog.java | 2 +- src/java/org/apache/hadoop/hbase/HMaster.java | 183 ++++++------ src/java/org/apache/hadoop/hbase/HMerge.java | 6 +- src/java/org/apache/hadoop/hbase/HRegion.java | 269 +----------------- .../org/apache/hadoop/hbase/HRegionInfo.java | 13 - .../apache/hadoop/hbase/HRegionServer.java | 89 +++--- .../apache/hadoop/hbase/HRegiondirReader.java | 4 +- .../org/apache/hadoop/hbase/HStoreFile.java | 4 +- .../org/apache/hadoop/hbase/HStoreKey.java | 13 + src/java/org/apache/hadoop/hbase/HTable.java | 12 +- src/java/org/apache/hadoop/hbase/Leases.java | 32 +-- .../org/apache/hadoop/hbase/util/Keying.java | 65 ----- .../apache/hadoop/hbase/util/Writables.java | 72 ++++- src/test/org/apache/hadoop/hbase/TestGet.java | 19 +- .../org/apache/hadoop/hbase/TestScanner.java | 22 +- .../org/apache/hadoop/hbase/TestScanner2.java | 64 ++++- .../org/apache/hadoop/hbase/TestSplit.java | 135 +++++---- 21 files changed, 454 insertions(+), 596 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 052a78ebfb1..ff24570c556 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -89,3 +89,7 @@ Trunk (unreleased changes) 56. HADOOP-1678 On region split, master should designate which host should serve daughter splits. Phase 1: Master balances load for new regions and when a region server fails. + 57. HADOOP-1678 On region split, master should designate which host should + serve daughter splits. Phase 2: Master assigns children of split region + instead of HRegionServer serving both children. 
+ diff --git a/src/java/org/apache/hadoop/hbase/HBaseAdmin.java b/src/java/org/apache/hadoop/hbase/HBaseAdmin.java index 6a9b7baf07a..b99e17602f1 100644 --- a/src/java/org/apache/hadoop/hbase/HBaseAdmin.java +++ b/src/java/org/apache/hadoop/hbase/HBaseAdmin.java @@ -28,10 +28,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.io.KeyedData; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.util.Writables; + /** * Provides administrative functions for HBase */ @@ -170,7 +171,6 @@ public class HBaseAdmin implements HConstants { // Wait until first region is deleted HRegionInterface server = connection.getHRegionConnection(firstMetaServer.getServerAddress()); - DataInputBuffer inbuf = new DataInputBuffer(); HRegionInfo info = new HRegionInfo(); for (int tries = 0; tries < numRetries; tries++) { long scannerId = -1L; @@ -185,8 +185,8 @@ public class HBaseAdmin implements HConstants { boolean found = false; for (int j = 0; j < values.length; j++) { if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), values[j].getData().length); - info.readFields(inbuf); + info = + (HRegionInfo) Writables.getWritable(values[j].getData(), info); if (info.tableDesc.getName().equals(tableName)) { found = true; } @@ -249,7 +249,6 @@ public class HBaseAdmin implements HConstants { HRegionInterface server = connection.getHRegionConnection(firstMetaServer.getServerAddress()); - DataInputBuffer inbuf = new DataInputBuffer(); HRegionInfo info = new HRegionInfo(); for (int tries = 0; tries < numRetries; tries++) { int valuesfound = 0; @@ -272,8 +271,8 @@ public class HBaseAdmin implements HConstants { valuesfound += 1; for (int j = 0; j < values.length; j++) { if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), values[j].getData().length); - info.readFields(inbuf); + info = + (HRegionInfo) Writables.getWritable(values[j].getData(), info); isenabled = !info.offLine; break; } @@ -349,7 +348,6 @@ public class HBaseAdmin implements HConstants { HRegionInterface server = connection.getHRegionConnection(firstMetaServer.getServerAddress()); - DataInputBuffer inbuf = new DataInputBuffer(); HRegionInfo info = new HRegionInfo(); for(int tries = 0; tries < numRetries; tries++) { int valuesfound = 0; @@ -371,8 +369,8 @@ public class HBaseAdmin implements HConstants { valuesfound += 1; for (int j = 0; j < values.length; j++) { if (values[j].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[j].getData(), values[j].getData().length); - info.readFields(inbuf); + info = + (HRegionInfo) Writables.getWritable(values[j].getData(), info); disabled = info.offLine; break; } diff --git a/src/java/org/apache/hadoop/hbase/HConnectionManager.java b/src/java/org/apache/hadoop/hbase/HConnectionManager.java index 629177f9f07..a79f045b6f8 100644 --- a/src/java/org/apache/hadoop/hbase/HConnectionManager.java +++ b/src/java/org/apache/hadoop/hbase/HConnectionManager.java @@ -34,10 +34,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.util.Writables; /** * A non-instantiable class that 
manages connections to multiple tables in @@ -237,7 +237,6 @@ public class HConnectionManager implements HConstants { COLUMN_FAMILY_ARRAY, EMPTY_START_ROW, System.currentTimeMillis(), null); - DataInputBuffer inbuf = new DataInputBuffer(); while (true) { KeyedData[] values = server.next(scannerId); if (values.length == 0) { @@ -245,9 +244,9 @@ public class HConnectionManager implements HConstants { } for (int i = 0; i < values.length; i++) { if (values[i].getKey().getColumn().equals(COL_REGIONINFO)) { - inbuf.reset(values[i].getData(), values[i].getData().length); - HRegionInfo info = new HRegionInfo(); - info.readFields(inbuf); + HRegionInfo info = + (HRegionInfo) Writables.getWritable(values[i].getData(), + new HRegionInfo()); // Only examine the rows where the startKey is zero length if (info.startKey.getLength() == 0) { @@ -658,7 +657,6 @@ public class HConnectionManager implements HConstants { server.openScanner(t.getRegionInfo().getRegionName(), COLUMN_FAMILY_ARRAY, tableName, System.currentTimeMillis(), null); - DataInputBuffer inbuf = new DataInputBuffer(); while (true) { HRegionInfo regionInfo = null; String serverAddress = null; @@ -684,9 +682,8 @@ public class HConnectionManager implements HConstants { results.put(values[i].getKey().getColumn(), values[i].getData()); } regionInfo = new HRegionInfo(); - bytes = results.get(COL_REGIONINFO); - inbuf.reset(bytes, bytes.length); - regionInfo.readFields(inbuf); + regionInfo = (HRegionInfo) Writables.getWritable( + results.get(COL_REGIONINFO), regionInfo); if (!regionInfo.tableDesc.getName().equals(tableName)) { // We're done @@ -697,7 +694,7 @@ public class HConnectionManager implements HConstants { break; } - if (regionInfo.offLine) { + if (regionInfo.isOffline() && !regionInfo.isSplit()) { throw new IllegalStateException("table offline: " + tableName); } @@ -710,7 +707,7 @@ public class HConnectionManager implements HConstants { servers.clear(); break; } - serverAddress = new String(bytes, UTF8_ENCODING); + serverAddress = Writables.bytesToString(bytes); servers.put(regionInfo.startKey, new HRegionLocation( regionInfo, new HServerAddress(serverAddress))); } diff --git a/src/java/org/apache/hadoop/hbase/HConstants.java b/src/java/org/apache/hadoop/hbase/HConstants.java index 453fda10f69..17845939179 100644 --- a/src/java/org/apache/hadoop/hbase/HConstants.java +++ b/src/java/org/apache/hadoop/hbase/HConstants.java @@ -120,6 +120,11 @@ public interface HConstants { /** ROOT/META column family member - contains server start code (a long) */ static final Text COL_STARTCODE = new Text(COLUMN_FAMILY + "serverstartcode"); + /** the lower half of a split region */ + static final Text COL_SPLITA = new Text(COLUMN_FAMILY_STR + "splitA"); + + /** the upper half of a split region */ + static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB"); // Other constants /** used by scanners, etc when they want to start at the beginning of a region */ diff --git a/src/java/org/apache/hadoop/hbase/HLog.java b/src/java/org/apache/hadoop/hbase/HLog.java index a80038c1bae..d51a71ee753 100644 --- a/src/java/org/apache/hadoop/hbase/HLog.java +++ b/src/java/org/apache/hadoop/hbase/HLog.java @@ -97,7 +97,7 @@ public class HLog implements HConstants { */ static void splitLog(Path rootDir, Path srcDir, FileSystem fs, Configuration conf) throws IOException { - Path logfiles[] = fs.listPaths(srcDir); + Path logfiles[] = fs.listPaths(new Path[] {srcDir}); LOG.info("splitting " + logfiles.length + " log(s) in " + srcDir.toString()); HashMap logWriters = 
diff --git a/src/java/org/apache/hadoop/hbase/HMaster.java b/src/java/org/apache/hadoop/hbase/HMaster.java index 61c44f4aa29..bfe24aad76c 100644 --- a/src/java/org/apache/hadoop/hbase/HMaster.java +++ b/src/java/org/apache/hadoop/hbase/HMaster.java @@ -19,8 +19,6 @@ */ package org.apache.hadoop.hbase; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; @@ -46,15 +44,16 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.io.KeyedData; -import org.apache.hadoop.hbase.util.Keying; import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.hbase.io.BatchUpdate; + /** * HMaster is the "master server" for a HBase. @@ -208,9 +207,11 @@ HMasterRegionInterface, Runnable { results.put(values[i].getKey().getColumn(), values[i].getData()); } - HRegionInfo info = HRegion.getRegionInfo(results); - String serverName = HRegion.getServerName(results); - long startCode = HRegion.getStartCode(results); + HRegionInfo info = (HRegionInfo) Writables.getWritable( + results.get(COL_REGIONINFO), new HRegionInfo()); + + String serverName = Writables.bytesToString(results.get(COL_SERVER)); + long startCode = Writables.bytesToLong(results.get(COL_STARTCODE)); if (LOG.isDebugEnabled()) { LOG.debug(Thread.currentThread().getName() + " scanner: " + @@ -263,9 +264,11 @@ HMasterRegionInterface, Runnable { splitParents.entrySet()) { TreeMap results = e.getValue(); - cleanupSplits(e.getKey(), - HRegion.getSplit(results, HRegion.COL_SPLITA), - HRegion.getSplit(results, HRegion.COL_SPLITB)); + cleanupSplits(region.regionName, regionServer, e.getKey(), + (HRegionInfo) Writables.getWritable(results.get(COL_SPLITA), + new HRegionInfo()), + (HRegionInfo) Writables.getWritable(results.get(COL_SPLITB), + new HRegionInfo())); } } LOG.info(Thread.currentThread().getName() + " scan of meta region " + @@ -286,17 +289,19 @@ HMasterRegionInterface, Runnable { return true; } - /* - * @param info - * @param splitA - * @param splitB + /** + * @param metaRegionName + * @param server HRegionInterface of meta server to talk to + * @param info HRegionInfo of split parent + * @param splitA low key range child region + * @param splitB upper key range child region * @return True if we removed info and this region has * been cleaned up. 
* @throws IOException */ - private boolean cleanupSplits(final HRegionInfo info, - final HRegionInfo splitA, final HRegionInfo splitB) - throws IOException { + private boolean cleanupSplits(final Text metaRegionName, + final HRegionInterface server, final HRegionInfo info, + final HRegionInfo splitA, final HRegionInfo splitB) throws IOException { boolean result = false; if (LOG.isDebugEnabled()) { @@ -308,11 +313,11 @@ HMasterRegionInterface, Runnable { if (!noReferencesA) { noReferencesA = - hasReferences(info.getRegionName(), splitA, HRegion.COL_SPLITA); + hasReferences(metaRegionName, server, info.getRegionName(), splitA, COL_SPLITA); } if (!noReferencesB) { noReferencesB = - hasReferences(info.getRegionName(), splitB, HRegion.COL_SPLITB); + hasReferences(metaRegionName, server, info.getRegionName(), splitB, COL_SPLITB); } if (!(noReferencesA && noReferencesB)) { @@ -322,9 +327,16 @@ HMasterRegionInterface, Runnable { LOG.info("Deleting region " + info.getRegionName() + " because daughter splits no longer hold references"); - HRegion.deleteRegion(fs, dir, info.getRegionName()); - HRegion.removeRegionFromMETA(conf, this.tableName, - info.getRegionName()); + if (!HRegion.deleteRegion(fs, dir, info.getRegionName())) { + LOG.warn("Deletion of " + info.getRegionName() + " failed"); + } + + BatchUpdate b = new BatchUpdate(); + long lockid = b.startUpdate(info.getRegionName()); + b.delete(lockid, COL_REGIONINFO); + b.delete(lockid, COL_SERVER); + b.delete(lockid, COL_STARTCODE); + server.batchUpdate(metaRegionName, System.currentTimeMillis(), b); result = true; } @@ -336,11 +348,30 @@ HMasterRegionInterface, Runnable { return result; } - protected boolean hasReferences(final Text regionName, + protected boolean hasReferences(final Text metaRegionName, + final HRegionInterface server, final Text regionName, final HRegionInfo split, final Text column) throws IOException { - boolean result = - HRegion.hasReferences(fs, fs.makeQualified(dir), split); + boolean result = false; + for (Text family: split.getTableDesc().families().keySet()) { + Path p = HStoreFile.getMapDir(fs.makeQualified(dir), + split.getRegionName(), HStoreKey.extractFamily(family)); + + // Look for reference files. + + Path [] ps = fs.listPaths(p, + new PathFilter () { + public boolean accept(Path path) { + return HStoreFile.isReference(path); + } + } + ); + + if (ps != null && ps.length > 0) { + result = true; + break; + } + } if (result) { return result; @@ -351,12 +382,11 @@ HMasterRegionInterface, Runnable { +" no longer has references to " + regionName.toString()); } - HTable t = new HTable(conf, this.tableName); - try { - HRegion.removeSplitFromMETA(t, regionName, column); - } finally { - t.close(); - } + BatchUpdate b = new BatchUpdate(); + long lockid = b.startUpdate(regionName); + b.delete(lockid, column); + server.batchUpdate(metaRegionName, System.currentTimeMillis(), b); + return result; } @@ -380,7 +410,7 @@ HMasterRegionInterface, Runnable { } HServerInfo storedInfo = null; - if (serverName != null) { + if (serverName.length() != 0) { Map regionsToKill = killList.get(serverName); if (regionsToKill != null && regionsToKill.containsKey(info.regionName)) { @@ -691,7 +721,7 @@ HMasterRegionInterface, Runnable { * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the * set of all known valid regions. 
*/ - Map unassignedRegions; + SortedMap unassignedRegions; /** * The 'assignAttempts' table maps from regions to a timestamp that indicates @@ -775,10 +805,12 @@ HMasterRegionInterface, Runnable { if (!fs.exists(rootRegionDir)) { LOG.info("bootstrap: creating ROOT and first META regions"); try { - HRegion root = HRegion.createHRegion(0L, HGlobals.rootTableDesc, - this.dir, this.conf); - HRegion meta = HRegion.createHRegion(1L, HGlobals.metaTableDesc, - this.dir, this.conf); + HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir, + this.conf, null); + + HRegion meta = + HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc, + null, null), this.dir, this.conf, null); // Add first region from the META table to the ROOT region. @@ -842,7 +874,7 @@ HMasterRegionInterface, Runnable { this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner"); this.unassignedRegions = - Collections.synchronizedMap(new HashMap()); + Collections.synchronizedSortedMap(new TreeMap()); this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo); @@ -1372,7 +1404,12 @@ HMasterRegionInterface, Runnable { // A region has split. HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo(); + unassignedRegions.put(newRegionA.getRegionName(), newRegionA); + assignAttempts.put(newRegionA.getRegionName(), Long.valueOf(0L)); + HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo(); + unassignedRegions.put(newRegionB.getRegionName(), newRegionB); + assignAttempts.put(newRegionB.getRegionName(), Long.valueOf(0L)); LOG.info("region " + region.regionName + " split. New regions are: " + newRegionA.regionName + ", " + newRegionB.regionName); @@ -1381,14 +1418,6 @@ HMasterRegionInterface, Runnable { // A meta region has split. onlineMetaRegions.remove(region.getStartKey()); - onlineMetaRegions.put(newRegionA.getStartKey(), - new MetaRegion(info.getServerAddress(), - newRegionA.getRegionName(), newRegionA.getStartKey())); - - onlineMetaRegions.put(newRegionB.getStartKey(), - new MetaRegion(info.getServerAddress(), - newRegionB.getRegionName(), newRegionB.getStartKey())); - numberOfMetaRegions.incrementAndGet(); } break; @@ -1673,15 +1702,15 @@ HMasterRegionInterface, Runnable { // region had been on shutdown server (could be null because we // missed edits in hlog because hdfs does not do write-append). 
- String serverName = null; + String serverName; try { - serverName = Keying.bytesToString(results.get(COL_SERVER)); + serverName = Writables.bytesToString(results.get(COL_SERVER)); } catch(UnsupportedEncodingException e) { LOG.error("Server name", e); break; } - if (serverName != null && serverName.length() > 0 && + if (serverName.length() > 0 && deadServerName.compareTo(serverName) != 0) { // This isn't the server you're looking for - move along @@ -1776,11 +1805,8 @@ HMasterRegionInterface, Runnable { } else if (e.regionOffline) { e.info.offLine = true; - ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); - DataOutputStream s = new DataOutputStream(byteValue); - e.info.write(s); server.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + Writables.getBytes(e.info)); } server.delete(regionName, clientId, lockid, COL_SERVER); server.delete(regionName, clientId, lockid, COL_STARTCODE); @@ -2037,12 +2063,8 @@ HMasterRegionInterface, Runnable { } else if (!reassignRegion ) { regionInfo.offLine = true; - ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); - DataOutputStream s = new DataOutputStream(byteValue); - regionInfo.write(s); - server.put(metaRegionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + Writables.getBytes(regionInfo)); } server.delete(metaRegionName, clientId, lockid, COL_SERVER); server.delete(metaRegionName, clientId, lockid, COL_STARTCODE); @@ -2097,7 +2119,7 @@ HMasterRegionInterface, Runnable { private HServerAddress serverAddress; private byte [] startCode; - PendingOpenReport(HServerInfo info, HRegionInfo region) { + PendingOpenReport(HServerInfo info, HRegionInfo region) throws IOException { if (region.tableDesc.getName().equals(META_TABLE_NAME)) { // The region which just came on-line is a META region. // We need to look in the ROOT region for its information. @@ -2111,12 +2133,7 @@ HMasterRegionInterface, Runnable { } this.region = region; this.serverAddress = info.getServerAddress(); - try { - this.startCode = - String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING); - } catch(UnsupportedEncodingException e) { - LOG.error("Start code", e); - } + this.startCode = Writables.longToBytes(info.getStartCode()); } /** {@inheritDoc} */ @@ -2188,7 +2205,7 @@ HMasterRegionInterface, Runnable { region.getRegionName()); server.put(metaRegionName, clientId, lockid, COL_SERVER, - serverAddress.toString().getBytes(UTF8_ENCODING)); + Writables.stringToBytes(serverAddress.toString())); server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode); @@ -2343,21 +2360,18 @@ HMasterRegionInterface, Runnable { // 2. Create the HRegion - HRegion region = HRegion.createHRegion(newRegion.regionId, - newRegion.getTableDesc(), this.dir, this.conf); + HRegion region = + HRegion.createHRegion(newRegion, this.dir, this.conf, null); // 3. 
Insert into meta HRegionInfo info = region.getRegionInfo(); Text regionName = region.getRegionName(); - ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); - DataOutputStream s = new DataOutputStream(byteValue); - info.write(s); long clientId = rand.nextLong(); long lockid = r.startUpdate(metaRegionName, clientId, regionName); r.put(metaRegionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + Writables.getBytes(info)); r.commit(metaRegionName, clientId, lockid, System.currentTimeMillis()); @@ -2468,7 +2482,6 @@ HMasterRegionInterface, Runnable { System.currentTimeMillis(), null); try { - DataInputBuffer inbuf = new DataInputBuffer(); while (true) { HRegionInfo info = new HRegionInfo(); String serverName = null; @@ -2486,14 +2499,13 @@ HMasterRegionInterface, Runnable { Text column = values[i].getKey().getColumn(); if (column.equals(COL_REGIONINFO)) { haveRegionInfo = true; - inbuf.reset(values[i].getData(), - values[i].getData().length); - info.readFields(inbuf); + info = (HRegionInfo) Writables.getWritable( + values[i].getData(), info); } else if (column.equals(COL_SERVER)) { try { serverName = - new String(values[i].getData(), UTF8_ENCODING); + Writables.bytesToString(values[i].getData()); } catch (UnsupportedEncodingException e) { assert(false); @@ -2501,8 +2513,7 @@ HMasterRegionInterface, Runnable { } else if (column.equals(COL_STARTCODE)) { try { - startCode = Long.valueOf(new String(values[i].getData(), - UTF8_ENCODING)).longValue(); + startCode = Writables.bytesToLong(values[i].getData()); } catch (UnsupportedEncodingException e) { assert(false); @@ -2568,7 +2579,7 @@ HMasterRegionInterface, Runnable { protected boolean isBeingServed(String serverName, long startCode) { boolean result = false; - if (serverName != null && startCode != -1L) { + if (serverName != null && serverName.length() > 0 && startCode != -1L) { HServerInfo s; synchronized (serversToServerInfo) { s = serversToServerInfo.get(serverName); @@ -2731,13 +2742,8 @@ HMasterRegionInterface, Runnable { final Text regionName, final HRegionInfo i) throws IOException { i.offLine = !online; - - ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); - DataOutputStream s = new DataOutputStream(byteValue); - i.write(s); - server.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + Writables.getBytes(i)); } } @@ -2811,15 +2817,12 @@ HMasterRegionInterface, Runnable { protected void updateRegionInfo(HRegionInterface server, Text regionName, HRegionInfo i) throws IOException { - ByteArrayOutputStream byteValue = new ByteArrayOutputStream(); - DataOutputStream s = new DataOutputStream(byteValue); - i.write(s); long lockid = -1L; long clientId = rand.nextLong(); try { lockid = server.startUpdate(regionName, clientId, i.regionName); server.put(regionName, clientId, lockid, COL_REGIONINFO, - byteValue.toByteArray()); + Writables.getBytes(i)); server.commit(regionName, clientId, lockid, System.currentTimeMillis()); lockid = -1L; diff --git a/src/java/org/apache/hadoop/hbase/HMerge.java b/src/java/org/apache/hadoop/hbase/HMerge.java index 90caf374d83..6a92e368d08 100644 --- a/src/java/org/apache/hadoop/hbase/HMerge.java +++ b/src/java/org/apache/hadoop/hbase/HMerge.java @@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.hbase.util.Writables; + /** * A non-instantiable class that has a static method capable of compacting * a table by merging adjacent regions that have 
grown too small. @@ -220,7 +222,9 @@ class HMerge implements HConstants { throw new NoSuchElementException("meta region entry missing " + COL_REGIONINFO); } - HRegionInfo region = new HRegionInfo(bytes); + HRegionInfo region = + (HRegionInfo) Writables.getWritable(bytes, new HRegionInfo()); + if(!region.offLine) { throw new TableNotDisabledException("region " + region.regionName + " is not disabled"); diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index 5fc331ae370..20dcb6aaeff 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -19,10 +19,7 @@ */ package org.apache.hadoop.hbase; -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; import java.io.IOException; -import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; import java.util.Random; @@ -36,7 +33,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Text; @@ -81,8 +77,6 @@ public class HRegion implements HConstants { static final Log LOG = LogFactory.getLog(HRegion.class); final AtomicBoolean closed = new AtomicBoolean(false); private long noFlushCount = 0; - static final Text COL_SPLITA = new Text(COLUMN_FAMILY_STR + "splitA"); - static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB"); /** * Merge two HRegions. They must be available on the current @@ -1667,26 +1661,6 @@ public class HRegion implements HConstants { } // Utility methods - - /** - * Convenience method creating new HRegions. - * Note, this method creates an {@link HLog} for the created region. It - * needs to be closed explicitly. Use {@link HRegion#getLog()} to get - * access. - * @param regionId ID to use - * @param tableDesc Descriptor - * @param rootDir Root directory of HBase instance - * @param conf - * @return New META region (ROOT or META). - * @throws IOException - */ - static HRegion createHRegion(final long regionId, - final HTableDescriptor tableDesc, final Path rootDir, - final Configuration conf) - throws IOException { - return createHRegion(new HRegionInfo(regionId, tableDesc, null, null), - rootDir, conf, null); - } /** * Convenience method creating new HRegions. 
Used by createTable and by the @@ -1727,218 +1701,10 @@ public class HRegion implements HConstants { throws IOException { // The row key is the region name long writeid = meta.startUpdate(r.getRegionName()); - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - DataOutputStream s = new DataOutputStream(bytes); - r.getRegionInfo().write(s); - meta.put(writeid, COL_REGIONINFO, bytes.toByteArray()); + meta.put(writeid, COL_REGIONINFO, Writables.getBytes(r.getRegionInfo())); meta.commit(writeid, System.currentTimeMillis()); } - static void addRegionToMETA(final Configuration conf, - final Text table, final HRegion region, - final HServerAddress serverAddress, - final long startCode) - throws IOException { - HTable t = new HTable(conf, table); - try { - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - DataOutputStream out = new DataOutputStream(bytes); - region.getRegionInfo().write(out); - long lockid = t.startUpdate(region.getRegionName()); - t.put(lockid, COL_REGIONINFO, bytes.toByteArray()); - t.put(lockid, COL_SERVER, - serverAddress.toString().getBytes(UTF8_ENCODING)); - t.put(lockid, COL_STARTCODE, - String.valueOf(startCode).getBytes(UTF8_ENCODING)); - t.commit(lockid); - if (LOG.isDebugEnabled()) { - LOG.info("Added region " + region.getRegionName() + " to table " + - table); - } - } finally { - t.close(); - } - } - - /** - * Delete region from META table. - * @param conf Configuration object - * @param table META table we are to delete region from. - * @param regionName Region to remove. - * @throws IOException - */ - static void removeRegionFromMETA(final Configuration conf, - final Text table, final Text regionName) - throws IOException { - HTable t = new HTable(conf, table); - try { - removeRegionFromMETA(t, regionName); - } finally { - t.close(); - } - } - - /** - * Delete region from META table. - * @param conf Configuration object - * @param table META table we are to delete region from. - * @param regionName Region to remove. - * @throws IOException - */ - static void removeRegionFromMETA(final HTable t, final Text regionName) - throws IOException { - long lockid = t.startBatchUpdate(regionName); - t.delete(lockid, COL_REGIONINFO); - t.delete(lockid, COL_SERVER); - t.delete(lockid, COL_STARTCODE); - t.commit(lockid); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed " + regionName + " from table " + t.getTableName()); - } - } - - /** - * Delete split column from META table. - * @param t - * @param split - * @param regionName Region to remove. - * @throws IOException - */ - static void removeSplitFromMETA(final HTable t, final Text regionName, - final Text split) - throws IOException { - long lockid = t.startBatchUpdate(regionName); - t.delete(lockid, split); - t.commit(lockid); - if (LOG.isDebugEnabled()) { - LOG.debug("Removed " + split + " from " + regionName + - " from table " + t.getTableName()); - } - } - - /** - * region has split. Update META table. - * @param client Client to use running update. - * @param table META table we are to delete region from. - * @param regionName Region to remove. 
- * @throws IOException - */ - static void writeSplitToMETA(final Configuration conf, - final Text table, final Text regionName, final HRegionInfo splitA, - final HRegionInfo splitB) - throws IOException { - HTable t = new HTable(conf, table); - try { - HRegionInfo hri = getRegionInfo(t.get(regionName, COL_REGIONINFO)); - hri.offLine = true; - hri.split = true; - ByteArrayOutputStream bytes = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(bytes); - hri.write(dos); - dos.close(); - long lockid = t.startBatchUpdate(regionName); - t.put(lockid, COL_REGIONINFO, bytes.toByteArray()); - t.put(lockid, COL_SPLITA, Writables.getBytes(splitA)); - t.put(lockid, COL_SPLITB, Writables.getBytes(splitB)); - t.commitBatch(lockid); - if (LOG.isDebugEnabled()) { - LOG.debug("Updated " + regionName + " in table " + table + - " on its being split"); - } - } finally { - t.close(); - } - } - - /** - * @param whichSplit COL_SPLITA or COL_SPLITB? - * @param data Map of META row labelled column data. - * @return HRegionInfo or null if not found. - * @throws IOException - */ - static HRegionInfo getSplit(final TreeMap data, - final Text whichSplit) - throws IOException { - if (!(whichSplit.equals(COL_SPLITA) || whichSplit.equals(COL_SPLITB))) { - throw new IOException("Illegal Argument: " + whichSplit); - } - byte [] bytes = data.get(whichSplit); - if (bytes == null || bytes.length == 0) { - return null; - } - return (HRegionInfo) Writables.getWritable(bytes, new HRegionInfo()); - } - - /** - * @param data Map of META row labelled column data. - * @return An HRegionInfo instance. - * @throws IOException - */ - static HRegionInfo getRegionInfo(final TreeMap data) - throws IOException { - return getRegionInfo(data.get(COL_REGIONINFO)); - } - - /** - * @param bytes Bytes of a HRegionInfo. - * @return An HRegionInfo instance. - * @throws IOException - */ - static HRegionInfo getRegionInfo(final byte[] bytes) throws IOException { - if (bytes == null || bytes.length == 0) { - throw new IOException("no value for " + COL_REGIONINFO); - } - return (HRegionInfo)Writables.getWritable(bytes, new HRegionInfo()); - } - - /** - * @param data Map of META row labelled column data. - * @return Server - */ - static String getServerName(final TreeMap data) { - byte [] bytes = data.get(COL_SERVER); - String name = null; - try { - name = (bytes != null && bytes.length != 0) ? - new String(bytes, UTF8_ENCODING): null; - - } catch(UnsupportedEncodingException e) { - assert(false); - } - return (name != null)? name.trim(): name; - } - - /** - * @param data Map of META row labelled column data. - * @return Start code. 
- */ - static long getStartCode(final TreeMap data) { - long startCode = -1L; - byte [] bytes = data.get(COL_STARTCODE); - if(bytes != null && bytes.length != 0) { - try { - startCode = Long.parseLong(new String(bytes, UTF8_ENCODING).trim()); - } catch(NumberFormatException e) { - LOG.error("Failed getting " + COL_STARTCODE, e); - } catch(UnsupportedEncodingException e) { - LOG.error("Failed getting " + COL_STARTCODE, e); - } - } - return startCode; - } - - /** - * Computes the Path of the HRegion - * - * @param dir parent directory - * @param regionName name of the region - * @return Path of HRegion directory - */ - public static Path getRegionDir(final Path dir, final Text regionName) { - return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName)); - } - - /** * Deletes all the files for a HRegion * @@ -1953,32 +1719,15 @@ public class HRegion implements HConstants { Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), regionName); return fs.delete(p); } - + /** - * Look for HStoreFile references in passed region. - * @param fs - * @param baseDirectory - * @param hri - * @return True if we found references. - * @throws IOException + * Computes the Path of the HRegion + * + * @param dir parent directory + * @param regionName name of the region + * @return Path of HRegion directory */ - static boolean hasReferences(final FileSystem fs, final Path baseDirectory, - final HRegionInfo hri) - throws IOException { - boolean result = false; - for (Text family: hri.getTableDesc().families().keySet()) { - Path p = HStoreFile.getMapDir(baseDirectory, hri.getRegionName(), - HStoreKey.extractFamily(family)); - // Look for reference files. - Path [] ps = fs.listPaths(p, new PathFilter () { - public boolean accept(Path path) { - return HStoreFile.isReference(path); - }}); - if (ps != null && ps.length > 0) { - result = true; - break; - } - } - return result; + public static Path getRegionDir(final Path dir, final Text regionName) { + return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName)); } } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/HRegionInfo.java b/src/java/org/apache/hadoop/hbase/HRegionInfo.java index 3021dc3d630..25fd3847920 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionInfo.java +++ b/src/java/org/apache/hadoop/hbase/HRegionInfo.java @@ -19,9 +19,7 @@ */ package org.apache.hadoop.hbase; -import java.io.ByteArrayInputStream; import java.io.DataInput; -import java.io.DataInputStream; import java.io.DataOutput; import java.io.IOException; @@ -80,17 +78,6 @@ public class HRegionInfo implements WritableComparable { this.split = false; } - /** - * Construct a HRegionInfo object from byte array - * - * @param serializedBytes - * @throws IOException - */ - public HRegionInfo(final byte [] serializedBytes) throws IOException { - this(); - readFields(new DataInputStream(new ByteArrayInputStream(serializedBytes))); - } - /** * Construct HRegionInfo with explicit parameters * diff --git a/src/java/org/apache/hadoop/hbase/HRegionServer.java b/src/java/org/apache/hadoop/hbase/HRegionServer.java index 067f4cc2212..dafeb038acb 100644 --- a/src/java/org/apache/hadoop/hbase/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/HRegionServer.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.filter.RowFilterInterface; import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.BatchOperation; import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Text; import 
org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RPC; @@ -79,7 +80,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // Go down hard. Used debugging and in unit tests. protected volatile boolean abortRequested; - private final Path rootDir; + final Path rootDir; protected final HServerInfo serverInfo; protected final Configuration conf; private final Random rand; @@ -103,6 +104,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { /** Runs periodically to determine if regions need to be compacted or split */ class SplitOrCompactChecker implements Runnable, RegionUnavailableListener { + private HTable root = null; + private HTable meta = null; /** * {@inheritDoc} @@ -199,65 +202,67 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { // When a region is split, the META table needs to updated if we're // splitting a 'normal' region, and the ROOT table needs to be // updated if we are splitting a META region. - final Text tableToUpdate = - region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)? - ROOT_TABLE_NAME : META_TABLE_NAME; - LOG.info("Updating " + tableToUpdate + " with region split info"); + + HTable t = null; + if (region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)) { + // We need to update the root region + + if (root == null) { + root = new HTable(conf, ROOT_TABLE_NAME); + } + t = root; + + } else { + // For normal regions we need to update the meta region + + if (meta == null) { + meta = new HTable(conf, META_TABLE_NAME); + } + t = meta; + } + LOG.info("Updating " + t.getTableName() + " with region split info"); // Remove old region from META - for (int tries = 0; tries < numRetries; tries++) { - try { - HRegion.writeSplitToMETA(conf, tableToUpdate, - region.getRegionName(), newRegions[0].getRegionInfo(), - newRegions[1].getRegionInfo()); - break; - } catch (IOException e) { - if(tries == numRetries - 1) { - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - } - } + // NOTE: there is no need for retry logic here. HTable does it for us. + + long lockid = t.startBatchUpdate(oldRegionInfo.getRegionName()); + oldRegionInfo.offLine = true; + oldRegionInfo.split = true; + t.put(lockid, COL_REGIONINFO, Writables.getBytes(oldRegionInfo)); + + t.put(lockid, COL_SPLITA, Writables.getBytes( + newRegions[0].getRegionInfo())); + + t.put(lockid, COL_SPLITB, Writables.getBytes( + newRegions[1].getRegionInfo())); + t.commitBatch(lockid); // Add new regions to META + for (int i = 0; i < newRegions.length; i++) { - for (int tries = 0; tries < numRetries; tries ++) { - try { - HRegion.addRegionToMETA(conf, tableToUpdate, newRegions[i], - serverInfo.getServerAddress(), serverInfo.getStartCode()); - break; - } catch(IOException e) { - if(tries == numRetries - 1) { - if(e instanceof RemoteException) { - e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); - } - throw e; - } - } - } + lockid = t.startBatchUpdate(newRegions[i].getRegionName()); + + t.put(lockid, COL_REGIONINFO, Writables.getBytes( + newRegions[i].getRegionInfo())); + + t.commitBatch(lockid); } // Now tell the master about the new regions + if (LOG.isDebugEnabled()) { LOG.debug("Reporting region split to master"); } reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo()); + LOG.info("region split, META update, and report to master all" + " successful. 
Old region=" + oldRegionInfo.getRegionName() + ", new regions: " + newRegions[0].getRegionName() + ", " + newRegions[1].getRegionName()); - // Finally, start serving the new regions - lock.writeLock().lock(); - try { - onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]); - onlineRegions.put(newRegions[1].getRegionName(), newRegions[1]); - } finally { - lock.writeLock().unlock(); - } + // Do not serve the new regions. Let the Master assign them. + } } diff --git a/src/java/org/apache/hadoop/hbase/HRegiondirReader.java b/src/java/org/apache/hadoop/hbase/HRegiondirReader.java index 684cbe22039..b7213551868 100644 --- a/src/java/org/apache/hadoop/hbase/HRegiondirReader.java +++ b/src/java/org/apache/hadoop/hbase/HRegiondirReader.java @@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.Text; +import org.apache.hadoop.hbase.util.Writables; + /** * A standalone HRegion directory reader. Currently reads content on * file system only. @@ -194,7 +196,7 @@ class HRegiondirReader { byte [] colvalue = es.getValue(); Object value = null; if (colname.toString().equals("info:regioninfo")) { - value = new HRegionInfo(colvalue); + value = Writables.getWritable(colvalue, new HRegionInfo()); } else { value = new String(colvalue, HConstants.UTF8_ENCODING); } diff --git a/src/java/org/apache/hadoop/hbase/HStoreFile.java b/src/java/org/apache/hadoop/hbase/HStoreFile.java index 7f35e7d0068..37e325b2b66 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreFile.java +++ b/src/java/org/apache/hadoop/hbase/HStoreFile.java @@ -380,7 +380,7 @@ public class HStoreFile implements HConstants, WritableComparable { // Look first at info files. If a reference, these contain info we need // to create the HStoreFile. Path infodir = HStoreFile.getInfoDir(dir, regionName, colFamily); - Path infofiles[] = fs.listPaths(infodir); + Path infofiles[] = fs.listPaths(new Path[] {infodir}); Vector results = new Vector(infofiles.length); Vector mapfiles = new Vector(infofiles.length); for (int i = 0; i < infofiles.length; i++) { @@ -411,7 +411,7 @@ public class HStoreFile implements HConstants, WritableComparable { Path mapdir = HStoreFile.getMapDir(dir, regionName, colFamily); // List paths by experience returns fully qualified names -- at least when // running on a mini hdfs cluster. - Path datfiles[] = fs.listPaths(mapdir); + Path datfiles[] = fs.listPaths(new Path[] {mapdir}); for (int i = 0; i < datfiles.length; i++) { // If does not have sympathetic info file, delete. if (!mapfiles.contains(fs.makeQualified(datfiles[i]))) { diff --git a/src/java/org/apache/hadoop/hbase/HStoreKey.java b/src/java/org/apache/hadoop/hbase/HStoreKey.java index bd32cc94620..2cb94ebbfe7 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreKey.java +++ b/src/java/org/apache/hadoop/hbase/HStoreKey.java @@ -91,6 +91,19 @@ public class HStoreKey implements WritableComparable { return offset; } + /** + * Returns row and column bytes out of an HStoreKey. + * @param hsk Store key. 
+ * @return byte array encoding of HStoreKey + * @throws UnsupportedEncodingException + */ + public static byte[] getBytes(final HStoreKey hsk) + throws UnsupportedEncodingException { + StringBuilder s = new StringBuilder(hsk.getRow().toString()); + s.append(hsk.getColumn().toString()); + return s.toString().getBytes(HConstants.UTF8_ENCODING); + } + Text row; Text column; long timestamp; diff --git a/src/java/org/apache/hadoop/hbase/HTable.java b/src/java/org/apache/hadoop/hbase/HTable.java index 5a748dbf432..af875648cb2 100644 --- a/src/java/org/apache/hadoop/hbase/HTable.java +++ b/src/java/org/apache/hadoop/hbase/HTable.java @@ -886,7 +886,17 @@ public class HTable implements HConstants { public void close() throws IOException { checkClosed(); if (this.scannerId != -1L) { - this.server.close(this.scannerId); + try { + this.server.close(this.scannerId); + + } catch (IOException e) { + if (e instanceof RemoteException) { + e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e); + } + if (!(e instanceof NotServingRegionException)) { + throw e; + } + } this.scannerId = -1L; } this.server = null; diff --git a/src/java/org/apache/hadoop/hbase/Leases.java b/src/java/org/apache/hadoop/hbase/Leases.java index e4d025e1068..d356c79ee7f 100644 --- a/src/java/org/apache/hadoop/hbase/Leases.java +++ b/src/java/org/apache/hadoop/hbase/Leases.java @@ -94,9 +94,8 @@ public class Leases { * without any cancellation calls. */ public void close() { - if(LOG.isDebugEnabled()) { - LOG.debug("closing leases"); - } + LOG.info("closing leases"); + this.running = false; try { this.leaseMonitorThread.interrupt(); @@ -110,9 +109,7 @@ public class Leases { sortedLeases.clear(); } } - if(LOG.isDebugEnabled()) { - LOG.debug("leases closed"); - } + LOG.info("leases closed"); } /* A client obtains a lease... */ @@ -139,9 +136,9 @@ public class Leases { sortedLeases.add(lease); } } - if (LOG.isDebugEnabled()) { - LOG.debug("Created lease " + name); - } +// if (LOG.isDebugEnabled()) { +// LOG.debug("Created lease " + name); +// } } /* A client renews a lease... */ @@ -170,9 +167,9 @@ public class Leases { sortedLeases.add(lease); } } - if (LOG.isDebugEnabled()) { - LOG.debug("Renewed lease " + name); - } +// if (LOG.isDebugEnabled()) { +// LOG.debug("Renewed lease " + name); +// } } /** @@ -196,9 +193,9 @@ public class Leases { leases.remove(name); } } - if (LOG.isDebugEnabled()) { - LOG.debug("Cancel lease " + name); - } +// if (LOG.isDebugEnabled()) { +// LOG.debug("Cancel lease " + name); +// } } /** LeaseMonitor is a thread that expires Leases that go on too long. */ @@ -327,9 +324,8 @@ public class Leases { } void expired() { - if (LOG.isDebugEnabled()) { - LOG.debug("Lease expired " + getLeaseName()); - } + LOG.info("Lease expired " + getLeaseName()); + listener.leaseExpired(); } diff --git a/src/java/org/apache/hadoop/hbase/util/Keying.java b/src/java/org/apache/hadoop/hbase/util/Keying.java index e19a64f64c2..a26f141c27e 100644 --- a/src/java/org/apache/hadoop/hbase/util/Keying.java +++ b/src/java/org/apache/hadoop/hbase/util/Keying.java @@ -19,18 +19,10 @@ */ package org.apache.hadoop.hbase.util; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; import java.util.StringTokenizer; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HStoreKey; - /** * Utility creating hbase friendly keys. 
* Use fabricating row names or column qualifiers. @@ -119,61 +111,4 @@ public class Keying { } return sb.toString(); } - - /** - * @param i - * @return i as byte array. - */ - public static byte[] intToBytes(final int i){ - ByteBuffer buffer = ByteBuffer.allocate(Integer.SIZE); - buffer.putInt(i); - return buffer.array(); - } - - /** - * @param l - * @return i as byte array. - */ - public static byte[] longToBytes(final long l){ - ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE); - buffer.putLong(l); - return buffer.array(); - } - - /** - * Returns row and column bytes out of an HStoreKey. - * @param hsk Store key. - * @throws UnsupportedEncodingException - */ - public static byte[] getBytes(final HStoreKey hsk) - throws UnsupportedEncodingException { - StringBuilder s = new StringBuilder(hsk.getRow().toString()); - s.append(hsk.getColumn().toString()); - return s.toString().getBytes(HConstants.UTF8_ENCODING); - } - - /** - * @param bytes - * @return String made of the bytes or null if bytes are null. - * @throws UnsupportedEncodingException - */ - public static String bytesToString(final byte [] bytes) - throws UnsupportedEncodingException { - if(bytes == null) { - return null; - } - return new String(bytes, HConstants.UTF8_ENCODING); - } - - public static long bytesToLong(final byte [] bytes) throws IOException { - long result = -1; - DataInputStream dis = null; - try { - dis = new DataInputStream(new ByteArrayInputStream(bytes)); - result = dis.readLong(); - } finally { - dis.close(); - } - return result; - } } \ No newline at end of file diff --git a/src/java/org/apache/hadoop/hbase/util/Writables.java b/src/java/org/apache/hadoop/hbase/util/Writables.java index 320c694e2bd..de1a2346374 100644 --- a/src/java/org/apache/hadoop/hbase/util/Writables.java +++ b/src/java/org/apache/hadoop/hbase/util/Writables.java @@ -22,11 +22,17 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.UnsupportedEncodingException; import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hbase.HConstants; +/** + * Utility class with methods for manipulating Writable objects + */ public class Writables { /** * @param w * @return * @throws * @see #getWritable(byte[], Writable) */ public static byte [] getBytes(final Writable w) throws IOException { + if (w == null) { + throw new IllegalArgumentException("Writable cannot be null"); + } ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(byteStream); try { @@ -64,7 +73,11 @@ public static Writable getWritable(final byte [] bytes, final Writable w) throws IOException { if (bytes == null || bytes.length == 0) { - throw new IOException("Con't build a writable with empty bytes array"); + throw new IllegalArgumentException( + "Can't build a writable with empty bytes array"); } + if (w == null) { + throw new IllegalArgumentException("Writable cannot be null"); } DataInputBuffer in = new DataInputBuffer(); try { @@ -85,14 +98,67 @@ */ public static Writable copyWritable(final Writable src, final Writable tgt) throws IOException { + if (src == null || tgt == null) { + throw new IllegalArgumentException("Writables cannot be null"); + } byte [] bytes = getBytes(src); - DataInputStream dis = null; + DataInputStream dis = new DataInputStream(new
ByteArrayInputStream(bytes)); try { - dis = new DataInputStream(new ByteArrayInputStream(bytes)); tgt.readFields(dis); } finally { dis.close(); } return tgt; } + + /** + * Convert a long value to a byte array + * @param val + * @return the byte array + * @throws IOException + */ + public static byte[] longToBytes(long val) throws IOException { + return getBytes(new LongWritable(val)); + } + + /** + * Converts a byte array to a long value + * @param bytes + * @return the long value + * @throws IOException + */ + public static long bytesToLong(byte[] bytes) throws IOException { + if (bytes == null || bytes.length == 0) { + return -1L; + } + return ((LongWritable) getWritable(bytes, new LongWritable())).get(); + } + + /** + * Converts a string to a byte array in a consistent manner. + * @param s + * @return the byte array + * @throws UnsupportedEncodingException + */ + public static byte[] stringToBytes(String s) + throws UnsupportedEncodingException { + if (s == null) { + throw new IllegalArgumentException("string cannot be null"); + } + return s.getBytes(HConstants.UTF8_ENCODING); + } + + /** + * Converts a byte array to a string in a consistent manner. + * @param bytes + * @return the string + * @throws UnsupportedEncodingException + */ + public static String bytesToString(byte[] bytes) + throws UnsupportedEncodingException { + if (bytes == null || bytes.length == 0) { + return ""; + } + return new String(bytes, HConstants.UTF8_ENCODING); + } } diff --git a/src/test/org/apache/hadoop/hbase/TestGet.java b/src/test/org/apache/hadoop/hbase/TestGet.java index a5848ff687c..ae7b16d7bdc 100644 --- a/src/test/org/apache/hadoop/hbase/TestGet.java +++ b/src/test/org/apache/hadoop/hbase/TestGet.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; +import org.apache.hadoop.hbase.util.Writables; + /** Test case for get */ public class TestGet extends HBaseTestCase { private static final Log LOG = LogFactory.getLog(TestGet.class.getName()); @@ -59,8 +61,7 @@ public class TestGet extends HBaseTestCase { for(Iterator i = values.keySet().iterator(); i.hasNext(); ) { Text column = i.next(); if (column.equals(HConstants.COL_SERVER)) { - byte [] val = values.get(column); - String server = new String(val, HConstants.UTF8_ENCODING); + String server = Writables.bytesToString(values.get(column)); assertEquals(expectedServer, server); LOG.info(server); } @@ -106,20 +107,17 @@ public class TestGet extends HBaseTestCase { bytes.reset(); HGlobals.rootRegionInfo.write(s); - r.put(lockid, HConstants.COL_REGIONINFO, bytes.toByteArray()); + r.put(lockid, HConstants.COL_REGIONINFO, + Writables.getBytes(HGlobals.rootRegionInfo)); r.commit(lockid, System.currentTimeMillis()); lockid = r.startUpdate(ROW_KEY); r.put(lockid, HConstants.COL_SERVER, - new HServerAddress(SERVER_ADDRESS).toString(). - getBytes(HConstants.UTF8_ENCODING) - ); + Writables.stringToBytes(new HServerAddress(SERVER_ADDRESS).toString())); - r.put(lockid, HConstants.COL_STARTCODE, - String.valueOf(lockid).getBytes(HConstants.UTF8_ENCODING) - ); + r.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(lockid)); r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "region"), "region".getBytes(HConstants.UTF8_ENCODING)); @@ -150,8 +148,7 @@ public class TestGet extends HBaseTestCase { String otherServerName = "bar.foo.com:4321"; r.put(lockid, HConstants.COL_SERVER, - new HServerAddress(otherServerName).toString(). 
- getBytes(HConstants.UTF8_ENCODING)); + Writables.stringToBytes(new HServerAddress(otherServerName).toString())); r.put(lockid, new Text(HConstants.COLUMN_FAMILY + "junk"), "junk".getBytes()); diff --git a/src/test/org/apache/hadoop/hbase/TestScanner.java b/src/test/org/apache/hadoop/hbase/TestScanner.java index 45e6eb4e863..dde52a4a3de 100644 --- a/src/test/org/apache/hadoop/hbase/TestScanner.java +++ b/src/test/org/apache/hadoop/hbase/TestScanner.java @@ -28,9 +28,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.hbase.util.Writables; + /** * Test of a long-lived scanner validating as we go. */ @@ -52,13 +53,11 @@ public class TestScanner extends HBaseTestCase { private static final long START_CODE = Long.MAX_VALUE; private HRegion region; - private DataInputBuffer in = new DataInputBuffer(); /** Compare the HRegionInfo we read from HBase to what we stored */ private void validateRegionInfo(byte [] regionBytes) throws IOException { - in.reset(regionBytes, regionBytes.length); - HRegionInfo info = new HRegionInfo(); - info.readFields(in); + HRegionInfo info = + (HRegionInfo) Writables.getWritable(regionBytes, new HRegionInfo()); assertEquals(REGION_INFO.regionId, info.regionId); assertEquals(0, info.startKey.getLength()); @@ -94,8 +93,7 @@ public class TestScanner extends HBaseTestCase { val = results.get(HConstants.COL_STARTCODE); assertNotNull(val); assertFalse(val.length == 0); - long startCode = - Long.valueOf(new String(val, HConstants.UTF8_ENCODING)); + long startCode = Writables.bytesToLong(val); assertEquals(START_CODE, startCode); } @@ -104,7 +102,7 @@ public class TestScanner extends HBaseTestCase { val = results.get(HConstants.COL_SERVER); assertNotNull(val); assertFalse(val.length == 0); - String server = new String(val, HConstants.UTF8_ENCODING); + String server = Writables.bytesToString(val); assertEquals(0, server.compareTo(serverName)); } results.clear(); @@ -187,10 +185,10 @@ public class TestScanner extends HBaseTestCase { lockid = region.startUpdate(ROW_KEY); region.put(lockid, HConstants.COL_SERVER, - address.toString().getBytes(HConstants.UTF8_ENCODING)); + Writables.stringToBytes(address.toString())); - region.put(lockid, HConstants.COL_STARTCODE, - String.valueOf(START_CODE).getBytes(HConstants.UTF8_ENCODING)); + region.put(lockid, HConstants.COL_STARTCODE, + Writables.longToBytes(START_CODE)); region.commit(lockid, System.currentTimeMillis()); @@ -227,7 +225,7 @@ public class TestScanner extends HBaseTestCase { lockid = region.startUpdate(ROW_KEY); region.put(lockid, HConstants.COL_SERVER, - address.toString().getBytes(HConstants.UTF8_ENCODING)); + Writables.stringToBytes(address.toString())); region.commit(lockid, System.currentTimeMillis()); diff --git a/src/test/org/apache/hadoop/hbase/TestScanner2.java b/src/test/org/apache/hadoop/hbase/TestScanner2.java index 4eb02c0a065..21651a063d1 100644 --- a/src/test/org/apache/hadoop/hbase/TestScanner2.java +++ b/src/test/org/apache/hadoop/hbase/TestScanner2.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.filter.RowFilterSet; import org.apache.hadoop.hbase.filter.StopRowFilter; import org.apache.hadoop.hbase.filter.WhileMatchRowFilter; import org.apache.hadoop.hbase.io.KeyedData; +import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.Text; /** @@ -57,7 +58,7 @@ public 
class TestScanner2 extends HBaseClusterTestCase { final char LAST_COLKEY = '3'; final byte[] GOOD_BYTES = "goodstuff".getBytes(); final byte[] BAD_BYTES = "badstuff".getBytes(); - + /** * Test the scanner's handling of various filters. * @@ -170,7 +171,7 @@ public class TestScanner2 extends HBaseClusterTestCase { region.regionName.toString().startsWith(getName())); // Now do what happens at split time; remove old region and then add two // new ones in its place. - HRegion.removeRegionFromMETA(conf, HConstants.META_TABLE_NAME, + removeRegionFromMETA(new HTable(conf, HConstants.META_TABLE_NAME), region.regionName); HTableDescriptor desc = region.tableDesc; Path homedir = new Path(getName()); @@ -183,7 +184,7 @@ public class TestScanner2 extends HBaseClusterTestCase { homedir, this.conf, null)); try { for (HRegion r : newRegions) { - HRegion.addRegionToMETA(conf, HConstants.META_TABLE_NAME, r, + addRegionToMETA(conf, HConstants.META_TABLE_NAME, r, this.cluster.getHMasterAddress(), -1L); } regions = scan(conf, HConstants.META_TABLE_NAME); @@ -219,9 +220,15 @@ public class TestScanner2 extends HBaseClusterTestCase { results.put(values[i].getKey().getColumn(), values[i].getData()); } - HRegionInfo info = HRegion.getRegionInfo(results); - String serverName = HRegion.getServerName(results); - long startCode = HRegion.getStartCode(results); + HRegionInfo info = (HRegionInfo) Writables.getWritable( + results.get(HConstants.COL_REGIONINFO), new HRegionInfo()); + + byte[] bytes = results.get(HConstants.COL_SERVER); + String serverName = Writables.bytesToString(bytes); + + long startCode = + Writables.bytesToLong(results.get(HConstants.COL_STARTCODE)); + LOG.info(Thread.currentThread().getName() + " scanner: " + Long.valueOf(scannerId) + ": regioninfo: {" + info.toString() + "}, server: " + serverName + ", startCode: " + startCode); @@ -240,4 +247,49 @@ public class TestScanner2 extends HBaseClusterTestCase { } return regions; } + + private void addRegionToMETA(final Configuration conf, + final Text table, final HRegion region, + final HServerAddress serverAddress, + final long startCode) + throws IOException { + HTable t = new HTable(conf, table); + try { + long lockid = t.startUpdate(region.getRegionName()); + t.put(lockid, HConstants.COL_REGIONINFO, Writables.getBytes(region.getRegionInfo())); + t.put(lockid, HConstants.COL_SERVER, + Writables.stringToBytes(serverAddress.toString())); + t.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(startCode)); + t.commit(lockid); + if (LOG.isDebugEnabled()) { + LOG.debug("Added region " + region.getRegionName() + " to table " + + table); + } + } finally { + t.close(); + } + } + + /* + * Delete region from META table. + * @param t META table we are to delete region from. + * @param regionName Region to remove.
@@ -240,4 +247,49 @@ public class TestScanner2 extends HBaseClusterTestCase {
     }
     return regions;
   }
+
+  private void addRegionToMETA(final Configuration conf,
+      final Text table, final HRegion region,
+      final HServerAddress serverAddress,
+      final long startCode)
+  throws IOException {
+    HTable t = new HTable(conf, table);
+    try {
+      long lockid = t.startUpdate(region.getRegionName());
+      t.put(lockid, HConstants.COL_REGIONINFO, Writables.getBytes(region.getRegionInfo()));
+      t.put(lockid, HConstants.COL_SERVER,
+        Writables.stringToBytes(serverAddress.toString()));
+      t.put(lockid, HConstants.COL_STARTCODE, Writables.longToBytes(startCode));
+      t.commit(lockid);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added region " + region.getRegionName() + " to table " +
+          table);
+      }
+    } finally {
+      t.close();
+    }
+  }
+
+  /**
+   * Delete region from META table.
+   * @param t HTable instance for the META table we are to remove region from.
+   * @param regionName Region to remove.
+   * @throws IOException
+   */
+  private void removeRegionFromMETA(final HTable t, final Text regionName)
+  throws IOException {
+    try {
+      long lockid = t.startBatchUpdate(regionName);
+      t.delete(lockid, HConstants.COL_REGIONINFO);
+      t.delete(lockid, HConstants.COL_SERVER);
+      t.delete(lockid, HConstants.COL_STARTCODE);
+      t.commit(lockid);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Removed " + regionName + " from table " + t.getTableName());
+      }
+    } finally {
+      t.close();
+    }
+  }
 }
\ No newline at end of file
diff --git a/src/test/org/apache/hadoop/hbase/TestSplit.java b/src/test/org/apache/hadoop/hbase/TestSplit.java
index 8d9b21083f1..fa3856a1dbe 100644
--- a/src/test/org/apache/hadoop/hbase/TestSplit.java
+++ b/src/test/org/apache/hadoop/hbase/TestSplit.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase;
 
 import java.io.IOException;
 import java.util.ConcurrentModificationException;
-import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -31,6 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Text;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 
 /**
  * {@link TestHRegion} does a split but this TestCase adds testing of fast
@@ -45,7 +46,13 @@ public class TestSplit extends HBaseTestCase {
   private FileSystem fs = null;
   private static final char FIRST_CHAR = 'a';
   private static final char LAST_CHAR = 'z';
-  
+
+  /** constructor */
+  public TestSplit() {
+    Logger.getRootLogger().setLevel(Level.WARN);
+    Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
+  }
+
   /** {@inheritDoc} */
   @Override
   public void setUp() throws Exception {
@@ -63,12 +70,14 @@ public class TestSplit extends HBaseTestCase {
   /** {@inheritDoc} */
   @Override
   public void tearDown() throws Exception {
-    try {
-      if (this.fs.exists(testDir)) {
-        this.fs.delete(testDir);
+    if (fs != null) {
+      try {
+        if (this.fs.exists(testDir)) {
+          this.fs.delete(testDir);
+        }
+      } catch (Exception e) {
+        e.printStackTrace();
       }
-    } catch (Exception e) {
-      e.printStackTrace();
     }
     super.tearDown();
   }
@@ -175,13 +184,13 @@ public class TestSplit extends HBaseTestCase {
    * @throws Exception
    */
  public void testSplitRegionIsDeleted() throws Exception {
-    final int timeout = 60;
+    final int retries = 10;
+    this.testDir = null;
+    this.fs = null;
     // Start up a hbase cluster
-    this.conf.set(HConstants.HBASE_DIR, this.testDir.toString());
-    MiniHBaseCluster.MasterThread masterThread =
-      MiniHBaseCluster.startMaster(this.conf);
-    List<MiniHBaseCluster.RegionServerThread> regionServerThreads =
-      MiniHBaseCluster.startRegionServers(this.conf, 1);
+    MiniHBaseCluster cluster = new MiniHBaseCluster(conf, 1);
+    Path testDir = cluster.regionThreads.get(0).getRegionServer().rootDir;
+    FileSystem fs = cluster.getDFSCluster().getFileSystem();
     HTable meta = null;
     HTable t = null;
     try {
@@ -197,17 +206,15 @@ public class TestSplit extends HBaseTestCase {
       // region instance and bring on a split.
       HRegionInfo hri =
         t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo();
-      HRegion r = null;
-      synchronized(regionServerThreads) {
-        r = regionServerThreads.get(0).getRegionServer().onlineRegions.
-          get(hri.getRegionName());
-      }
+      HRegion r =
+        cluster.regionThreads.get(0).getRegionServer().onlineRegions.get(
+          hri.getRegionName());
       // Flush will provoke a split next time the split-checker thread runs.
       r.flushcache(false);
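
From this point the test waits on three asynchronous events (the daughters appearing in META, the daughters coming online, the parent being cleaned up), all with the same poll-and-sleep shape: up to retries probes, five seconds apart. Were the idiom shared, it could be factored into a helper like this sketch; no such helper exists in the patch, it is purely illustrative:

import java.util.concurrent.Callable;

/** Poll the condition up to retries times, pausing between probes. */
static boolean waitFor(final Callable<Boolean> condition, final int retries,
    final long pauseMs) throws Exception {
  for (int i = 0; i < retries; i++) {
    if (condition.call().booleanValue()) {
      return true;
    }
    Thread.sleep(pauseMs);
  }
  return condition.call().booleanValue();  // one last look after the final sleep
}

The first wait below would then read waitFor(new Callable<Boolean>() { ... return count(meta, HConstants.COLUMN_FAMILY_STR) > preSplitCount; }, retries, 5000), with the usual caveat that locals captured by the anonymous class must be final.
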
       // Now, wait until split makes it into the meta table.
-      for (int i = 0; i < timeout &&
-          (count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) {
-        Thread.sleep(1000);
+      for (int i = 0; i < retries &&
+          (count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) {
+        Thread.sleep(5000);
       }
       int oldCount = count;
       count = count(meta, HConstants.COLUMN_FAMILY_STR);
@@ -217,47 +224,72 @@ public class TestSplit extends HBaseTestCase {
       HRegionInfo parent = getSplitParent(meta);
       assertTrue(parent.isOffline());
       Path parentDir =
-        HRegion.getRegionDir(this.testDir, parent.getRegionName());
-      assertTrue(this.fs.exists(parentDir));
+        HRegion.getRegionDir(testDir, parent.getRegionName());
+      assertTrue(fs.exists(parentDir));
       LOG.info("Split happened and parent " + parent.getRegionName() + " is " +
-        "offline");
+          "offline");
+      for (int i = 0; i < retries; i++) {
+        // Now open a scanner on the table. This will force HTable to recalibrate
+        // and in doing so, will force us to wait until the new child regions
+        // come on-line (since they are no longer automatically served by the
+        // HRegionServer that was serving the parent. In this test they will
+        // end up on the same server (since there is only one), but we have to
+        // wait until the master assigns them.
+        try {
+          HScannerInterface s =
+            t.obtainScanner(new Text[] {new Text(COLFAMILY_NAME3)},
+              HConstants.EMPTY_START_ROW);
+          try {
+            HStoreKey key = new HStoreKey();
+            TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
+            s.next(key, results);
+            break;
+
+          } finally {
+            s.close();
+          }
+        } catch (NotServingRegionException x) {
+          Thread.sleep(5000);
+        }
+      }
       // Now, force a compaction.  This will rewrite references and make it
       // so the parent region becomes deletable.
       LOG.info("Starting compaction");
-      synchronized(regionServerThreads) {
-        for (MiniHBaseCluster.RegionServerThread thread: regionServerThreads) {
-          SortedMap<Text, HRegion> regions =
-            thread.getRegionServer().onlineRegions;
-          // Retry if ConcurrentModification... alternative of sync'ing is not
-          // worth it for sake of unit test.
-          for (int i = 0; i < 10; i++) {
-            try {
-              for (HRegion online: regions.values()) {
-                if (online.getRegionName().toString().startsWith(getName())) {
-                  online.compactStores();
-                }
+      for (MiniHBaseCluster.RegionServerThread thread: cluster.regionThreads) {
+        SortedMap<Text, HRegion> regions =
+          thread.getRegionServer().onlineRegions;
+        // Retry if ConcurrentModification... alternative of sync'ing is not
+        // worth it for sake of unit test.
+        for (int i = 0; i < 10; i++) {
+          try {
+            for (HRegion online: regions.values()) {
+              if (online.getRegionName().toString().startsWith(getName())) {
+                online.compactStores();
               }
-              break;
-            } catch (ConcurrentModificationException e) {
-              LOG.warn("Retrying because ..." + e.toString() + " -- one or " +
-                "two should be fine");
-              continue;
             }
+            break;
+          } catch (ConcurrentModificationException e) {
+            LOG.warn("Retrying because ..." + e.toString() + " -- one or " +
+              "two should be fine");
+            continue;
           }
         }
       }
-      
+
       // Now wait until parent disappears.
       LOG.info("Waiting on parent " + parent.getRegionName() +
-        " to disappear");
-      for (int i = 0; i < timeout && getSplitParent(meta) != null; i++) {
-        Thread.sleep(1000);
+          " to disappear");
+      for (int i = 0; i < retries && getSplitParent(meta) != null; i++) {
+        Thread.sleep(5000);
       }
       assertTrue(getSplitParent(meta) == null);
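
getSplitParent is another helper that sits outside the visible hunks; judging from its call sites it scans META for the one row whose HRegionInfo still carries the split flag, returning null once the master has cleaned the parent out. A plausible shape, reusing the scanner idiom from this file (an assumption, not the patch's actual code):

private HRegionInfo getSplitParent(final HTable meta) throws IOException {
  HRegionInfo result = null;
  HScannerInterface s = meta.obtainScanner(
    new Text[] {new Text(HConstants.COLUMN_FAMILY_STR)},
    HConstants.EMPTY_START_ROW);
  try {
    HStoreKey key = new HStoreKey();
    TreeMap<Text, byte []> vals = new TreeMap<Text, byte []>();
    while (s.next(key, vals)) {
      byte [] bytes = vals.get(HConstants.COL_REGIONINFO);
      if (bytes != null && bytes.length > 0) {
        HRegionInfo info =
          (HRegionInfo) Writables.getWritable(bytes, new HRegionInfo());
        if (info.isSplit()) {
          result = info;   // found the offlined parent row
          break;
        }
      }
      vals.clear();        // the scanner reuses the map between rows
    }
  } finally {
    s.close();
  }
  return result;
}
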
       // Assert cleaned up.
-      assertFalse(this.fs.exists(parentDir));
+      for (int i = 0; i < retries && fs.exists(parentDir); i++) {
+        Thread.sleep(5000);
+      }
+      assertFalse(fs.exists(parentDir));
     } finally {
-      MiniHBaseCluster.shutdown(masterThread, regionServerThreads);
+      cluster.shutdown();
     }
   }
 
@@ -282,8 +314,13 @@ public class TestSplit extends HBaseTestCase {
       HStoreKey curKey = new HStoreKey();
       TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
       while(s.next(curKey, curVals)) {
-        HRegionInfo hri = (HRegionInfo)Writables.
-          getWritable(curVals.get(HConstants.COL_REGIONINFO), new HRegionInfo());
+        byte[] bytes = curVals.get(HConstants.COL_REGIONINFO);
+        if (bytes == null || bytes.length == 0) {
+          continue;
+        }
+        HRegionInfo hri =
+          (HRegionInfo) Writables.getWritable(bytes, new HRegionInfo());
+
         // Assert that if region is a split region, that it is also offline.
        // Otherwise, if not a split region, assert that it is online.
        if (hri.isSplit() && hri.isOffline()) {
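
The excerpt ends mid-hunk, inside the loop that asserts every split parent recorded in META is also marked offline. One more helper the test leans on throughout, count(meta, HConstants.COLUMN_FAMILY_STR), is likewise not visible here; presumably it is a plain row count over a scanner, along these lines (a sketch under that assumption):

private int count(final HTable t, final String column) throws IOException {
  int size = 0;
  HScannerInterface s = t.obtainScanner(
    new Text[] {new Text(column)}, HConstants.EMPTY_START_ROW);
  try {
    HStoreKey key = new HStoreKey();
    TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
    while (s.next(key, results)) {
      size++;            // one META row per region
      results.clear();   // reset the reused value map before the next row
    }
  } finally {
    s.close();
  }
  return size;
}
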