HADOOP-1662 Make region splits faster

Splits are now near-instantaneous.  On split, daughter splits create
'references' to store files up in the parent region using new 'HalfMapFile'
class to proxy accesses against the top-half or bottom-half of 
backing MapFile.  Parent region is deleted after all references in daughter
regions have been let go.

Below includes other cleanups and at least one bug fix for fails adding
>32k records and improvements to make it more likely TestRegionServerAbort
will complete..

A src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHStoreFile.java
    Added. Tests new Reference HStoreFiles. Test new HalfMapFileReader inner
    class of HStoreFile. Test that we do the right thing when HStoreFiles
    are smaller than a MapFile index range (i.e. there is not 'MidKey').
    Test we do right thing when key is outside of a HalfMapFile.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/HBaseTestCase.java
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestGet.java
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestScanner.java
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTimestamp.java
    getHRegionDir moved from HStoreFile to HRegion.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestBatchUpdate.java
    Let out exception rather than catch and call 'fail'.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    Refactored so can start and stop a minihbasecluster w/o having to
    subclass this TestCase. Refactored methods in this class to use the
    newly added methods listed below.
    (MasterThread, RegionServerThread, startMaster, startRegionServers
      shutdown): Added.
    Added logging of abort, close and wait.  Also on abort/close
    was doing a remove that made it so subsequent wait had nothing to
    wait on.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestSplit.java
    Added tests that assert all works properly at region level on
    multiple levels of splits and then do same on a cluster.
M src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestHRegion.java
    Removed catch and 'fail()'.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreFile.java
    Javadoc to explain how split now works. Have constructors flow
    into each other rather than replicate setup per instance. Moved
    in here operations such as delete, rename, and length of store files
    (No need of clients to remember to delete map and info files).
    (REF_NAME_PARSER, Reference, HalfMapFile, isReference,
      writeReferenceFiles, writeSplitInfo, readSplitInfo,
      createOrFail, getReader, getWriter, toString): Added.
    (getMapDir, getMapFilePath, getInfoDir, getInfoFilePath): Added
    a bunch of overrides for reference handling.
    (loadHStoreFiles): Amended to load references off disk.
    (splitStoreFiles): Redone to instead write references into
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    Rename maps as readers and mapFiles as storefiles.
    Moved BloomFilterReader and Writer into HStoreFile. Removed
    getMapFileReader and getMapFileWriter (They are in HStoreFile now).
    (getReaders): Added.
    (HStoreSize): Added.  Data Structure to hold aggregated size
    of all HStoreFiles in HStore, the largest, its midkey, and
    if the HStore is splitable (May not be if references).
    Previous we only did largest file; less accurate.
    (getLargestFileSize): Renamed size and redone to aggregate
    sizes, etc.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java
    Have constructors waterfall down through each other rather than
    repeat initializations.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMerge.java
    Use new HStoreSize structure.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    Added delayed remove of HRegion (Now done in HMaster as part of
    meta scan). Change LOG.error and LOG.warn so they throw stack trace
    instead of just the Exception.toString as message.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConstants.java
    (COLUMN_FAMILY_STR): Added.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    Added why to log of splitting.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLogEdit.java
    Short is not big enough to hold edits tha could contain a sizable
    web page.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTable.java
    (getTableName): Added.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    Added constructor to BaseScanner that takes name of table we're
    scanning (ROOT or META usually). Added to scanOneRegion handling
    of split regions.  Collect splits to check while scanning and
    then outside of the scanning, so we can modify the META table
    is needed, do the checks of daughter regions and update on
    change of state.  Made LOG.warn and LOG.error print stack trace.
    (isSplitParent, cleanupSplits, hasReferences): Added. 
    Added toString to each of the PendingOperation implementations.
    In the ShutdownPendingOperation scan of meta data, removed
    check of startcode (if the server name is that of the dead
    server, it needs reassigning even if start code is good).
    Also, if server name is null -- possible if we are missing
    edits off end of log -- then the region should be reassigned
    just in case its from the dead server.  Also, if reassigning,
    clear from pendingRegions.  Server may have died after sending
    region is up but before the server confirms receipt in the
    meta scan. Added mare detail to each log.  In OpenPendingOperation
    we were trying to clear pendingRegion in wrong place -- it was
    never executed (regions were always pending). 
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
    Add split boolean.  Output offline and split status in toString.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    Comments.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    Moved getRegionDir here from HStoreFile.
    (COL_SPLITA, COL_SPLITB): Added.
    (closeAndSplit): Refactored to use new fast split method.
       StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
    (splitStoreFile): Moved into HStoreFile.
    (getSplitRegionDir, getSplitsDir, toString): Added.
    (needsSplit): Refactored to exploit new HStoreSize structure.
    Also manages notion of 'unsplitable' region.
    (largestHStore): Refactored.
    (removeSplitFromMETA, writeSplitToMETA, getSplit, hasReference): Added.
M src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Keying.java
    (intToBytes, getBytes): Added.
A src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/Writables.java    
    Utility reading and writing Writables.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@564012 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2007-08-08 20:30:13 +00:00
parent b205cac36d
commit 790e3d767e
30 changed files with 2779 additions and 1030 deletions

View File

@ -85,3 +85,4 @@ Trunk (unreleased changes)
side objects (HTable/HBaseAdmin/HConnection) instead of HClient.
53. HADOOP-1528 HClient for multiple tables - expose close table function
54. HADOOP-1466 Clean up warnings, visibility and javadoc issues in HBase.
55. HADOOP-1662 Make region splits faster

View File

@ -498,11 +498,8 @@ public class HBaseAdmin implements HConstants {
private HRegionLocation getFirstMetaServerForTable(Text tableName)
throws IOException {
SortedMap<Text, HRegionLocation> metaservers =
connection.getTableServers(META_TABLE_NAME);
connection.getTableServers(META_TABLE_NAME);
return metaservers.get((metaservers.containsKey(tableName)) ?
tableName : metaservers.headMap(tableName).lastKey());
}
}

View File

