HBASE-8219 Align Offline Merge with Online Merge

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1465943 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
zjushch 2013-04-09 09:31:31 +00:00
parent c18561ec67
commit 6aad836c0e
2 changed files with 41 additions and 76 deletions

View File

@@ -66,7 +66,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -78,7 +77,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
@@ -4219,7 +4218,6 @@ public class HRegion implements HeapSize { // , Writable{
}
FileSystem fs = a.getRegionFileSystem().getFileSystem();
// Make sure each region's cache is empty
a.flushcache();
b.flushcache();
@@ -4235,85 +4233,42 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Files for region: " + b);
b.getRegionFileSystem().logFileSystemState(LOG);
}
Configuration conf = a.baseConf;
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
Path tableDir = a.getRegionFileSystem().getTableDir();
// Presume both are of same region type -- i.e. both user or catalog
// table regions. This way can use comparator.
final byte[] startKey =
(a.comparator.matchingRows(a.getStartKey(), 0, a.getStartKey().length,
HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
|| b.comparator.matchingRows(b.getStartKey(), 0,
b.getStartKey().length, HConstants.EMPTY_BYTE_ARRAY, 0,
HConstants.EMPTY_BYTE_ARRAY.length))
? HConstants.EMPTY_BYTE_ARRAY
: (a.comparator.compareRows(a.getStartKey(), 0, a.getStartKey().length,
b.getStartKey(), 0, b.getStartKey().length) <= 0
? a.getStartKey()
: b.getStartKey());
final byte[] endKey =
(a.comparator.matchingRows(a.getEndKey(), 0, a.getEndKey().length,
HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
|| a.comparator.matchingRows(b.getEndKey(), 0, b.getEndKey().length,
HConstants.EMPTY_BYTE_ARRAY, 0,
HConstants.EMPTY_BYTE_ARRAY.length))
? HConstants.EMPTY_BYTE_ARRAY
: (a.comparator.compareRows(a.getEndKey(), 0, a.getEndKey().length,
b.getEndKey(), 0, b.getEndKey().length) <= 0
? b.getEndKey()
: a.getEndKey());
HRegionInfo newRegionInfo = new HRegionInfo(tabledesc.getName(), startKey, endKey);
LOG.info("Creating new region " + newRegionInfo);
HRegionFileSystem regionFs = HRegionFileSystem.createRegionOnFileSystem(
conf, fs, tableDir, newRegionInfo);
LOG.info("starting merge of regions: " + a + " and " + b +
" into new region " + newRegionInfo.toString() +
" with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
Bytes.toStringBinary(endKey) + ">");
// Because we compacted the source regions we should have no more than two
// StoreFiles per family and there will be no reference store
Map<byte[], List<StoreFile>> aStoreFiles = a.close();
Map<byte[], List<StoreFile>> bStoreFiles = b.close();
// Move StoreFiles under new region directory
regionFs.commitStoreFiles(aStoreFiles);
regionFs.commitStoreFiles(bStoreFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
regionFs.logFileSystemState(LOG);
RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true);
if (!rmt.prepare(null)) {
throw new IOException("Unable to merge regions " + a + " and " + b);
}
// Create HRegion and update the metrics
HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf,
newRegionInfo, tabledesc, null);
dstRegion.readRequestsCount.set(a.readRequestsCount.get() + b.readRequestsCount.get());
dstRegion.writeRequestsCount.set(a.writeRequestsCount.get() + b.writeRequestsCount.get());
dstRegion.checkAndMutateChecksFailed.set(
a.checkAndMutateChecksFailed.get() + b.checkAndMutateChecksFailed.get());
dstRegion.checkAndMutateChecksPassed.set(
a.checkAndMutateChecksPassed.get() + b.checkAndMutateChecksPassed.get());
dstRegion.initialize();
dstRegion.compactStores();
HRegionInfo mergedRegionInfo = rmt.getMergedRegionInfo();
LOG.info("starting merge of regions: " + a + " and " + b
+ " into new region " + mergedRegionInfo.getRegionNameAsString()
+ " with start key <"
+ Bytes.toStringBinary(mergedRegionInfo.getStartKey())
+ "> and end key <"
+ Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">");
HRegion dstRegion = null;
try {
dstRegion = rmt.execute(null, null);
} catch (IOException ioe) {
rmt.rollback(null, null);
throw new IOException("Failed merging region " + a + " and " + b
+ ", and succssfully rolled back");
}
dstRegion.compactStores(true);
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
dstRegion.getRegionFileSystem().logFileSystemState(LOG);
}
if (dstRegion.getRegionFileSystem().hasReferences(dstRegion.getTableDesc())) {
throw new IOException("Merged region " + dstRegion
+ " still has references after the compaction, is compaction canceled?");
}
// delete out the 'A' region
HRegionFileSystem.deleteRegionFromFileSystem(
a.getBaseConf(), fs, tableDir, a.getRegionInfo());
// delete out the 'B' region
HRegionFileSystem.deleteRegionFromFileSystem(
b.getBaseConf(), fs, tableDir, b.getRegionInfo());
// Archiving the 'A' region
HFileArchiver.archiveRegion(a.getBaseConf(), fs, a.getRegionInfo());
// Archiving the 'B' region
HFileArchiver.archiveRegion(b.getBaseConf(), fs, b.getRegionInfo());
LOG.info("merge completed. New region is " + dstRegion);
return dstRegion;

View File

@@ -28,6 +28,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
@@ -402,12 +403,20 @@ public class RegionMergeTransaction {
byte[] startKey = null;
byte[] endKey = null;
// Choose the smaller as start key
if (a.compareTo(b) <= 0) {
startKey = a.getStartKey();
endKey = b.getEndKey();
} else {
startKey = b.getStartKey();
}
// Choose the bigger as end key
if (a.getComparator().matchingRows(a.getEndKey(), 0, a.getEndKey().length,
HConstants.EMPTY_BYTE_ARRAY, 0, HConstants.EMPTY_BYTE_ARRAY.length)
|| a.getComparator().compareRows(a.getEndKey(), 0,
a.getEndKey().length, b.getEndKey(), 0, b.getEndKey().length) > 0) {
endKey = a.getEndKey();
} else {
endKey = b.getEndKey();
}
// Merged region is sorted between two merging regions in META
@@ -756,6 +765,7 @@ public class RegionMergeTransaction {
*/
boolean hasMergeQualifierInMeta(final RegionServerServices services,
final byte[] regionName) throws IOException {
if (services == null) return false;
// Get merge regions if it is a merged region and already has merge
// qualifier
Pair<HRegionInfo, HRegionInfo> mergeRegions = MetaReader