@ -76,14 +76,7 @@ public class HColumnDescriptor implements WritableComparable {
* Default constructor. Must be present for Writable.
*/
public HColumnDescriptor() {
this.name = new Text();
this.maxVersions = DEFAULT_N_VERSIONS;
this.compressionType = COMPRESSION_NONE;
this.inMemory = false;
this.maxValueLength = Integer.MAX_VALUE;
this.bloomFilterSpecified = false;
this.bloomFilter = null;
this.versionNumber = COLUMN_DESCRIPTOR_VERSION;
this(null);
}
/**
@ -93,8 +86,10 @@ public class HColumnDescriptor implements WritableComparable {
* @param columnName - column family name
*/
public HColumnDescriptor(String columnName) {
this();
this.name.set(columnName);
this(columnName == null || columnName.length() <= 0?
new Text(): new Text(columnName),
DEFAULT_N_VERSIONS, CompressionType.NONE, false,
Integer.MAX_VALUE, null);
}
/**
@ -112,13 +107,19 @@ public class HColumnDescriptor implements WritableComparable {
* end in a <code>:</code>
* @throws IllegalArgumentException if the number of versions is &lt;= 0
*/
public HColumnDescriptor(Text name, int maxVersions, CompressionType compression,
boolean inMemory, int maxValueLength, BloomFilterDescriptor bloomFilter) {
public HColumnDescriptor(final Text name, final int maxVersions,
final CompressionType compression, final boolean inMemory,
final int maxValueLength, final BloomFilterDescriptor bloomFilter) {
String familyStr = name.toString();
Matcher m = LEGAL_FAMILY_NAME.matcher(familyStr);
if(m == null || !m.matches()) {
throw new IllegalArgumentException(
"Family names can only contain 'word characters' and must end with a ':'");
// Test name if not null (It can be null when deserializing after
// construction but before we've read in the fields);
if (familyStr.length() > 0) {
Matcher m = LEGAL_FAMILY_NAME.matcher(familyStr);
if(m == null || !m.matches()) {
throw new IllegalArgumentException("Illegal family name <" + name +
">. Family names can only contain " +
"'word characters' and must end with a ':'");
}
}
this.name = name;

View File

@ -270,8 +270,8 @@ public class HConnectionManager implements HConstants {
/** {@inheritDoc} */
public SortedMap<Text, HRegionLocation>
getTableServers(Text tableName) throws IOException {
getTableServers(Text tableName)
throws IOException {
if (tableName == null || tableName.getLength() == 0) {
throw new IllegalArgumentException(
"table name cannot be null or zero length");
@ -468,6 +468,7 @@ public class HConnectionManager implements HConstants {
try {
this.tablesBeingLocated.wait(threadWakeFrequency);
} catch (InterruptedException e) {
// continue
}
}
if (!waited) {

View File

@ -100,7 +100,8 @@ public interface HConstants {
// Defines for the column names used in both ROOT and META HBase 'meta' tables.
/** The ROOT and META column family */
static final Text COLUMN_FAMILY = new Text("info:");
static final String COLUMN_FAMILY_STR = "info:";
static final Text COLUMN_FAMILY = new Text(COLUMN_FAMILY_STR);
/** Array of meta column names */
static final Text [] COLUMN_FAMILY_ARRAY = new Text [] {COLUMN_FAMILY};

View File

@ -97,13 +97,15 @@ public class HLog implements HConstants {
static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
Configuration conf) throws IOException {
Path logfiles[] = fs.listPaths(srcDir);
LOG.info("splitting " + logfiles.length + " log files in " +
srcDir.toString());
LOG.info("splitting " + logfiles.length + " log(s) in " +
srcDir.toString());
HashMap<Text, SequenceFile.Writer> logWriters =
new HashMap<Text, SequenceFile.Writer>();
try {
for(int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("Splitting " + logfiles[i]);
}
SequenceFile.Reader in =
new SequenceFile.Reader(fs, logfiles[i], conf);
try {
@ -113,7 +115,7 @@ public class HLog implements HConstants {
Text regionName = key.getRegionName();
SequenceFile.Writer w = logWriters.get(regionName);
if (w == null) {
Path logfile = new Path(HStoreFile.getHRegionDir(rootDir,
Path logfile = new Path(HRegion.getRegionDir(rootDir,
regionName), HREGION_OLDLOGFILE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("getting new log file writer for path " + logfile);
@ -122,6 +124,9 @@ public class HLog implements HConstants {
HLogEdit.class);
logWriters.put(regionName, w);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Edit " + key.toString());
}
w.append(key, val);
}
} finally {

View File

@ -81,7 +81,7 @@ public class HLogEdit implements Writable {
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
this.column.write(out);
out.writeShort(this.val.length);
out.writeInt(this.val.length);
out.write(this.val);
out.writeLong(timestamp);
}
@ -89,7 +89,7 @@ public class HLogEdit implements Writable {
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
this.column.readFields(in);
this.val = new byte[in.readShort()];
this.val = new byte[in.readInt()];
in.readFully(this.val);
this.timestamp = in.readLong();
}

File diff suppressed because it is too large Load Diff

View File

@ -29,6 +29,8 @@ import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
@ -37,6 +39,7 @@ import org.apache.hadoop.io.Text;
* wrapper around a TreeMap that helps us when staging the Memcache out to disk.
*/
public class HMemcache {
static final Log LOG = LogFactory.getLog(HMemcache.class);
TreeMap<HStoreKey, byte []> memcache =
new TreeMap<HStoreKey, byte []>();
final Vector<TreeMap<HStoreKey, byte []>> history
@ -47,6 +50,7 @@ public class HMemcache {
/*
* Approximate size in bytes of the payload carried by this memcache.
* Does not consider deletes nor adding again on same key.
*/
private AtomicLong size = new AtomicLong(0);
@ -157,6 +161,7 @@ public class HMemcache {
/**
* @return Approximate size in bytes of payload carried by this memcache.
* Does not take into consideration deletes nor adding again on same key.
*/
public long getSize() {
return this.size.get();

View File

@ -143,17 +143,17 @@ class HMerge implements HConstants {
long currentSize = 0;
HRegion nextRegion = null;
long nextSize = 0;
Text midKey = new Text();
for(int i = 0; i < regions.length - 1; i++) {
if(currentRegion == null) {
currentRegion =
new HRegion(dir, hlog, fs, conf, regions[i], null);
currentSize = currentRegion.largestHStore();
currentSize = currentRegion.largestHStore(midKey).getAggregate();
}
nextRegion =
new HRegion(dir, hlog, fs, conf, regions[i + 1], null);
nextSize = nextRegion.largestHStore();
nextSize = nextRegion.largestHStore(midKey).getAggregate();
long maxFilesize =
conf.getLong("hbase.hregion.max.filesize", DEFAULT_MAX_FILE_SIZE);

View File

@ -36,8 +36,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
@ -68,7 +69,7 @@ import org.apache.hadoop.util.StringUtils;
* each column family. (This config info will be communicated via the
* tabledesc.)
*
* The HTableDescriptor contains metainfo about the HRegion's table.
* <p>The HTableDescriptor contains metainfo about the HRegion's table.
* regionName is a unique identifier for this HRegion. (startKey, endKey]
* defines the keyspace for this HRegion.
*/
@ -76,24 +77,12 @@ public class HRegion implements HConstants {
static String SPLITDIR = "splits";
static String MERGEDIR = "merges";
static String TMPREGION_PREFIX = "tmpregion_";
static Random rand = new Random();
static final Random rand = new Random();
static final Log LOG = LogFactory.getLog(HRegion.class);
final AtomicBoolean closed = new AtomicBoolean(false);
private long noFlushCount = 0;
/**
* Deletes all the files for a HRegion
*
* @param fs - the file system object
* @param baseDirectory - base directory for HBase
* @param regionName - name of the region to delete
* @throws IOException
*/
static void deleteRegion(FileSystem fs, Path baseDirectory,
Text regionName) throws IOException {
LOG.info("Deleting region " + regionName);
fs.delete(HStoreFile.getHRegionDir(baseDirectory, regionName));
}
static final Text COL_SPLITA = new Text(COLUMN_FAMILY_STR + "splitA");
static final Text COL_SPLITB = new Text(COLUMN_FAMILY_STR + "splitB");
/**
* Merge two HRegions. They must be available on the current
@ -108,7 +97,7 @@ public class HRegion implements HConstants {
// Make sure that srcA comes first; important for key-ordering during
// write of the merged file.
FileSystem fs = srcA.getFilesystem();
if(srcA.getStartKey() == null) {
if(srcB.getStartKey() == null) {
throw new IOException("Cannot merge two regions with null start key");
@ -126,7 +115,6 @@ public class HRegion implements HConstants {
throw new IOException("Cannot merge non-adjacent regions");
}
FileSystem fs = a.getFilesystem();
Configuration conf = a.getConf();
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
@ -143,22 +131,21 @@ public class HRegion implements HConstants {
HRegionInfo newRegionInfo
= new HRegionInfo(Math.abs(rand.nextLong()), tabledesc, startKey, endKey);
Path newRegionDir = HStoreFile.getHRegionDir(merges, newRegionInfo.regionName);
Path newRegionDir = HRegion.getRegionDir(merges, newRegionInfo.regionName);
if(fs.exists(newRegionDir)) {
throw new IOException("Cannot merge; target file collision at " + newRegionDir);
}
LOG.info("starting merge of regions: " + a.getRegionName() + " and "
+ b.getRegionName() + " new region start key is '"
+ (startKey == null ? "" : startKey) + "', end key is '"
+ (endKey == null ? "" : endKey) + "'");
LOG.info("starting merge of regions: " + a.getRegionName() + " and " +
b.getRegionName() + " into new region " + newRegionInfo.toString());
// Flush each of the sources, and merge their files into a single
// target for each column family.
TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
TreeMap<Text, Vector<HStoreFile>> filesToMerge =
new TreeMap<Text, Vector<HStoreFile>>();
for(HStoreFile src: a.flushcache(true)) {
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
if(v == null) {
@ -185,7 +172,7 @@ public class HRegion implements HConstants {
Text colFamily = es.getKey();
Vector<HStoreFile> srcFiles = es.getValue();
HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
colFamily, Math.abs(rand.nextLong()));
colFamily, Math.abs(rand.nextLong()));
dst.mergeStoreFiles(srcFiles, fs, conf);
alreadyMerged.addAll(srcFiles);
}
@ -193,7 +180,6 @@ public class HRegion implements HConstants {
// That should have taken care of the bulk of the data.
// Now close the source HRegions for good, and repeat the above to take care
// of any last-minute inserts
if(LOG.isDebugEnabled()) {
LOG.debug("flushing changes since start of merge for region "
+ a.getRegionName());
@ -235,13 +221,13 @@ public class HRegion implements HConstants {
for (Map.Entry<Text, Vector<HStoreFile>> es : filesToMerge.entrySet()) {
Text colFamily = es.getKey();
Vector<HStoreFile> srcFiles = es.getValue();
HStoreFile dst = new HStoreFile(conf, merges,
newRegionInfo.regionName, colFamily, Math.abs(rand.nextLong()));
HStoreFile dst = new HStoreFile(conf, merges, newRegionInfo.regionName,
colFamily, Math.abs(rand.nextLong()));
dst.mergeStoreFiles(srcFiles, fs, conf);
}
// Done
// Construction moves the merge files into place under region.
HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
newRegionDir);
@ -304,7 +290,6 @@ public class HRegion implements HConstants {
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
*
* @param rootDir root directory for HBase instance
* @param fs is the filesystem.
* @param conf is global configuration settings.
@ -324,29 +309,27 @@ public class HRegion implements HConstants {
this.conf = conf;
this.regionInfo = regionInfo;
this.memcache = new HMemcache();
this.writestate.writesOngoing = true;
this.writestate.writesEnabled = true;
// Declare the regionName. This is a unique string for the region, used to
// build a unique filename.
this.regiondir = HStoreFile.getHRegionDir(rootDir, this.regionInfo.regionName);
this.regiondir = HRegion.getRegionDir(rootDir, this.regionInfo.regionName);
Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
// Move prefab HStore files into place (if any)
// Move prefab HStore files into place (if any). This picks up split files
// and any merges from splits and merges dirs.
if(initialFiles != null && fs.exists(initialFiles)) {
fs.rename(initialFiles, regiondir);
fs.rename(initialFiles, this.regiondir);
}
// Load in all the HStores.
for(Map.Entry<Text, HColumnDescriptor> e :
this.regionInfo.tableDesc.families().entrySet()) {
Text colFamily = HStoreKey.extractFamily(e.getKey());
stores.put(colFamily, new HStore(rootDir, this.regionInfo.regionName,
e.getValue(), fs, oldLogFile, conf));
stores.put(colFamily,
new HStore(rootDir, this.regionInfo.regionName, e.getValue(), fs,
oldLogFile, conf));
}
// Get rid of any splits or merges that were lost in-progress
@ -359,7 +342,7 @@ public class HRegion implements HConstants {
fs.delete(merges);
}
// By default, we flush the cache when 32M.
// By default, we flush the cache when 16M.
this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
1024*1024*16);
this.blockingMemcacheSize = this.memcacheFlushSize *
@ -367,8 +350,8 @@ public class HRegion implements HConstants {
// By default, we compact the region if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
this.compactionThreshold = conf.getInt("hbase.hregion.compactionThreshold",
3);
this.compactionThreshold =
conf.getInt("hbase.hregion.compactionThreshold", 3);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
this.desiredMaxFileSize =
@ -397,9 +380,8 @@ public class HRegion implements HConstants {
* time-sensitive thread.
*
* @return Vector of all the storage files that the HRegion's component
* HStores make use of. It's a list of HStoreFile objects. Returns empty
* vector if already closed and null if it is judged that it should not
* close.
* HStores make use of. It's a list of all HStoreFile objects. Returns empty
* vector if already closed and null if judged that it should not close.
*
* @throws IOException
*/
@ -443,7 +425,6 @@ public class HRegion implements HConstants {
if(!shouldClose) {
return null;
}
LOG.info("closing region " + this.regionInfo.regionName);
// Write lock means no more row locks can be given out. Wait on
// outstanding row locks to come in before we close so we do not drop
@ -465,124 +446,121 @@ public class HRegion implements HConstants {
writestate.writesOngoing = false;
}
this.closed.set(true);
LOG.info("region " + this.regionInfo.regionName + " closed");
LOG.info("closed " + this.regionInfo.regionName);
}
} finally {
lock.releaseWriteLock();
}
}
/**
* Split the HRegion to create two brand-new ones. This will also close the
* current HRegion.
*
* Returns two brand-new (and open) HRegions
/*
* Split the HRegion to create two brand-new ones. This also closes
* current HRegion. Split should be fast since we don't rewrite store files
* but instead create new 'reference' store files that read off the top and
* bottom ranges of parent store files.
* @param midKey Row to split on.
* @param listener May be null.
* @return two brand-new (and open) HRegions
* @throws IOException
*/
HRegion[] closeAndSplit(Text midKey, RegionUnavailableListener listener)
HRegion[] closeAndSplit(final Text midKey,
final RegionUnavailableListener listener)
throws IOException {
if(((regionInfo.startKey.getLength() != 0)
&& (regionInfo.startKey.compareTo(midKey) > 0))
|| ((regionInfo.endKey.getLength() != 0)
&& (regionInfo.endKey.compareTo(midKey) < 0))) {
throw new IOException("Region splitkey must lie within region " +
"boundaries.");
}
checkMidKey(midKey);
long startTime = System.currentTimeMillis();
Path splits = new Path(regiondir, SPLITDIR);
if(! fs.exists(splits)) {
fs.mkdirs(splits);
Path splits = getSplitsDir();
HRegionInfo regionAInfo = new HRegionInfo(Math.abs(rand.nextLong()),
this.regionInfo.tableDesc, this.regionInfo.startKey, midKey);
Path dirA = getSplitRegionDir(splits, regionAInfo.regionName);
if(fs.exists(dirA)) {
throw new IOException("Cannot split; target file collision at " + dirA);
}
long regionAId = Math.abs(rand.nextLong());
HRegionInfo regionAInfo = new HRegionInfo(regionAId, regionInfo.tableDesc,
regionInfo.startKey, midKey);
long regionBId = Math.abs(rand.nextLong());
HRegionInfo regionBInfo =
new HRegionInfo(regionBId, regionInfo.tableDesc, midKey, null);
Path dirA = HStoreFile.getHRegionDir(splits, regionAInfo.regionName);
Path dirB = HStoreFile.getHRegionDir(splits, regionBInfo.regionName);
if(fs.exists(dirA) || fs.exists(dirB)) {
throw new IOException("Cannot split; target file collision at " + dirA +
" or " + dirB);
HRegionInfo regionBInfo = new HRegionInfo(Math.abs(rand.nextLong()),
this.regionInfo.tableDesc, midKey, null);
Path dirB = getSplitRegionDir(splits, regionBInfo.regionName);
if(this.fs.exists(dirB)) {
throw new IOException("Cannot split; target file collision at " + dirB);
}
// We just copied most of the data. Now get whatever updates are up in
// the memcache (after shutting down new updates).
// Notify the caller that we are about to close the region. This moves
// us ot the 'retiring' queue. Means no more updates coming in -- just
// us to the 'retiring' queue. Means no more updates coming in -- just
// whatever is outstanding.
listener.closing(this.getRegionName());
// Wait on the last row updates to come in.
LOG.debug("Starting wait on row locks.");
waitOnRowLocks();
// Flush this HRegion out to storage, and turn off flushes
// or compactions until close() is called.
LOG.debug("Calling flushcache inside closeAndSplit");
Vector<HStoreFile> hstoreFilesToSplit = flushcache(true);
if (hstoreFilesToSplit == null) {
// It should always return a list of hstore files even if memcache is
// empty. It will return null if concurrent compaction or splits which
// should not happen.
throw new NullPointerException("Flushcache did not return any files");
}
TreeSet<HStoreFile> alreadySplit = new TreeSet<HStoreFile>();
for(HStoreFile hsf: hstoreFilesToSplit) {
alreadySplit.add(splitStoreFile(hsf, splits, regionAInfo,
regionBInfo, midKey));
if (listener != null) {
listener.closing(getRegionName());
}
// Now close the HRegion
hstoreFilesToSplit = close();
// Now close the HRegion. Close returns all store files or null if not
// supposed to close (? What to do in this case? Implement abort of close?)
// Close also does wait on outstanding rows and calls a flush just-in-case.
Vector<HStoreFile> hstoreFilesToSplit = close();
if (hstoreFilesToSplit == null) {
LOG.warn("Close came back null (Implement abort of close?)");
}
// Tell listener that region is now closed and that they can therefore
// clean up any outstanding references.
listener.closed(this.getRegionName());
if (listener != null) {
listener.closed(this.getRegionName());
}
// Copy the small remainder
for(HStoreFile hsf: hstoreFilesToSplit) {
if(!alreadySplit.contains(hsf)) {
splitStoreFile(hsf, splits, regionAInfo, regionBInfo, midKey);
}
// Split each store file.
for(HStoreFile h: hstoreFilesToSplit) {
// A reference to the bottom half of the hsf store file.
HStoreFile.Reference aReference = new HStoreFile.Reference(
getRegionName(), h.getFileId(), new HStoreKey(midKey),
HStoreFile.Range.bottom);
HStoreFile a = new HStoreFile(this.conf, splits,
regionAInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()),
aReference);
HStoreFile.Reference bReference = new HStoreFile.Reference(
getRegionName(), h.getFileId(), new HStoreKey(midKey),
HStoreFile.Range.top);
HStoreFile b = new HStoreFile(this.conf, splits,
regionBInfo.regionName, h.getColFamily(), Math.abs(rand.nextLong()),
bReference);
h.splitStoreFile(a, b, this.fs);
}
// Done
// Done!
// Opening the region copies the splits files from the splits directory
// under each region.
HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
// Cleanup
fs.delete(splits); // Get rid of splits directory
fs.delete(regiondir); // and the directory for the old region
HRegion regions[] = new HRegion[2];
regions[0] = regionA;
regions[1] = regionB;
LOG.info("Region split of " + this.regionInfo.regionName + " complete. " +
"New regions are: " + regions[0].getRegionName() + ", " +
regions[1].getRegionName() + ". Took " +
boolean deleted = fs.delete(splits); // Get rid of splits directory
if (LOG.isDebugEnabled()) {
LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
}
HRegion regions[] = new HRegion [] {regionA, regionB};
LOG.info("Region split of " + this.regionInfo.regionName + " complete; " +
"new regions: " + regions[0].getRegionName() + ", " +
regions[1].getRegionName() + ". Split took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
return regions;
}
private HStoreFile splitStoreFile(final HStoreFile hsf, final Path splits,
final HRegionInfo a, final HRegionInfo b, final Text midKey)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Started splitting HStore " + hsf.getRegionName() + "/" +
hsf.getColFamily() + "/" + hsf.fileId());
private void checkMidKey(final Text midKey) throws IOException {
if(((this.regionInfo.startKey.getLength() != 0)
&& (this.regionInfo.startKey.compareTo(midKey) > 0))
|| ((this.regionInfo.endKey.getLength() != 0)
&& (this.regionInfo.endKey.compareTo(midKey) < 0))) {
throw new IOException("Region splitkey must lie within region " +
"boundaries.");
}
HStoreFile dstA = new HStoreFile(conf, splits, a.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
HStoreFile dstB = new HStoreFile(conf, splits, b.regionName,
hsf.getColFamily(), Math.abs(rand.nextLong()));
hsf.splitStoreFile(midKey, dstA, dstB, fs, conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Finished splitting HStore " + hsf.getRegionName() + "/" +
hsf.getColFamily() + "/" + hsf.fileId());
}
private Path getSplitRegionDir(final Path splits, final Text regionName) {
return HRegion.getRegionDir(splits, regionName);
}
private Path getSplitsDir() throws IOException {
Path splits = new Path(this.regiondir, SPLITDIR);
if(!this.fs.exists(splits)) {
this.fs.mkdirs(splits);
}
return hsf;
return splits;
}
//////////////////////////////////////////////////////////////////////////////
@ -591,22 +569,22 @@ public class HRegion implements HConstants {
/** @return start key for region */
public Text getStartKey() {
return regionInfo.startKey;
return this.regionInfo.startKey;
}
/** @return end key for region */
public Text getEndKey() {
return regionInfo.endKey;
return this.regionInfo.endKey;
}
/** @return region id */
public long getRegionId() {
return regionInfo.regionId;
return this.regionInfo.regionId;
}
/** @return region name */
public Text getRegionName() {
return regionInfo.regionName;
return this.regionInfo.regionName;
}
/** @return root directory path */
@ -616,27 +594,27 @@ public class HRegion implements HConstants {
/** @return HTableDescriptor for this region */
public HTableDescriptor getTableDesc() {
return regionInfo.tableDesc;
return this.regionInfo.tableDesc;
}
/** @return HLog in use for this region */
public HLog getLog() {
return log;
return this.log;
}
/** @return Configuration object */
public Configuration getConf() {
return conf;
return this.conf;
}
/** @return region directory Path */
public Path getRegionDir() {
return regiondir;
return this.regiondir;
}
/** @return FileSystem being used by this region */
public FileSystem getFilesystem() {
return fs;
return this.fs;
}
//////////////////////////////////////////////////////////////////////////////
@ -646,63 +624,74 @@ public class HRegion implements HConstants {
// upkeep.
//////////////////////////////////////////////////////////////////////////////
/**
/*
* Iterates through all the HStores and finds the one with the largest
* MapFile size. If the size is greater than the (currently hard-coded)
* threshold, returns true indicating that the region should be split. The
* midKey for the largest MapFile is returned through the midKey parameter.
*
* @param midKey - (return value) midKey of the largest MapFile
* @return - true if the region should be split
* It is possible for us to rule the region non-splitable even in excess of
* configured size. This happens if region contains a reference file. If
* a reference file, the region can not be split.
* @param midKey midKey of the largest MapFile
* @return true if the region should be split. midKey is set by this method.
* Check it for a midKey value on return.
*/
boolean needsSplit(Text midKey) {
lock.obtainReadLock();
try {
Text key = new Text();
long maxSize = 0;
long aggregateSize = 0;
for(HStore store: stores.values()) {
long size = store.getLargestFileSize(key);
aggregateSize += size;
if(size > maxSize) { // Largest so far
maxSize = size;
midKey.set(key);
}
}
HStore.HStoreSize biggest = largestHStore(midKey);
long triggerSize =
this.desiredMaxFileSize + (this.desiredMaxFileSize / 2);
boolean split = (maxSize >= triggerSize || aggregateSize >= triggerSize);
boolean split = (biggest.getAggregate() >= triggerSize);
if (split) {
LOG.info("Splitting " + getRegionName().toString() +
" because largest file is " + StringUtils.humanReadableInt(maxSize) +
", aggregate size is " +
StringUtils.humanReadableInt(aggregateSize) +
" and desired size is " +
StringUtils.humanReadableInt(this.desiredMaxFileSize));
if (!biggest.isSplitable()) {
LOG.warn("Region " + getRegionName().toString() +
" is NOT splitable though its aggregate size is " +
StringUtils.humanReadableInt(biggest.getAggregate()) +
" and desired size is " +
StringUtils.humanReadableInt(this.desiredMaxFileSize));
split = false;
} else {
LOG.info("Splitting " + getRegionName().toString() +
" because largest aggregate size is " +
StringUtils.humanReadableInt(biggest.getAggregate()) +
" and desired size is " +
StringUtils.humanReadableInt(this.desiredMaxFileSize));
}
}
return split;
} finally {
lock.releaseReadLock();
}
}
/**
* @return - returns the size of the largest HStore
* @return returns size of largest HStore. Also returns whether store is
* splitable or not (Its not splitable if region has a store that has a
* reference store file).
*/
long largestHStore() {
long maxsize = 0;
HStore.HStoreSize largestHStore(final Text midkey) {
HStore.HStoreSize biggest = null;
boolean splitable = true;
lock.obtainReadLock();
try {
Text key = new Text();
for(HStore h: stores.values()) {
long size = h.getLargestFileSize(key);
if(size > maxsize) { // Largest so far
maxsize = size;
HStore.HStoreSize size = h.size(midkey);
// If we came across a reference down in the store, then propagate
// fact that region is not splitable.
if (splitable) {
splitable = size.splitable;
}
if (biggest == null) {
biggest = size;
continue;
}
if(size.getAggregate() > biggest.getAggregate()) { // Largest so far
biggest = size;
}
}
return maxsize;
biggest.setSplitable(splitable);
return biggest;
} finally {
lock.releaseReadLock();
}
@ -891,7 +880,6 @@ public class HRegion implements HConstants {
* not flush, returns list of all store files.
*/
Vector<HStoreFile> internalFlushcache() throws IOException {
long startTime = -1;
if(LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
@ -911,12 +899,14 @@ public class HRegion implements HConstants {
// explicitly cleaned up using a call to deleteSnapshot().
HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
if(retval == null || retval.memcacheSnapshot == null) {
LOG.debug("Finished memcache flush; empty snapshot");
return getAllStoreFiles();
}
long logCacheFlushId = retval.sequenceId;
if(LOG.isDebugEnabled()) {
LOG.debug("Snapshotted memcache for region " +
this.regionInfo.regionName + ". Sequence id " + retval.sequenceId);
this.regionInfo.regionName + " with sequence id " + retval.sequenceId +
" and entries " + retval.memcacheSnapshot.size());
}
// A. Flush memcache to all the HStores.
@ -1272,7 +1262,7 @@ public class HRegion implements HConstants {
+ lockid + " unexpected aborted by another thread");
}
this.targetColumns.remove(lockid);
this.targetColumns.remove(Long.valueOf(lockid));
releaseRowLock(row);
}
}
@ -1387,7 +1377,7 @@ public class HRegion implements HConstants {
// Pattern is that all access to rowsToLocks and/or to
// locksToRows is via a lock on rowsToLocks.
synchronized(rowsToLocks) {
return locksToRows.get(lockid);
return locksToRows.get(Long.valueOf(lockid));
}
}
@ -1398,7 +1388,7 @@ public class HRegion implements HConstants {
void releaseRowLock(Text row) {
synchronized(rowsToLocks) {
long lockid = rowsToLocks.remove(row).longValue();
locksToRows.remove(lockid);
locksToRows.remove(Long.valueOf(lockid));
rowsToLocks.notifyAll();
}
}
@ -1415,6 +1405,11 @@ public class HRegion implements HConstants {
}
}
@Override
public String toString() {
return getRegionName().toString();
}
/**
* HScanner is an iterator through a bunch of rows in an HRegion.
*/
@ -1686,7 +1681,7 @@ public class HRegion implements HConstants {
static HRegion createHRegion(final HRegionInfo info,
final Path rootDir, final Configuration conf, final Path initialFiles)
throws IOException {
Path regionDir = HStoreFile.getHRegionDir(rootDir, info.regionName);
Path regionDir = HRegion.getRegionDir(rootDir, info.regionName);
FileSystem fs = FileSystem.get(conf);
fs.mkdirs(regionDir);
return new HRegion(rootDir,
@ -1721,19 +1716,23 @@ public class HRegion implements HConstants {
final long startCode)
throws IOException {
HTable t = new HTable(conf, table);
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bytes);
region.getRegionInfo().write(out);
long lockid = t.startUpdate(region.getRegionName());
t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
t.put(lockid, COL_SERVER,
serverAddress.toString().getBytes(UTF8_ENCODING));
t.put(lockid, COL_STARTCODE,
String.valueOf(startCode).getBytes(UTF8_ENCODING));
t.commit(lockid);
if (LOG.isDebugEnabled()) {
LOG.info("Added region " + region.getRegionName() + " to table " +
table);
try {
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bytes);
region.getRegionInfo().write(out);
long lockid = t.startUpdate(region.getRegionName());
t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
t.put(lockid, COL_SERVER,
serverAddress.toString().getBytes(UTF8_ENCODING));
t.put(lockid, COL_STARTCODE,
String.valueOf(startCode).getBytes(UTF8_ENCODING));
t.commit(lockid);
if (LOG.isDebugEnabled()) {
LOG.info("Added region " + region.getRegionName() + " to table " +
table);
}
} finally {
t.close();
}
}
@ -1748,31 +1747,126 @@ public class HRegion implements HConstants {
final Text table, final Text regionName)
throws IOException {
HTable t = new HTable(conf, table);
long lockid = t.startUpdate(regionName);
try {
removeRegionFromMETA(t, regionName);
} finally {
t.close();
}
}
/**
* Delete <code>region</code> from META <code>table</code>.
* @param conf Configuration object
* @param table META table we are to delete region from.
* @param regionName Region to remove.
* @throws IOException
*/
static void removeRegionFromMETA(final HTable t, final Text regionName)
throws IOException {
long lockid = t.startBatchUpdate(regionName);
t.delete(lockid, COL_REGIONINFO);
t.delete(lockid, COL_SERVER);
t.delete(lockid, COL_STARTCODE);
t.commit(lockid);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed " + regionName + " from table " + table);
LOG.debug("Removed " + regionName + " from table " + t.getTableName());
}
}
/**
* Delete <code>split</code> column from META <code>table</code>.
* @param t
* @param split
* @param regionName Region to remove.
* @throws IOException
*/
static void removeSplitFromMETA(final HTable t, final Text regionName,
final Text split)
throws IOException {
long lockid = t.startBatchUpdate(regionName);
t.delete(lockid, split);
t.commit(lockid);
if (LOG.isDebugEnabled()) {
LOG.debug("Removed " + split + " from " + regionName +
" from table " + t.getTableName());
}
}
/**
* <code>region</code> has split. Update META <code>table</code>.
* @param client Client to use running update.
* @param table META table we are to delete region from.
* @param regionName Region to remove.
* @throws IOException
*/
static void writeSplitToMETA(final Configuration conf,
final Text table, final Text regionName, final HRegionInfo splitA,
final HRegionInfo splitB)
throws IOException {
HTable t = new HTable(conf, table);
try {
HRegionInfo hri = getRegionInfo(t.get(regionName, COL_REGIONINFO));
hri.offLine = true;
hri.split = true;
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bytes);
hri.write(dos);
dos.close();
long lockid = t.startBatchUpdate(regionName);
t.put(lockid, COL_REGIONINFO, bytes.toByteArray());
t.put(lockid, COL_SPLITA, Writables.getBytes(splitA));
t.put(lockid, COL_SPLITB, Writables.getBytes(splitB));
t.commitBatch(lockid);
if (LOG.isDebugEnabled()) {
LOG.debug("Updated " + regionName + " in table " + table +
" on its being split");
}
} finally {
t.close();
}
}
/**
* @param whichSplit COL_SPLITA or COL_SPLITB?
* @param data Map of META row labelled column data.
* @return Server
* @return HRegionInfo or null if not found.
* @throws IOException
*/
static HRegionInfo getSplit(final TreeMap<Text, byte[]> data,
final Text whichSplit)
throws IOException {
if (!(whichSplit.equals(COL_SPLITA) || whichSplit.equals(COL_SPLITB))) {
throw new IOException("Illegal Argument: " + whichSplit);
}
byte [] bytes = data.get(whichSplit);
if (bytes == null || bytes.length == 0) {
return null;
}
return (HRegionInfo)((bytes == null || bytes.length == 0)?
null:
Writables.getWritable(bytes, new HRegionInfo()));
}
/**
* @param data Map of META row labelled column data.
* @return An HRegionInfo instance.
* @throws IOException
*/
static HRegionInfo getRegionInfo(final TreeMap<Text, byte[]> data)
throws IOException {
byte[] bytes = data.get(COL_REGIONINFO);
return getRegionInfo(data.get(COL_REGIONINFO));
}
/**
* @param bytes Bytes of a HRegionInfo.
* @return An HRegionInfo instance.
* @throws IOException
*/
static HRegionInfo getRegionInfo(final byte[] bytes) throws IOException {
if (bytes == null || bytes.length == 0) {
throw new IOException("no value for " + COL_REGIONINFO);
}
DataInputBuffer in = new DataInputBuffer();
in.reset(bytes, bytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(in);
return info;
return (HRegionInfo)Writables.getWritable(bytes, new HRegionInfo());
}
/**
@ -1810,4 +1904,52 @@ public class HRegion implements HConstants {
}
return startCode;
}
}
public static Path getRegionDir(final Path dir, final Text regionName) {
return new Path(dir, new Path(HREGIONDIR_PREFIX + regionName));
}
/**
* Deletes all the files for a HRegion
*
* @param fs the file system object
* @param baseDirectory base directory for HBase
* @param regionName name of the region to delete
* @throws IOException
* @return True if deleted.
*/
static boolean deleteRegion(FileSystem fs, Path baseDirectory,
Text regionName) throws IOException {
Path p = HRegion.getRegionDir(fs.makeQualified(baseDirectory), regionName);
return fs.delete(p);
}
/**
* Look for HStoreFile references in passed region.
* @param fs
* @param baseDirectory
* @param hri
* @return True if we found references.
* @throws IOException
*/
static boolean hasReferences(final FileSystem fs, final Path baseDirectory,
final HRegionInfo hri)
throws IOException {
boolean result = false;
for (Text family: hri.getTableDesc().families().keySet()) {
Path p = HStoreFile.getMapDir(baseDirectory, hri.getRegionName(),
HStoreKey.extractFamily(family));
// Look for reference files.
Path [] ps = fs.listPaths(p, new PathFilter () {
public boolean accept(Path path) {
return HStoreFile.isReference(path);
}});
if (ps != null && ps.length > 0) {
result = true;
break;
}
}
return result;
}
}

View File

@ -66,6 +66,7 @@ public class HRegionInfo implements WritableComparable {
Text startKey;
Text endKey;
boolean offLine;
boolean split;
HTableDescriptor tableDesc;
/** Default constructor - creates empty object */
@ -76,6 +77,7 @@ public class HRegionInfo implements WritableComparable {
this.endKey = new Text();
this.regionName = new Text();
this.offLine = false;
this.split = false;
}
/**
@ -88,7 +90,7 @@ public class HRegionInfo implements WritableComparable {
this();
readFields(new DataInputStream(new ByteArrayInputStream(serializedBytes)));
}
/**
* Construct HRegionInfo with explicit parameters
*
@ -99,7 +101,25 @@ public class HRegionInfo implements WritableComparable {
* @throws IllegalArgumentException
*/
public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey,
Text endKey) throws IllegalArgumentException {
Text endKey)
throws IllegalArgumentException {
this(regionId, tableDesc, startKey, endKey, false);
}
/**
* Construct HRegionInfo with explicit parameters
*
* @param regionId the region id
* @param tableDesc the table descriptor
* @param startKey first key in region
* @param endKey end of key range
* @param split true if this region has split and we have daughter regions
* regions that may or may not hold references to this region.
* @throws IllegalArgumentException
*/
public HRegionInfo(long regionId, HTableDescriptor tableDesc, Text startKey,
Text endKey, final boolean split)
throws IllegalArgumentException {
this.regionId = regionId;
@ -124,6 +144,7 @@ public class HRegionInfo implements WritableComparable {
regionId);
this.offLine = false;
this.split = split;
}
/** @return the endKey */
@ -150,6 +171,20 @@ public class HRegionInfo implements WritableComparable {
public HTableDescriptor getTableDesc(){
return tableDesc;
}
/**
* @return True if has been split and has daughters.
*/
public boolean isSplit() {
return this.split;
}
/**
* @return True if this region is offline.
*/
public boolean isOffline() {
return this.offLine;
}
/**
* {@inheritDoc}
@ -157,8 +192,10 @@ public class HRegionInfo implements WritableComparable {
@Override
public String toString() {
return "regionname: " + this.regionName.toString() + ", startKey: <" +
this.startKey.toString() + ">, tableDesc: {" +
this.tableDesc.toString() + "}";
this.startKey.toString() + ">," +
(isOffline()? " offline: true,": "") +
(isSplit()? " split: true,": "") +
" tableDesc: {" + this.tableDesc.toString() + "}";
}
/**
@ -197,6 +234,7 @@ public class HRegionInfo implements WritableComparable {
endKey.write(out);
regionName.write(out);
out.writeBoolean(offLine);
out.writeBoolean(split);
}
/**
@ -209,6 +247,7 @@ public class HRegionInfo implements WritableComparable {
this.endKey.readFields(in);
this.regionName.readFields(in);
this.offLine = in.readBoolean();
this.split = in.readBoolean();
}
//

View File

@ -91,4 +91,4 @@ public class HRegionLocation implements Comparable {
}
return result;
}
}
}

View File

@ -113,7 +113,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// regions.
retiringRegions.put(regionName, onlineRegions.remove(regionName));
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + "closing (" +
LOG.debug(regionName.toString() + " closing (" +
"Adding to retiringRegions)");
}
} finally {
@ -199,15 +199,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
final Text tableToUpdate =
region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME) ?
ROOT_TABLE_NAME : META_TABLE_NAME;
region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)?
ROOT_TABLE_NAME : META_TABLE_NAME;
LOG.info("Updating " + tableToUpdate + " with region split info");
// Remove old region from META
for (int tries = 0; tries < numRetries; tries++) {
try {
HRegion.removeRegionFromMETA(conf, tableToUpdate,
region.getRegionName());
HRegion.writeSplitToMETA(conf, tableToUpdate,
region.getRegionName(), newRegions[0].getRegionInfo(),
newRegions[1].getRegionInfo());
break;
} catch (IOException e) {
if(tries == numRetries - 1) {
@ -242,18 +243,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
LOG.debug("Reporting region split to master");
}
reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
newRegions[1].getRegionInfo());
newRegions[1].getRegionInfo());
LOG.info("region split, META update, and report to master all" +
" successful. Old region=" + oldRegionInfo.getRegionName() +
", new regions: " + newRegions[0].getRegionName() + ", " +
newRegions[1].getRegionName());
" successful. Old region=" + oldRegionInfo.getRegionName() +
", new regions: " + newRegions[0].getRegionName() + ", " +
newRegions[1].getRegionName());
// Finally, start serving the new regions
lock.writeLock().lock();
try {
onlineRegions.put(newRegions[0].getRegionName(), newRegions[0]);
onlineRegions.put(newRegions[1].getRegionName(), newRegions[1]);
} finally {
lock.writeLock().unlock();
}
@ -307,7 +307,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
iex = x;
}
}
LOG.error(iex);
LOG.error("", iex);
}
}
}
@ -347,10 +347,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* {@inheritDoc}
*/
public void run() {
while(! stopRequested) {
while(!stopRequested) {
synchronized(logRollerLock) {
// If the number of log entries is high enough, roll the log. This is a
// very fast operation, but should not be done too frequently.
// If the number of log entries is high enough, roll the log. This
// is a very fast operation, but should not be done too frequently.
int nEntries = log.getNumEntries();
if(nEntries > this.maxLogEntries) {
try {
@ -359,17 +359,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} catch (IOException iex) {
if (iex instanceof RemoteException) {
try {
iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex);
iex = RemoteExceptionHandler.
decodeRemoteException((RemoteException) iex);
} catch (IOException x) {
iex = x;
}
}
LOG.warn(iex);
LOG.warn("", iex);
}
}
}
if(!stopRequested) {
try {
Thread.sleep(threadWakeFrequency);
@ -586,7 +585,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
e = ex;
}
}
LOG.error(e);
LOG.error("", e);
}
while(! stopRequested) {
@ -653,9 +652,6 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("Got default message");
}
try {
toDo.put(new ToDoEntry(msgs[i]));
} catch (InterruptedException e) {
@ -678,7 +674,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
e = ex;
}
}
LOG.error(e);
LOG.error("", e);
}
}
@ -716,16 +712,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
if (abortRequested) {
try {
log.close();
LOG.info("On abort, closed hlog");
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
} catch (IOException ex) {
e = ex;
}
}
LOG.warn(e);
LOG.warn("Abort close of log", e);
}
closeAllRegions(); // Don't leave any open file handles
LOG.info("aborting server at: " +
@ -743,7 +739,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
e = ex;
}
}
LOG.error(e);
LOG.error("", e);
}
try {
HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
@ -767,7 +763,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
e = ex;
}
}
LOG.warn(e);
LOG.warn("", e);
}
LOG.info("stopping server at: " +
serverInfo.getServerAddress().toString());
@ -1113,7 +1109,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
iex = x;
}
}
LOG.error(iex);
LOG.error("", iex);
}
}
}
@ -1267,12 +1263,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
if (e instanceof RemoteException) {
try {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
} catch (IOException x) {
e = x;
}
}
LOG.error(e);
LOG.error("", e);
throw e;
}
return scannerId;

View File

@ -33,21 +33,21 @@ import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.StringUtils;
import org.onelab.filter.*;
import org.onelab.filter.BloomFilter;
import org.onelab.filter.CountingBloomFilter;
import org.onelab.filter.Filter;
import org.onelab.filter.RetouchedBloomFilter;
/**
* HStore maintains a bunch of data files. It is responsible for maintaining
@ -86,8 +86,8 @@ class HStore implements HConstants {
final HLocking lock = new HLocking();
TreeMap<Long, MapFile.Reader> maps = new TreeMap<Long, MapFile.Reader>();
TreeMap<Long, HStoreFile> mapFiles = new TreeMap<Long, HStoreFile>();
TreeMap<Long, HStoreFile> storefiles = new TreeMap<Long, HStoreFile>();
TreeMap<Long, MapFile.Reader> readers = new TreeMap<Long, MapFile.Reader>();
Random rand = new Random();
@ -137,10 +137,9 @@ class HStore implements HConstants {
if(family.getCompression() != HColumnDescriptor.CompressionType.NONE) {
if(family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
this.compression = SequenceFile.CompressionType.BLOCK;
} else if(family.getCompression() == HColumnDescriptor.CompressionType.RECORD) {
} else if(family.getCompression() ==
HColumnDescriptor.CompressionType.RECORD) {
this.compression = SequenceFile.CompressionType.RECORD;
} else {
assert(false);
}
@ -148,16 +147,13 @@ class HStore implements HConstants {
this.fs = fs;
this.conf = conf;
this.mapdir = HStoreFile.getMapDir(dir, regionName, familyName);
fs.mkdirs(mapdir);
this.loginfodir = HStoreFile.getInfoDir(dir, regionName, familyName);
fs.mkdirs(loginfodir);
if(family.bloomFilter == null) {
this.filterDir = null;
this.bloomFilter = null;
} else {
this.filterDir = HStoreFile.getFilterDir(dir, regionName, familyName);
fs.mkdirs(filterDir);
@ -165,13 +161,15 @@ class HStore implements HConstants {
}
if(LOG.isDebugEnabled()) {
LOG.debug("Starting HStore for " + this.storeName);
LOG.debug("starting " + this.storeName +
((reconstructionLog == null)?
" (no reconstruction log)": " with reconstruction log: " +
reconstructionLog.toString()));
}
// Either restart or get rid of any leftover compaction work. Either way,
// by the time processReadyCompaction() returns, we can get rid of the
// existing compaction-dir.
this.compactdir = new Path(dir, COMPACTION_DIR);
Path curCompactStore =
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
@ -187,7 +185,7 @@ class HStore implements HConstants {
Vector<HStoreFile> hstoreFiles
= HStoreFile.loadHStoreFiles(conf, dir, regionName, familyName, fs);
for(HStoreFile hsf: hstoreFiles) {
mapFiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
this.storefiles.put(Long.valueOf(hsf.loadInfo(fs)), hsf);
}
// Now go through all the HSTORE_LOGINFOFILEs and figure out the
@ -209,21 +207,24 @@ class HStore implements HConstants {
}
doReconstructionLog(reconstructionLog, maxSeqID);
// Compact all the MapFiles into a single file. The resulting MapFile
// should be "timeless"; that is, it should not have an associated seq-ID,
// because all log messages have been reflected in the TreeMaps at this
// point.
if(mapFiles.size() >= 1) {
// point.
//
// TODO: Only do the compaction if we are over a threshold, not
// every time. Not necessary if only two or three store files. Fix after
// revamp of compaction.
if(storefiles.size() > 1) {
compactHelper(true);
}
// Finally, start up all the map readers! (There should be just one at this
// point, as we've compacted them all.)
for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
// TODO - is this really necessary? Don't I do this inside compact()?
maps.put(e.getKey(),
getMapFileReader(e.getValue().getMapFilePath().toString()));
for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
this.readers.put(e.getKey(),
e.getValue().getReader(this.fs, this.bloomFilter));
}
}
@ -239,6 +240,11 @@ class HStore implements HConstants {
final long maxSeqID)
throws UnsupportedEncodingException, IOException {
if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
if (reconstructionLog != null && !fs.exists(reconstructionLog)) {
LOG.warn("Passed reconstruction log " + reconstructionLog +
" does not exist");
}
// Nothing to do.
return;
}
long maxSeqIdInLog = -1;
@ -271,8 +277,7 @@ class HStore implements HConstants {
}
HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp());
if (LOG.isDebugEnabled()) {
LOG.debug("Applying edit " + k.toString() + "=" +
new String(val.getVal(), UTF8_ENCODING));
LOG.debug("Applying edit " + k.toString());
}
reconstructedCache.put(k, val.getVal());
}
@ -364,144 +369,24 @@ class HStore implements HConstants {
LOG.debug("flushed bloom filter for " + this.storeName);
}
}
/** Generates a bloom filter key from the row and column keys */
Key getBloomFilterKey(HStoreKey k) {
StringBuilder s = new StringBuilder(k.getRow().toString());
s.append(k.getColumn().toString());
byte[] bytes = null;
try {
bytes = s.toString().getBytes(HConstants.UTF8_ENCODING);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
assert(false);
}
return new Key(bytes);
}
/**
* Extends MapFile.Reader and overrides get and getClosest to consult the
* bloom filter before attempting to read from disk.
*/
private class BloomFilterReader extends MapFile.Reader {
BloomFilterReader(FileSystem fs, String dirName, Configuration conf)
throws IOException {
super(fs, dirName, conf);
}
/** {@inheritDoc} */
@Override
public Writable get(WritableComparable key, Writable val) throws IOException {
// Note - the key being passed to us is always a HStoreKey
if(bloomFilter.membershipTest(getBloomFilterKey((HStoreKey)key))) {
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key exists");
}
return super.get(key, val);
}
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key does not exist");
}
return null;
}
/** {@inheritDoc} */
@Override
public WritableComparable getClosest(WritableComparable key, Writable val)
throws IOException {
// Note - the key being passed to us is always a HStoreKey
if(bloomFilter.membershipTest(getBloomFilterKey((HStoreKey)key))) {
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key exists");
}
return super.getClosest(key, val);
}
if (LOG.isDebugEnabled()) {
LOG.debug("bloom filter reported that key does not exist");
}
return null;
}
}
/**
* Extends MapFile.Writer and overrides append, so that whenever a MapFile
* is written to, the key is added to the bloom filter.
*/
private class BloomFilterWriter extends MapFile.Writer {
@SuppressWarnings("unchecked")
BloomFilterWriter(Configuration conf, FileSystem fs, String dirName,
Class keyClass, Class valClass, SequenceFile.CompressionType compression)
throws IOException {
super(conf, fs, dirName, keyClass, valClass, compression);
}
/** {@inheritDoc} */
@Override
public void append(WritableComparable key, Writable val) throws IOException {
// Note - the key being passed to us is always a HStoreKey
bloomFilter.add(getBloomFilterKey((HStoreKey)key));
super.append(key, val);
}
}
/**
* Get a MapFile reader
* This allows us to substitute a BloomFilterReader if a bloom filter is enabled
*/
MapFile.Reader getMapFileReader(String dirName) throws IOException {
if(bloomFilter != null) {
return new BloomFilterReader(fs, dirName, conf);
}
return new MapFile.Reader(fs, dirName, conf);
}
/**
* Get a MapFile writer
* This allows us to substitute a BloomFilterWriter if a bloom filter is
* enabled
*
* @param dirName Directory with store files.
* @return Map file.
* @throws IOException
*/
MapFile.Writer getMapFileWriter(String dirName) throws IOException {
if (bloomFilter != null) {
return new BloomFilterWriter(conf, fs, dirName, HStoreKey.class,
ImmutableBytesWritable.class, compression);
}
return new MapFile.Writer(conf, fs, dirName, HStoreKey.class,
ImmutableBytesWritable.class, compression);
}
//////////////////////////////////////////////////////////////////////////////
// End bloom filters
//////////////////////////////////////////////////////////////////////////////
/**
* Turn off all the MapFile readers
*
* Close all the MapFile readers
* @throws IOException
*/
void close() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.info("closing HStore for " + this.storeName);
}
this.lock.obtainWriteLock();
try {
for (MapFile.Reader map: maps.values()) {
map.close();
for (MapFile.Reader reader: this.readers.values()) {
reader.close();
}
maps.clear();
mapFiles.clear();
LOG.info("HStore closed for " + this.storeName);
this.readers.clear();
this.storefiles.clear();
LOG.info("closed " + this.storeName);
} finally {
this.lock.releaseWriteLock();
}
@ -540,17 +425,19 @@ class HStore implements HConstants {
// A. Write the TreeMap out to the disk
HStoreFile flushedFile = HStoreFile.obtainNewHStoreFile(conf, dir,
regionName, familyName, fs);
Path mapfile = flushedFile.getMapFilePath();
if(LOG.isDebugEnabled()) {
LOG.debug("Flushing to " + mapfile.toString());
}
MapFile.Writer out = getMapFileWriter(mapfile.toString());
String name = flushedFile.toString();
MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
this.bloomFilter);
int count = 0;
int total = 0;
try {
for (Map.Entry<HStoreKey, byte []> es: inputCache.entrySet()) {
HStoreKey curkey = es.getKey();
total++;
if (this.familyName.
equals(HStoreKey.extractFamily(curkey.getColumn()))) {
out.append(curkey, new ImmutableBytesWritable(es.getValue()));
count++;
}
}
} finally {
@ -571,13 +458,14 @@ class HStore implements HConstants {
this.lock.obtainWriteLock();
try {
Long flushid = Long.valueOf(logCacheFlushId);
maps.put(flushid, getMapFileReader(mapfile.toString()));
mapFiles.put(flushid, flushedFile);
// Open the map file reader.
this.readers.put(flushid,
flushedFile.getReader(this.fs, this.bloomFilter));
this.storefiles.put(flushid, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("Added " + mapfile.toString() +
" with flush id " + logCacheFlushId + " and size " +
StringUtils.humanReadableInt(mapfile.getFileSystem(this.conf).
getContentLength(mapfile)));
LOG.debug("Added " + name +
" with sequence id " + logCacheFlushId + " and size " +
StringUtils.humanReadableInt(flushedFile.length()));
}
} finally {
this.lock.releaseWriteLock();
@ -593,8 +481,7 @@ class HStore implements HConstants {
Vector<HStoreFile> getAllMapFiles() {
this.lock.obtainReadLock();
try {
return new Vector<HStoreFile>(mapFiles.values());
return new Vector<HStoreFile>(storefiles.values());
} finally {
this.lock.releaseReadLock();
}
@ -632,7 +519,7 @@ class HStore implements HConstants {
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
fs.mkdirs(curCompactStore);
if(LOG.isDebugEnabled()) {
LOG.debug("started compaction of " + mapFiles.size() + " files in " +
LOG.debug("started compaction of " + storefiles.size() + " files in " +
curCompactStore.toString());
}
try {
@ -640,7 +527,7 @@ class HStore implements HConstants {
Vector<HStoreFile> toCompactFiles = null;
this.lock.obtainWriteLock();
try {
toCompactFiles = new Vector<HStoreFile>(mapFiles.values());
toCompactFiles = new Vector<HStoreFile>(storefiles.values());
} finally {
this.lock.releaseWriteLock();
}
@ -660,6 +547,7 @@ class HStore implements HConstants {
HStoreFile compactedOutputFile
= new HStoreFile(conf, compactdir, regionName, familyName, -1);
if(toCompactFiles.size() == 1) {
// TODO: Only rewrite if NOT a HSF reference file.
if(LOG.isDebugEnabled()) {
LOG.debug("nothing to compact for " + this.storeName);
}
@ -671,7 +559,8 @@ class HStore implements HConstants {
// Step through them, writing to the brand-new TreeMap
MapFile.Writer compactedOut =
getMapFileWriter(compactedOutputFile.getMapFilePath().toString());
compactedOutputFile.getWriter(this.fs, this.compression,
this.bloomFilter);
try {
// We create a new set of MapFile.Reader objects so we don't screw up
// the caching associated with the currently-loaded ones.
@ -684,14 +573,14 @@ class HStore implements HConstants {
// lowest-ranked one. Updates to a single row/column will appear
// ranked by timestamp. This allows us to throw out deleted values or
// obsolete versions.
MapFile.Reader[] readers = new MapFile.Reader[toCompactFiles.size()];
MapFile.Reader[] rdrs = new MapFile.Reader[toCompactFiles.size()];
HStoreKey[] keys = new HStoreKey[toCompactFiles.size()];
ImmutableBytesWritable[] vals =
new ImmutableBytesWritable[toCompactFiles.size()];
boolean[] done = new boolean[toCompactFiles.size()];
int pos = 0;
for(HStoreFile hsf: toCompactFiles) {
readers[pos] = getMapFileReader(hsf.getMapFilePath().toString());
rdrs[pos] = hsf.getReader(this.fs, this.bloomFilter);
keys[pos] = new HStoreKey();
vals[pos] = new ImmutableBytesWritable();
done[pos] = false;
@ -701,9 +590,9 @@ class HStore implements HConstants {
// Now, advance through the readers in order. This will have the
// effect of a run-time sort of the entire dataset.
int numDone = 0;
for(int i = 0; i < readers.length; i++) {
readers[i].reset();
done[i] = ! readers[i].next(keys[i], vals[i]);
for(int i = 0; i < rdrs.length; i++) {
rdrs[i].reset();
done[i] = ! rdrs[i].next(keys[i], vals[i]);
if(done[i]) {
numDone++;
}
@ -715,7 +604,7 @@ class HStore implements HConstants {
while(numDone < done.length) {
// Find the reader with the smallest key
int smallestKey = -1;
for(int i = 0; i < readers.length; i++) {
for(int i = 0; i < rdrs.length; i++) {
if(done[i]) {
continue;
}
@ -760,10 +649,10 @@ class HStore implements HConstants {
// Advance the smallest key. If that reader's all finished, then
// mark it as done.
if(! readers[smallestKey].next(keys[smallestKey],
if(! rdrs[smallestKey].next(keys[smallestKey],
vals[smallestKey])) {
done[smallestKey] = true;
readers[smallestKey].close();
rdrs[smallestKey].close();
numDone++;
}
}
@ -772,8 +661,7 @@ class HStore implements HConstants {
}
if(LOG.isDebugEnabled()) {
LOG.debug("writing new compacted HStore to " +
compactedOutputFile.getMapFilePath().toString());
LOG.debug("writing new compacted HStore " + compactedOutputFile);
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
@ -818,7 +706,6 @@ class HStore implements HConstants {
* invoked at HStore startup, if the prior execution died midway through.
*/
void processReadyCompaction() throws IOException {
// Move the compacted TreeMap into place.
// That means:
// 1) Acquiring the write-lock
@ -830,29 +717,20 @@ class HStore implements HConstants {
// 7) Releasing the write-lock
// 1. Acquiring the write-lock
Path curCompactStore =
HStoreFile.getHStoreDir(compactdir, regionName, familyName);
this.lock.obtainWriteLock();
try {
Path doneFile = new Path(curCompactStore, COMPACTION_DONE);
if(!fs.exists(doneFile)) {
// The last execution didn't finish the compaction, so there's nothing
// we can do. We'll just have to redo it. Abandon it and return.
LOG.warn("Redoing a failed compaction");
return;
}
// OK, there's actually compaction work that needs to be put into place.
if(LOG.isDebugEnabled()) {
LOG.debug("Process ready compaction starting");
}
// 2. Load in the files to be deleted.
// (Figuring out what MapFiles are going to be replaced)
Vector<HStoreFile> toCompactFiles = new Vector<HStoreFile>();
Path filesToReplace = new Path(curCompactStore, COMPACTION_TO_REPLACE);
DataInputStream in = new DataInputStream(fs.open(filesToReplace));
@ -868,41 +746,29 @@ class HStore implements HConstants {
in.close();
}
if(LOG.isDebugEnabled()) {
LOG.debug("loaded " + toCompactFiles.size() +
" file(s) to be deleted");
}
// 3. Unload all the replaced MapFiles.
Iterator<HStoreFile> it2 = mapFiles.values().iterator();
for(Iterator<MapFile.Reader> it = maps.values().iterator();
it.hasNext(); ) {
MapFile.Reader curReader = it.next();
HStoreFile curMapFile = it2.next();
if(toCompactFiles.contains(curMapFile)) {
curReader.close();
it.remove();
}
}
for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
HStoreFile curMapFile = it.next();
if(toCompactFiles.contains(curMapFile)) {
it.remove();
}
}
// 3. Unload all the replaced MapFiles. Do it by getting keys of all
// to remove. Then cycling on keys, removing, closing and deleting.
// What if we crash at this point? No big deal; we will restart
// processReadyCompaction(), and nothing has been lost.
// 4. Delete all the old files, no longer needed
for(HStoreFile hsf: toCompactFiles) {
fs.delete(hsf.getMapFilePath());
fs.delete(hsf.getInfoFilePath());
Vector<Long> keys = new Vector<Long>(toCompactFiles.size());
for(Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
if(toCompactFiles.contains(e.getValue())) {
keys.add(e.getKey());
}
}
for (Long key: keys) {
MapFile.Reader reader = this.readers.remove(key);
if (reader != null) {
reader.close();
}
HStoreFile hsf = this.storefiles.remove(key);
// 4. Delete all old files, no longer needed
hsf.delete();
}
if(LOG.isDebugEnabled()) {
LOG.debug("old file(s) deleted");
LOG.debug("deleted " + toCompactFiles.size() + " old file(s)");
}
// What if we fail now? The above deletes will fail silently. We'd better
@ -915,24 +781,19 @@ class HStore implements HConstants {
HStoreFile finalCompactedFile
= HStoreFile.obtainNewHStoreFile(conf, dir, regionName, familyName, fs);
if(LOG.isDebugEnabled()) {
LOG.debug("moving " + compactedFile.getMapFilePath().toString() +
" to " + finalCompactedFile.getMapFilePath().toString());
LOG.debug("moving " + compactedFile.toString() + " in " +
compactdir.toString() +
" to " + finalCompactedFile.toString() + " in " + dir.toString());
}
fs.rename(compactedFile.getMapFilePath(),
finalCompactedFile.getMapFilePath());
// Fail here? No problem.
fs.rename(compactedFile.getInfoFilePath(),
finalCompactedFile.getInfoFilePath());
compactedFile.rename(this.fs, finalCompactedFile);
// Fail here? No worries.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
// 6. Loading the new TreeMap.
mapFiles.put(orderVal, finalCompactedFile);
maps.put(orderVal, getMapFileReader(
finalCompactedFile.getMapFilePath().toString()));
this.readers.put(orderVal,
finalCompactedFile.getReader(this.fs, this.bloomFilter));
this.storefiles.put(orderVal, finalCompactedFile);
} finally {
// 7. Releasing the write-lock
@ -955,8 +816,7 @@ class HStore implements HConstants {
throws IOException {
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]);
MapFile.Reader[] maparray = getReaders();
for (int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
synchronized(map) {
@ -984,6 +844,11 @@ class HStore implements HConstants {
this.lock.releaseReadLock();
}
}
private MapFile.Reader [] getReaders() {
return this.readers.values().
toArray(new MapFile.Reader[this.readers.size()]);
}
/**
* Get the value for the indicated HStoreKey. Grab the target value and the
@ -999,10 +864,8 @@ class HStore implements HConstants {
List<byte []> results = new ArrayList<byte []>();
this.lock.obtainReadLock();
try {
MapFile.Reader[] maparray
= maps.values().toArray(new MapFile.Reader[maps.size()]);
for(int i = maparray.length-1; i >= 0; i--) {
MapFile.Reader[] maparray = getReaders();
for(int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
synchronized(map) {
@ -1044,40 +907,83 @@ class HStore implements HConstants {
}
}
/**
* Gets the size of the largest MapFile and its mid key.
*
* @param midKey - the middle key for the largest MapFile
* @return - size of the largest MapFile
/*
* Data structure to hold result of a look at store file sizes.
*/
long getLargestFileSize(Text midKey) {
class HStoreSize {
final long aggregate;
final long largest;
boolean splitable;
HStoreSize(final long a, final long l, final boolean s) {
this.aggregate = a;
this.largest = l;
this.splitable = s;
}
long getAggregate() {
return this.aggregate;
}
long getLargest() {
return this.largest;
}
boolean isSplitable() {
return this.splitable;
}
void setSplitable(final boolean s) {
this.splitable = s;
}
}
/**
* Gets size for the store.
*
* @param midKey Gets set to the middle key of the largest splitable store
* file or its set to empty if largest is not splitable.
* @return Sizes for the store and the passed <code>midKey</code> is
* set to midKey of largest splitable. Otherwise, its set to empty
* to indicate we couldn't find a midkey to split on
*/
HStoreSize size(Text midKey) {
long maxSize = 0L;
if (this.mapFiles.size() <= 0) {
return maxSize;
long aggregateSize = 0L;
// Not splitable if we find a reference store file present in the store.
boolean splitable = true;
if (this.storefiles.size() <= 0) {
return new HStoreSize(0, 0, splitable);
}
this.lock.obtainReadLock();
try {
Long mapIndex = Long.valueOf(0L);
// Iterate through all the MapFiles
for(Map.Entry<Long, HStoreFile> e: mapFiles.entrySet()) {
for(Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
HStoreFile curHSF = e.getValue();
long size = fs.getFileStatus(
new Path(curHSF.getMapFilePath(), MapFile.DATA_FILE_NAME)).getLen();
if(size > maxSize) { // This is the largest one so far
long size = curHSF.length();
aggregateSize += size;
if (maxSize == 0L || size > maxSize) {
// This is the largest one so far
maxSize = size;
mapIndex = e.getKey();
}
if (splitable) {
splitable = !curHSF.isReference();
}
}
MapFile.Reader r = this.readers.get(mapIndex);
WritableComparable midkey = r.midKey();
if (midkey != null) {
midKey.set(((HStoreKey)midkey).getRow());
}
MapFile.Reader r = maps.get(mapIndex);
midKey.set(((HStoreKey)r.midKey()).getRow());
} catch(IOException e) {
LOG.warn(e);
LOG.warn("", e);
} finally {
this.lock.releaseReadLock();
}
return maxSize;
return new HStoreSize(aggregateSize, maxSize, splitable);
}
/**
@ -1086,7 +992,7 @@ class HStore implements HConstants {
int getNMaps() {
this.lock.obtainReadLock();
try {
return maps.size();
return storefiles.size();
} finally {
this.lock.releaseReadLock();
@ -1127,20 +1033,17 @@ class HStore implements HConstants {
private MapFile.Reader[] readers;
HStoreScanner(long timestamp, Text[] targetCols, Text firstRow)
throws IOException {
throws IOException {
super(timestamp, targetCols);
lock.obtainReadLock();
try {
this.readers = new MapFile.Reader[mapFiles.size()];
this.readers = new MapFile.Reader[storefiles.size()];
// Most recent map file should be first
int i = readers.length - 1;
for(Iterator<HStoreFile> it = mapFiles.values().iterator(); it.hasNext(); ) {
HStoreFile curHSF = it.next();
readers[i--] = getMapFileReader(curHSF.getMapFilePath().toString());
for(HStoreFile curHSF: storefiles.values()) {
readers[i--] = curHSF.getReader(fs, bloomFilter);
}
this.keys = new HStoreKey[readers.length];
@ -1164,7 +1067,7 @@ class HStore implements HConstants {
}
} catch (Exception ex) {
LOG.error(ex);
LOG.error("Failed construction", ex);
close();
}
}
@ -1218,9 +1121,8 @@ class HStore implements HConstants {
if(readers[i] != null) {
try {
readers[i].close();
} catch(IOException e) {
LOG.error(e);
LOG.error("Sub-scanner close", e);
}
}
@ -1240,9 +1142,8 @@ class HStore implements HConstants {
if(readers[i] != null) {
try {
readers[i].close();
} catch(IOException e) {
LOG.error(e);
LOG.error("Scanner close", e);
}
}
}

File diff suppressed because it is too large Load Diff

View File

@ -131,6 +131,11 @@ public class HTable implements HConstants {
throw new IllegalStateException("update in progress");
}
}
public Text getTableName() {
return this.tableName;
}
/**
* Gets the starting row key for every region in the currently open table

View File

@ -19,10 +19,18 @@
*/
package org.apache.hadoop.hbase.util;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HStoreKey;
/**
* Utility creating hbase friendly keys.
* Use fabricating row names or column qualifiers.
@ -111,4 +119,61 @@ public class Keying {
}
return sb.toString();
}
/**
* @param i
* @return <code>i</code> as byte array.
*/
public static byte[] intToBytes(final int i){
ByteBuffer buffer = ByteBuffer.allocate(Integer.SIZE);
buffer.putInt(i);
return buffer.array();
}
/**
* @param l
* @return <code>i</code> as byte array.
*/
public static byte[] longToBytes(final long l){
ByteBuffer buffer = ByteBuffer.allocate(Long.SIZE);
buffer.putLong(l);
return buffer.array();
}
/**
* Returns row and column bytes out of an HStoreKey.
* @param hsk Store key.
* @throws UnsupportedEncodingException
*/
public static byte[] getBytes(final HStoreKey hsk)
throws UnsupportedEncodingException {
StringBuilder s = new StringBuilder(hsk.getRow().toString());
s.append(hsk.getColumn().toString());
return s.toString().getBytes(HConstants.UTF8_ENCODING);
}
/**
* @param bytes
* @return String made of the bytes or null if bytes are null.
* @throws UnsupportedEncodingException
*/
public static String bytesToString(final byte [] bytes)
throws UnsupportedEncodingException {
if(bytes == null) {
return null;
}
return new String(bytes, HConstants.UTF8_ENCODING);
}
public static long bytesToLong(final byte [] bytes) throws IOException {
long result = -1;
DataInputStream dis = null;
try {
dis = new DataInputStream(new ByteArrayInputStream(bytes));
result = dis.readLong();
} finally {
dis.close();
}
return result;
}
}

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.Writable;
public class Writables {
/**
* @param w
* @return The bytes of <code>w</code> gotten by running its
* {@link Writable#write(java.io.DataOutput)} method.
* @throws IOException
* @see #getWritable(byte[], Writable)
*/
public static byte [] getBytes(final Writable w) throws IOException {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(byteStream);
try {
w.write(out);
out.close();
out = null;
return byteStream.toByteArray();
} finally {
if (out != null) {
out.close();
}
}
}
/**
* Set bytes into the passed Writable by calling its
* {@link Writable#readFields(java.io.DataInput)}.
* @param bytes
* @param w An empty Writable (usually made by calling the null-arg
* constructor).
* @return The passed Writable after its readFields has been called fed
* by the passed <code>bytes</code> array or null if passed null or
* empty <code>bytes</code>.
* @throws IOException
*/
public static Writable getWritable(final byte [] bytes, final Writable w)
throws IOException {
if (bytes == null || bytes.length == 0) {
throw new IOException("Con't build a writable with empty bytes array");
}
DataInputBuffer in = new DataInputBuffer();
try {
in.reset(bytes, bytes.length);
w.readFields(in);
return w;
} finally {
in.close();
}
}
/**
* Copy one Writable to another. Copies bytes using data streams.
* @param src Source Writable
* @param tgt Target Writable
* @return The target Writable.
* @throws IOException
*/
public static Writable copyWritable(final Writable src, final Writable tgt)
throws IOException {
byte [] bytes = getBytes(src);
DataInputStream dis = null;
try {
dis = new DataInputStream(new ByteArrayInputStream(bytes));
tgt.readFields(dis);
} finally {
dis.close();
}
return tgt;
}
}

View File

@ -56,7 +56,7 @@ public abstract class HBaseTestCase extends TestCase {
HTableDescriptor desc, long regionId, Text startKey, Text endKey)
throws IOException {
HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
Path regionDir = HRegion.getRegionDir(dir, info.regionName);
FileSystem fs = dir.getFileSystem(c);
fs.mkdirs(regionDir);
return new HRegion(dir,

View File

@ -33,18 +33,24 @@ import org.apache.log4j.Logger;
/**
* This class creates a single process HBase cluster for junit testing.
* One thread is created for each server.
*
* <p>TestCases do not need to subclass to start a HBaseCluster. Call
* {@link #startMaster(Configuration)} and
* {@link #startRegionServers(Configuration, int)} to startup master and
* region servers. Save off the returned values and pass them to
* {@link #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)}
* to shut it all down when done.
*
*/
public class MiniHBaseCluster implements HConstants {
private static final Logger LOG =
static final Logger LOG =
Logger.getLogger(MiniHBaseCluster.class.getName());
private Configuration conf;
private MiniDFSCluster cluster;
private FileSystem fs;
private Path parentdir;
private HMaster master = null;
private Thread masterThread = null;
List<HRegionServer> regionServers;
List<Thread> regionThreads;
private MasterThread masterThread = null;
ArrayList<RegionServerThread> regionThreads;
private boolean deleteOnExit = true;
/**
@ -87,8 +93,6 @@ public class MiniHBaseCluster implements HConstants {
this.conf = conf;
this.cluster = dfsCluster;
this.regionServers = new ArrayList<HRegionServer>(nRegionNodes);
this.regionThreads = new ArrayList<Thread>(nRegionNodes);
init(nRegionNodes);
}
@ -108,13 +112,9 @@ public class MiniHBaseCluster implements HConstants {
throws IOException {
this.conf = conf;
this.deleteOnExit = deleteOnExit;
this.regionServers = new ArrayList<HRegionServer>(nRegionNodes);
this.regionThreads = new ArrayList<Thread>(nRegionNodes);
if (miniHdfsFilesystem) {
try {
this.cluster = new MiniDFSCluster(this.conf, 2, format, (String[])null);
} catch(Throwable t) {
LOG.error("Failed setup of mini dfs cluster", t);
t.printStackTrace();
@ -124,45 +124,123 @@ public class MiniHBaseCluster implements HConstants {
init(nRegionNodes);
}
private void init(int nRegionNodes) throws IOException {
private void init(final int nRegionNodes)
throws IOException {
try {
try {
this.fs = FileSystem.get(conf);
this.parentdir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
fs.mkdirs(parentdir);
} catch(IOException e) {
LOG.error("Failed setup of FileSystem", e);
throw e;
}
if(this.conf.get(MASTER_ADDRESS) == null) {
this.conf.set(MASTER_ADDRESS, "localhost:0");
}
// Create the master
this.master = new HMaster(conf);
this.masterThread = new Thread(this.master, "HMaster");
// Start up the master
LOG.info("Starting HMaster");
masterThread.start();
// Set the master's port for the HRegionServers
String address = master.getMasterAddress().toString();
this.conf.set(MASTER_ADDRESS, address);
// Start the HRegionServers. Always have regionservers come up on
// port '0' so there won't be clashes over default port as unit tests
// start/stop ports at different times during the life of the test.
this.conf.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
LOG.info("Starting HRegionServers");
startRegionServers(nRegionNodes);
this.masterThread = startMaster(this.conf);
this.regionThreads = startRegionServers(this.conf, nRegionNodes);
} catch(IOException e) {
shutdown();
throw e;
}
}
public static class MasterThread extends Thread {
private final HMaster master;
MasterThread(final HMaster m) {
super(m, "Master:" + m.getMasterAddress().toString());
this.master = m;
}
@Override
public void run() {
LOG.info("Starting " + getName());
super.run();
}
public HMaster getMaster() {
return this.master;
}
}
public static class RegionServerThread extends Thread {
private final HRegionServer regionServer;
RegionServerThread(final HRegionServer r, final int index) {
super(r, "RegionServer:" + index);
this.regionServer = r;
}
@Override
public void run() {
LOG.info("Starting " + getName());
super.run();
}
public HRegionServer getRegionServer() {
return this.regionServer;
}
}
/**
* Use this method to start a master.
* If you want to start an hbase cluster
* without subclassing this test case, run this method and
* {@link #startRegionServers(Configuration, int)} to start servers.
* Call {@link #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)}
* to shut them down.
* @param c
* @return Thread running the master.
* @throws IOException
* @see #startRegionServers(Configuration, int)
* @see #shutdown(org.apache.hadoop.hbase.MiniHBaseCluster.MasterThread, List)
*/
public static MasterThread startMaster(final Configuration c)
throws IOException {
if(c.get(MASTER_ADDRESS) == null) {
c.set(MASTER_ADDRESS, "localhost:0");
}
// Create the master
final HMaster m = new HMaster(c);
MasterThread masterThread = new MasterThread(m);
// Start up the master
masterThread.start();
// Set the master's port for the HRegionServers
c.set(MASTER_ADDRESS, m.getMasterAddress().toString());
return masterThread;
}
/**
* @param c
* @param count
* @return List of region server threads started. Synchronize on the
* returned list when iterating to avoid ConcurrentModificationExceptions.
* @throws IOException
* @see #startMaster(Configuration)
*/
public static ArrayList<RegionServerThread> startRegionServers(
final Configuration c, final int count)
throws IOException {
// Start the HRegionServers. Always have regionservers come up on
// port '0' so there won't be clashes over default port as unit tests
// start/stop ports at different times during the life of the test.
c.set(REGIONSERVER_ADDRESS, DEFAULT_HOST + ":0");
LOG.info("Starting HRegionServers");
ArrayList<RegionServerThread> threads =
new ArrayList<RegionServerThread>();
for(int i = 0; i < count; i++) {
threads.add(startRegionServer(c, i));
}
return threads;
}
public void startRegionServer() throws IOException {
RegionServerThread t =
startRegionServer(this.conf, this.regionThreads.size());
this.regionThreads.add(t);
}
private static RegionServerThread startRegionServer(final Configuration c,
final int index)
throws IOException {
final HRegionServer hsr = new HRegionServer(c);
RegionServerThread t = new RegionServerThread(hsr, index);
t.start();
return t;
}
/**
* Get the cluster on which this HBase cluster is running
@ -173,27 +251,12 @@ public class MiniHBaseCluster implements HConstants {
return cluster;
}
private void startRegionServers(final int nRegionNodes)
throws IOException {
for(int i = 0; i < nRegionNodes; i++) {
startRegionServer();
}
}
void startRegionServer() throws IOException {
HRegionServer hsr = new HRegionServer(this.conf);
this.regionServers.add(hsr);
Thread t = new Thread(hsr, "HRegionServer-" + this.regionServers.size());
t.start();
this.regionThreads.add(t);
}
/**
* @return Returns the rpc address actually used by the master server, because
* the supplied port is not necessarily the actual port used.
*/
public HServerAddress getHMasterAddress() {
return master.getMasterAddress();
return this.masterThread.getMaster().getMasterAddress();
}
/**
@ -202,7 +265,9 @@ public class MiniHBaseCluster implements HConstants {
* @param serverNumber
*/
public void abortRegionServer(int serverNumber) {
HRegionServer server = this.regionServers.remove(serverNumber);
HRegionServer server =
this.regionThreads.get(serverNumber).getRegionServer();
LOG.info("Aborting " + server.serverInfo.toString());
server.abort();
}
@ -211,53 +276,77 @@ public class MiniHBaseCluster implements HConstants {
*
* @param serverNumber
*/
public void stopRegionServer(int serverNumber) {
HRegionServer server = this.regionServers.remove(serverNumber);
public HRegionServer stopRegionServer(int serverNumber) {
HRegionServer server =
this.regionThreads.get(serverNumber).getRegionServer();
LOG.info("Stopping " + server.toString());
server.stop();
return server;
}
/**
* Wait for the specified region server to stop
*
* Removes this thread from list of running threads.
* @param serverNumber
*/
public void waitOnRegionServer(int serverNumber) {
Thread regionServerThread = this.regionThreads.remove(serverNumber);
RegionServerThread regionServerThread =
this.regionThreads.remove(serverNumber);
try {
LOG.info("Waiting on " +
regionServerThread.getRegionServer().serverInfo.toString());
regionServerThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/** Shut down the HBase cluster */
public void shutdown() {
LOG.info("Shutting down the HBase Cluster");
for(HRegionServer hsr: this.regionServers) {
hsr.stop();
/**
* Shut down HBase cluster started by calling
* {@link #startMaster(Configuration)} and then
* {@link #startRegionServers(Configuration, int)};
* @param masterThread
* @param regionServerThreads
*/
public static void shutdown(final MasterThread masterThread,
final List<RegionServerThread> regionServerThreads) {
LOG.info("Shutting down HBase Cluster");
/** This is not needed. Remove.
for(RegionServerThread hsr: regionServerThreads) {
hsr.getRegionServer().stop();
}
if(master != null) {
master.shutdown();
*/
if(masterThread != null) {
masterThread.getMaster().shutdown();
}
for(Thread t: this.regionThreads) {
if (t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
// continue
synchronized(regionServerThreads) {
if (regionServerThreads != null) {
for(Thread t: regionServerThreads) {
if (t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
// continue
}
}
}
}
}
if (masterThread != null) {
try {
masterThread.join();
} catch(InterruptedException e) {
// continue
}
}
LOG.info("HBase Cluster shutdown complete");
LOG.info("Shutdown " +
((masterThread != null)? masterThread.getName(): "0 masters") + " " +
((regionServerThreads == null)? 0: regionServerThreads.size()) +
" region server(s)");
}
void shutdown() {
shutdown(this.masterThread, this.regionThreads);
// Close the file system. Will complain if files open so helps w/ leaks.
try {
this.cluster.getFileSystem().close();
@ -285,4 +374,4 @@ public class MiniHBaseCluster implements HConstants {
}
f.delete();
}
}
}

View File

@ -63,7 +63,7 @@ public class StaticTestEnvironment {
debugging = true;
Logger rootLogger = Logger.getRootLogger();
rootLogger.setLevel(Level.WARN);
// rootLogger.setLevel(Level.WARN);
Level logLevel = Level.INFO;
value = System.getenv("LOGGING_LEVEL");

View File

@ -35,14 +35,10 @@ public class TestBatchUpdate extends HBaseClusterTestCase {
private HTableDescriptor desc = null;
private HTable table = null;
/** constructor */
public TestBatchUpdate() {
try {
value = "abcd".getBytes(HConstants.UTF8_ENCODING);
} catch (UnsupportedEncodingException e) {
fail();
}
/** constructor
* @throws UnsupportedEncodingException */
public TestBatchUpdate() throws UnsupportedEncodingException {
value = "abcd".getBytes(HConstants.UTF8_ENCODING);
}
/**

View File

@ -88,7 +88,7 @@ public class TestGet extends HBaseTestCase {
desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
HRegionInfo info = new HRegionInfo(0L, desc, null, null);
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
Path regionDir = HRegion.getRegionDir(dir, info.regionName);
fs.mkdirs(regionDir);
HLog log = new HLog(fs, new Path(regionDir, "log"), conf);

View File

@ -40,7 +40,8 @@ import org.apache.log4j.Logger;
* HRegions or in the HBaseMaster, so only basic testing is possible.
*/
public class TestHRegion extends HBaseTestCase implements RegionUnavailableListener {
Logger LOG = Logger.getLogger(this.getClass().getName());
private static final Logger LOG =
Logger.getLogger(TestHRegion.class.getName());
/** Constructor */
public TestHRegion() {
@ -51,8 +52,9 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
* Since all the "tests" depend on the results of the previous test, they are
* not Junit tests that can stand alone. Consequently we have a single Junit
* test that runs the "sub-tests" as private methods.
* @throws IOException
*/
public void testHRegion() {
public void testHRegion() throws IOException {
try {
setup();
locks();
@ -63,13 +65,10 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
splitAndMerge();
read();
cleanup();
} catch(Exception e) {
} finally {
if(cluster != null) {
cluster.shutdown();
}
e.printStackTrace();
fail();
}
}
@ -674,7 +673,7 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListe
anchorFetched++;
} else {
System.out.println(col);
System.out.println("UNEXPECTED COLUMN " + col);
}
}
curVals.clear();

View File

@ -0,0 +1,382 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class TestHStoreFile extends TestCase {
static final Log LOG = LogFactory.getLog(TestHStoreFile.class);
private static String DIR = System.getProperty("test.build.data", ".");
private static final char FIRST_CHAR = 'a';
private static final char LAST_CHAR = 'z';
private FileSystem fs;
private Configuration conf;
private Path dir = null;
@Override
protected void setUp() throws Exception {
super.setUp();
this.conf = new HBaseConfiguration();
this.fs = FileSystem.getLocal(this.conf);
this.dir = new Path(DIR, getName());
}
@Override
protected void tearDown() throws Exception {
if (this.fs.exists(this.dir)) {
this.fs.delete(this.dir);
}
super.tearDown();
}
private Path writeMapFile(final String name)
throws IOException {
Path path = new Path(DIR, name);
MapFile.Writer writer = new MapFile.Writer(this.conf, fs, path.toString(),
HStoreKey.class, ImmutableBytesWritable.class);
writeStoreFile(writer);
return path;
}
private Path writeSmallMapFile(final String name)
throws IOException {
Path path = new Path(DIR, name);
MapFile.Writer writer = new MapFile.Writer(this.conf, fs, path.toString(),
HStoreKey.class, ImmutableBytesWritable.class);
try {
for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
byte[] b = new byte[] {(byte)d};
Text t = new Text(new String(b));
writer.append(new HStoreKey(t, t, System.currentTimeMillis()),
new ImmutableBytesWritable(t.getBytes()));
}
} finally {
writer.close();
}
return path;
}
/*
* Writes HStoreKey and ImmutableBytes data to passed writer and
* then closes it.
* @param writer
* @throws IOException
*/
private void writeStoreFile(final MapFile.Writer writer)
throws IOException {
try {
for (char d = FIRST_CHAR; d <= LAST_CHAR; d++) {
for (char e = FIRST_CHAR; e <= LAST_CHAR; e++) {
byte[] b = new byte[] { (byte) d, (byte) e };
Text t = new Text(new String(b));
writer.append(new HStoreKey(t, t, System.currentTimeMillis()),
new ImmutableBytesWritable(t.getBytes()));
}
}
} finally {
writer.close();
}
}
/**
* Test that our mechanism of writing store files in one region to reference
* store files in other regions works.
* @throws IOException
*/
public void testReference()
throws IOException {
// Make a store file and write data to it.
HStoreFile hsf = new HStoreFile(this.conf, this.dir, new Text(getName()),
new Text("colfamily"), 1234567890L);
MapFile.Writer writer =
hsf.getWriter(this.fs, SequenceFile.CompressionType.NONE, null);
writeStoreFile(writer);
MapFile.Reader reader = hsf.getReader(this.fs, null);
// Split on a row, not in middle of row. Midkey returned by reader
// may be in middle of row. Create new one with empty column and
// timestamp.
HStoreKey midkey = new HStoreKey(((HStoreKey)reader.midKey()).getRow());
HStoreKey hsk = new HStoreKey();
reader.finalKey(hsk);
Text finalKey = hsk.getRow();
// Make a reference for the bottom half of the just written file.
HStoreFile.Reference reference =
new HStoreFile.Reference(hsf.getRegionName(), hsf.getFileId(), midkey,
HStoreFile.Range.top);
HStoreFile refHsf = new HStoreFile(this.conf, new Path(DIR, getName()),
new Text(getName() + "_reference"), hsf.getColFamily(), 456,
reference);
// Assert that reference files are written and that we can write and
// read the info reference file at least.
refHsf.writeReferenceFiles(this.fs);
assertTrue(this.fs.exists(refHsf.getMapFilePath()));
assertTrue(this.fs.exists(refHsf.getInfoFilePath()));
HStoreFile.Reference otherReference =
HStoreFile.readSplitInfo(refHsf.getInfoFilePath(), this.fs);
assertEquals(reference.getRegionName().toString(),
otherReference.getRegionName().toString());
assertEquals(reference.getFileId(),
otherReference.getFileId());
assertEquals(reference.getMidkey().toString(),
otherReference.getMidkey().toString());
// Now confirm that I can read from the reference and that it only gets
// keys from top half of the file.
MapFile.Reader halfReader = refHsf.getReader(this.fs, null);
HStoreKey key = new HStoreKey();
ImmutableBytesWritable value = new ImmutableBytesWritable();
boolean first = true;
while(halfReader.next(key, value)) {
if (first) {
assertEquals(key.getRow().toString(), midkey.getRow().toString());
first = false;
}
}
assertEquals(key.getRow().toString(), finalKey.toString());
}
/**
* Write a file and then assert that we can read from top and bottom halves
* using two HalfMapFiles.
* @throws Exception
*/
public void testBasicHalfMapFile() throws Exception {
Path p = writeMapFile(getName());
WritableComparable midkey = getMidkey(p);
checkHalfMapFile(p, midkey);
}
/**
* Check HalfMapFile works even if file we're to go against is smaller than
* the default MapFile interval of 128: i.e. index gets entry every 128
* keys.
* @throws Exception
*/
public void testSmallHalfMapFile() throws Exception {
Path p = writeSmallMapFile(getName());
// I know keys are a-z. Let the midkey we want to use be 'd'. See if
// HalfMapFiles work even if size of file is < than default MapFile
// interval.
checkHalfMapFile(p, new HStoreKey(new Text("d")));
}
private WritableComparable getMidkey(final Path p) throws IOException {
MapFile.Reader reader =
new MapFile.Reader(this.fs, p.toString(), this.conf);
HStoreKey key = new HStoreKey();
ImmutableBytesWritable value = new ImmutableBytesWritable();
reader.next(key, value);
String firstKey = key.toString();
WritableComparable midkey = reader.midKey();
reader.finalKey(key);
LOG.info("First key " + firstKey + ", midkey " + midkey.toString()
+ ", last key " + key.toString());
reader.close();
return midkey;
}
private void checkHalfMapFile(final Path p, WritableComparable midkey)
throws IOException {
MapFile.Reader top = null;
MapFile.Reader bottom = null;
HStoreKey key = new HStoreKey();
ImmutableBytesWritable value = new ImmutableBytesWritable();
String previous = null;
try {
// Now make two HalfMapFiles and assert they can read the full backing
// file, one from the top and the other from the bottom.
// Test bottom half first.
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, midkey);
boolean first = true;
while (bottom.next(key, value)) {
previous = key.toString();
if (first) {
first = false;
LOG.info("First in bottom: " + previous);
}
assertTrue(key.compareTo(midkey) < 0);
}
LOG.info("Last in bottom: " + previous.toString());
// Now test reading from the top.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf,
HStoreFile.Range.top, midkey);
first = true;
while (top.next(key, value)) {
assertTrue(key.compareTo(midkey) >= 0);
if (first) {
first = false;
assertEquals(((HStoreKey)midkey).getRow().toString(),
key.getRow().toString());
LOG.info("First in top: " + key.toString());
}
}
LOG.info("Last in top: " + key.toString());
top.getClosest(midkey, value);
// Assert value is same as key.
assertEquals(new String(value.get()),
((HStoreKey) midkey).getRow().toString());
// Next test using a midkey that does not exist in the file.
// First, do a key that is < than first key. Ensure splits behave
// properly.
midkey = new HStoreKey(new Text(" "));
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, midkey);
// When midkey is < than the bottom, should return no values.
assertFalse(bottom.next(key, value));
// Now read from the top.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf,
HStoreFile.Range.top, midkey);
first = true;
while (top.next(key, value)) {
assertTrue(key.compareTo(midkey) >= 0);
if (first) {
first = false;
LOG.info("First top when key < bottom: " + key.toString());
String tmp = key.getRow().toString();
for (int i = 0; i < tmp.length(); i++) {
assertTrue(tmp.charAt(i) == 'a');
}
}
}
LOG.info("Last top when key < bottom: " + key.toString());
String tmp = key.getRow().toString();
for (int i = 0; i < tmp.length(); i++) {
assertTrue(tmp.charAt(i) == 'z');
}
// Test when midkey is > than last key in file ('||' > 'zz').
midkey = new HStoreKey(new Text("|||"));
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, midkey);
first = true;
while (bottom.next(key, value)) {
if (first) {
first = false;
LOG.info("First bottom when key > top: " + key.toString());
tmp = key.getRow().toString();
for (int i = 0; i < tmp.length(); i++) {
assertTrue(tmp.charAt(i) == 'a');
}
}
}
LOG.info("Last bottom when key > top: " + key.toString());
tmp = key.getRow().toString();
for (int i = 0; i < tmp.length(); i++) {
assertTrue(tmp.charAt(i) == 'z');
}
// Now look at top. Should not return any values.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(), this.conf,
HStoreFile.Range.top, midkey);
assertFalse(top.next(key, value));
} finally {
if (top != null) {
top.close();
}
if (bottom != null) {
bottom.close();
}
fs.delete(p);
}
}
/**
* Assert HalFMapFile does right thing when midkey does not exist in the
* backing file (its larger or smaller than any of the backing mapfiles keys).
*
* @throws Exception
*/
public void testOutOfRangeMidkeyHalfMapFile() throws Exception {
MapFile.Reader top = null;
MapFile.Reader bottom = null;
HStoreKey key = new HStoreKey();
ImmutableBytesWritable value = new ImmutableBytesWritable();
Path p = writeMapFile(getName());
try {
try {
// Test using a midkey that does not exist in the file.
// First, do a key that is < than first key. Ensure splits behave
// properly.
HStoreKey midkey = new HStoreKey(new Text(" "));
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, midkey);
// When midkey is < than the bottom, should return no values.
assertFalse(bottom.next(key, value));
// Now read from the top.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.top, midkey);
boolean first = true;
while (top.next(key, value)) {
assertTrue(key.compareTo(midkey) >= 0);
if (first) {
first = false;
LOG.info("First top when key < bottom: " + key.toString());
assertEquals("aa", key.getRow().toString());
}
}
LOG.info("Last top when key < bottom: " + key.toString());
assertEquals("zz", key.getRow().toString());
// Test when midkey is > than last key in file ('||' > 'zz').
midkey = new HStoreKey(new Text("|||"));
bottom = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.bottom, midkey);
first = true;
while (bottom.next(key, value)) {
if (first) {
first = false;
LOG.info("First bottom when key > top: " + key.toString());
assertEquals("aa", key.getRow().toString());
}
}
LOG.info("Last bottom when key > top: " + key.toString());
assertEquals("zz", key.getRow().toString());
// Now look at top. Should not return any values.
top = new HStoreFile.HalfMapFileReader(this.fs, p.toString(),
this.conf, HStoreFile.Range.top, midkey);
assertFalse(top.next(key, value));
} finally {
if (top != null) {
top.close();
}
if (bottom != null) {
bottom.close();
}
fs.delete(p);
}
} finally {
this.fs.delete(p);
}
}
}

View File

@ -77,10 +77,9 @@ public class TestRegionServerAbort extends HBaseClusterTestCase {
// Now shutdown the region server and wait for it to go down.
this.cluster.abortRegionServer(0);
this.cluster.waitOnRegionServer(0);
// Verify that the client can find the data after the region has been moved
// to a different server
HScannerInterface scanner =
table.obtainScanner(HConstants.COLUMN_FAMILY_ARRAY, new Text());

View File

@ -146,7 +146,7 @@ public class TestScanner extends HBaseTestCase {
Path dir = new Path("/hbase");
fs.mkdirs(dir);
Path regionDir = HStoreFile.getHRegionDir(dir, REGION_INFO.regionName);
Path regionDir = HRegion.getRegionDir(dir, REGION_INFO.regionName);
fs.mkdirs(regionDir);
HLog log = new HLog(fs, new Path(regionDir, "log"), conf);

View File

@ -0,0 +1,442 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
/**
* {@Link TestHRegion} does a split but this TestCase adds testing of fast
* split and manufactures odd-ball split scenarios.
*/
public class TestSplit extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestSplit.class);
private final static String COLFAMILY_NAME1 = "colfamily1:";
private final static String COLFAMILY_NAME2 = "colfamily2:";
private final static String COLFAMILY_NAME3 = "colfamily3:";
private Path testDir = null;
private FileSystem fs = null;
private static final char FIRST_CHAR = 'a';
private static final char LAST_CHAR = 'z';
@Override
public void setUp() throws Exception {
super.setUp();
this.testDir = getUnitTestdir(getName());
this.fs = FileSystem.getLocal(this.conf);
if (fs.exists(testDir)) {
fs.delete(testDir);
}
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
conf.setLong("hbase.hregion.max.filesize", 1024 * 128);
}
@Override
public void tearDown() throws Exception {
try {
if (this.fs.exists(testDir)) {
this.fs.delete(testDir);
}
} catch (Exception e) {
e.printStackTrace();
}
super.tearDown();
}
/**
* Splits twice and verifies getting from each of the split regions.
* @throws Exception
*/
public void testBasicSplit() throws Exception {
HRegion region = null;
HLog hlog = new HLog(this.fs, this.testDir, this.conf);
try {
HTableDescriptor htd = createTableDescriptor(getName());
HRegionInfo hri = new HRegionInfo(1, htd, null, null);
region = new HRegion(testDir, hlog, fs, this.conf, hri, null);
basicSplit(region);
} finally {
if (region != null) {
region.close();
}
hlog.closeAndDelete();
}
}
private HTableDescriptor createTableDescriptor(final String name) {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME1));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME2));
htd.addFamily(new HColumnDescriptor(COLFAMILY_NAME3));
return htd;
}
private void basicSplit(final HRegion region) throws Exception {
addContent(region, COLFAMILY_NAME3);
region.internalFlushcache();
Text midkey = new Text();
assertTrue(region.needsSplit(midkey));
HRegion [] regions = split(region);
// Assert can get rows out of new regions. Should be able to get first
// row from first region and the midkey from second region.
byte [] b = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
assertGet(regions[0], COLFAMILY_NAME3, new Text(b));
assertGet(regions[1], COLFAMILY_NAME3, midkey);
// Test I can get scanner and that it starts at right place.
assertScan(regions[0], COLFAMILY_NAME3, new Text(b));
assertScan(regions[1], COLFAMILY_NAME3, midkey);
// Now prove can't split regions that have references.
Text [] midkeys = new Text[regions.length];
for (int i = 0; i < regions.length; i++) {
midkeys[i] = new Text();
// Even after above splits, still needs split but after splits its
// unsplitable because biggest store file is reference. References
// make the store unsplittable, until something bigger comes along.
assertFalse(regions[i].needsSplit(midkeys[i]));
// Add so much data to this region, we create a store file that is > than
// one of our unsplitable references.
// it will.
for (int j = 0; j < 2; j++) {
addContent(regions[i], COLFAMILY_NAME3);
}
addContent(regions[i], COLFAMILY_NAME2);
addContent(regions[i], COLFAMILY_NAME1);
regions[i].internalFlushcache();
}
// Assert that even if one store file is larger than a reference, the
// region is still deemed unsplitable (Can't split region if references
// presen).
for (int i = 0; i < regions.length; i++) {
midkeys[i] = new Text();
// Even after above splits, still needs split but after splits its
// unsplitable because biggest store file is reference. References
// make the store unsplittable, until something bigger comes along.
assertFalse(regions[i].needsSplit(midkeys[i]));
}
// To make regions splitable force compaction.
for (int i = 0; i < regions.length; i++) {
assertTrue(regions[i].compactStores());
}
TreeMap<String, HRegion> sortedMap = new TreeMap<String, HRegion>();
// Split these two daughter regions so then I'll have 4 regions. Will
// split because added data above.
for (int i = 0; i < regions.length; i++) {
HRegion [] rs = split(regions[i]);
for (int j = 0; j < rs.length; j++) {
sortedMap.put(rs[j].getRegionName().toString(), rs[j]);
}
}
LOG.info("Made 4 regions");
// The splits should have been even. Test I can get some arbitrary row out
// of each.
int interval = (LAST_CHAR - FIRST_CHAR) / 3;
for (HRegion r: sortedMap.values()) {
assertGet(r, COLFAMILY_NAME3, new Text(new String(b)));
b[0] += interval;
}
}
/**
* Test that a region is cleaned up after its daughter splits release all
* references.
* @throws Exception
*/
public void testSplitRegionIsDeleted() throws Exception {
final int timeout = 60;
// Start up a hbase cluster
this.conf.set(HConstants.HBASE_DIR, this.testDir.toString());
MiniHBaseCluster.MasterThread masterThread =
MiniHBaseCluster.startMaster(this.conf);
List<MiniHBaseCluster.RegionServerThread> regionServerThreads =
MiniHBaseCluster.startRegionServers(this.conf, 1);
HTable meta = null;
HTable t = null;
try {
// Create a table.
HBaseAdmin admin = new HBaseAdmin(this.conf);
admin.createTable(createTableDescriptor(getName()));
// Get connection on the meta table and get count of rows.
meta = new HTable(this.conf, HConstants.META_TABLE_NAME);
int count = count(meta, HConstants.COLUMN_FAMILY_STR);
t = new HTable(this.conf, new Text(getName()));
addContent(t, COLFAMILY_NAME3);
// All is running in the one JVM so I should be able to get the
// region instance and bring on a split.
HRegionInfo hri =
t.getRegionLocation(HConstants.EMPTY_START_ROW).getRegionInfo();
HRegion r = null;
synchronized(regionServerThreads) {
r = regionServerThreads.get(0).getRegionServer().onlineRegions.
get(hri.getRegionName());
}
// Flush will provoke a split next time the split-checker thread runs.
r.flushcache(false);
// Now, wait until split makes it into the meta table.
for (int i = 0; i < timeout &&
(count(meta, HConstants.COLUMN_FAMILY_STR) <= count); i++) {
Thread.sleep(1000);
}
int oldCount = count;
count = count(meta, HConstants.COLUMN_FAMILY_STR);
if (count <= oldCount) {
throw new IOException("Failed waiting on splits to show up");
}
HRegionInfo parent = getSplitParent(meta);
assertTrue(parent.isOffline());
Path parentDir =
HRegion.getRegionDir(this.testDir, parent.getRegionName());
assertTrue(this.fs.exists(parentDir));
LOG.info("Split happened and parent " + parent.getRegionName() + " is " +
"offline");
// Now, force a compaction. This will rewrite references and make it
// so the parent region becomes deletable.
LOG.info("Starting compaction");
synchronized(regionServerThreads) {
for (MiniHBaseCluster.RegionServerThread thread: regionServerThreads) {
SortedMap<Text, HRegion> regions =
thread.getRegionServer().onlineRegions;
// Retry if ConcurrentModification... alternative of sync'ing is not
// worth it for sake of unit test.
for (int i = 0; i < 10; i++) {
try {
for (HRegion online: regions.values()) {
if (online.getRegionName().toString().startsWith(getName())) {
online.compactStores();
}
}
break;
} catch (ConcurrentModificationException e) {
LOG.warn("Retrying because ..." + e.toString() + " -- one or " +
"two should be fine");
continue;
}
}
}
}
// Now wait until parent disappears.
LOG.info("Waiting on parent " + parent.getRegionName() +
" to disappear");
for (int i = 0; i < timeout && getSplitParent(meta) != null; i++) {
Thread.sleep(1000);
}
assertTrue(getSplitParent(meta) == null);
// Assert cleaned up.
assertFalse(this.fs.exists(parentDir));
} finally {
MiniHBaseCluster.shutdown(masterThread, regionServerThreads);
}
}
private void assertGet(final HRegion r, final String family, final Text k)
throws IOException {
// Now I have k, get values out and assert they are as expected.
byte [][] results = r.get(k, new Text(family),
Integer.MAX_VALUE);
for (int j = 0; j < results.length; j++) {
Text tmp = new Text(results[j]);
// Row should be equal to value every time.
assertEquals(k.toString(), tmp.toString());
}
}
private HRegionInfo getSplitParent(final HTable t)
throws IOException {
HRegionInfo result = null;
HScannerInterface s = t.obtainScanner(HConstants.COL_REGIONINFO_ARRAY,
HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
try {
HStoreKey curKey = new HStoreKey();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
while(s.next(curKey, curVals)) {
HRegionInfo hri = (HRegionInfo)Writables.
getWritable(curVals.get(HConstants.COL_REGIONINFO), new HRegionInfo());
// Assert that if region is a split region, that it is also offline.
// Otherwise, if not a split region, assert that it is online.
if (hri.isSplit() && hri.isOffline()) {
result = hri;
break;
}
}
return result;
} finally {
s.close();
}
}
/*
* Count of rows in table for given column.
* @param t
* @param column
* @return
* @throws IOException
*/
private int count(final HTable t, final String column)
throws IOException {
int size = 0;
Text [] cols = new Text[] {new Text(column)};
HScannerInterface s = t.obtainScanner(cols, HConstants.EMPTY_START_ROW,
System.currentTimeMillis(), null);
try {
HStoreKey curKey = new HStoreKey();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
while(s.next(curKey, curVals)) {
size++;
}
return size;
} finally {
s.close();
}
}
/*
* Assert first value in the passed region is <code>firstValue</code>.
* @param r
* @param column
* @param firstValue
* @throws IOException
*/
private void assertScan(final HRegion r, final String column,
final Text firstValue)
throws IOException {
Text [] cols = new Text[] {new Text(column)};
HInternalScannerInterface s = r.getScanner(cols,
HConstants.EMPTY_START_ROW, System.currentTimeMillis(), null);
try {
HStoreKey curKey = new HStoreKey();
TreeMap<Text, byte []> curVals = new TreeMap<Text, byte []>();
boolean first = true;
OUTER_LOOP: while(s.next(curKey, curVals)) {
for(Text col: curVals.keySet()) {
byte [] val = curVals.get(col);
Text curval = new Text(val);
if (first) {
first = false;
assertTrue(curval.compareTo(firstValue) == 0);
} else {
// Not asserting anything. Might as well break.
break OUTER_LOOP;
}
}
}
} finally {
s.close();
}
}
private HRegion [] split(final HRegion r) throws IOException {
Text midKey = new Text();
assertTrue(r.needsSplit(midKey));
// Assert can get mid key from passed region.
assertGet(r, COLFAMILY_NAME3, midKey);
HRegion [] regions = r.closeAndSplit(midKey, null);
assertEquals(regions.length, 2);
return regions;
}
private void addContent(final HRegion r, final String column)
throws IOException {
Text startKey = r.getRegionInfo().getStartKey();
Text endKey = r.getRegionInfo().getEndKey();
byte [] startKeyBytes = startKey.getBytes();
if (startKeyBytes == null || startKeyBytes.length == 0) {
startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
}
// Add rows of three characters. The first character starts with the
// 'a' character and runs up to 'z'. Per first character, we run the
// second character over same range. And same for the third so rows
// (and values) look like this: 'aaa', 'aab', 'aac', etc.
char secondCharStart = (char)startKeyBytes[1];
char thirdCharStart = (char)startKeyBytes[2];
EXIT_ALL_LOOPS: for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) {
for (char d = secondCharStart; d <= LAST_CHAR; d++) {
for (char e = thirdCharStart; e <= LAST_CHAR; e++) {
byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e};
Text t = new Text(new String(bytes));
if (endKey != null && endKey.getLength() > 0
&& endKey.compareTo(t) <= 0) {
break EXIT_ALL_LOOPS;
}
long lockid = r.startUpdate(t);
try {
r.put(lockid, new Text(column), bytes);
r.commit(lockid, System.currentTimeMillis());
lockid = -1;
} finally {
if (lockid != -1) {
r.abort(lockid);
}
}
}
// Set start character back to FIRST_CHAR after we've done first loop.
thirdCharStart = FIRST_CHAR;
}
secondCharStart = FIRST_CHAR;
}
}
// TODO: Have HTable and HRegion implement interface that has in it
// startUpdate, put, delete, commit, abort, etc.
private void addContent(final HTable table, final String column)
throws IOException {
byte [] startKeyBytes = new byte [] {FIRST_CHAR, FIRST_CHAR, FIRST_CHAR};
// Add rows of three characters. The first character starts with the
// 'a' character and runs up to 'z'. Per first character, we run the
// second character over same range. And same for the third so rows
// (and values) look like this: 'aaa', 'aab', 'aac', etc.
char secondCharStart = (char)startKeyBytes[1];
char thirdCharStart = (char)startKeyBytes[2];
for (char c = (char)startKeyBytes[0]; c <= LAST_CHAR; c++) {
for (char d = secondCharStart; d <= LAST_CHAR; d++) {
for (char e = thirdCharStart; e <= LAST_CHAR; e++) {
byte [] bytes = new byte [] {(byte)c, (byte)d, (byte)e};
Text t = new Text(new String(bytes));
long lockid = table.startBatchUpdate(t);
try {
table.put(lockid, new Text(column), bytes);
table.commit(lockid, System.currentTimeMillis());
lockid = -1;
} finally {
if (lockid != -1) {
table.abort(lockid);
}
}
}
// Set start character back to FIRST_CHAR after we've done first loop.
thirdCharStart = FIRST_CHAR;
}
secondCharStart = FIRST_CHAR;
}
}
}

View File

@ -107,7 +107,7 @@ public class TestTimestamp extends HBaseClusterTestCase {
// flush everything out to disk
HRegionServer s = cluster.regionServers.get(0);
HRegionServer s = cluster.regionThreads.get(0).getRegionServer();
for(HRegion r: s.onlineRegions.values() ) {
r.flushcache(false);
}