HADOOP-1391. Part 2 - table compaction via merging adjacent regions that have shrunk.
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@543841 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a2fba1024d
commit
66839c4c17
|
@ -20,3 +20,5 @@ Trunk (unreleased changes)
|
|||
10. HADOOP-1430. HBase shutdown leaves regionservers up.
|
||||
11. HADOOP-1392. Part1: includes create/delete table; enable/disable table;
|
||||
add/remove column.
|
||||
12. HADOOP-1392. Part2: includes table compaction by merging adjacent regions
|
||||
that have shrunk in size.
|
||||
|
|
|
@ -15,7 +15,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.lang.Class;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
|
@ -131,7 +130,7 @@ public class HClient implements HConstants {
|
|||
}
|
||||
|
||||
/* Find the address of the master and connect to it */
|
||||
private void checkMaster() throws IOException {
|
||||
private void checkMaster() throws MasterNotRunningException {
|
||||
if (this.master != null) {
|
||||
return;
|
||||
}
|
||||
|
@ -174,6 +173,21 @@ public class HClient implements HConstants {
|
|||
// Administrative methods
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
/**
|
||||
* @return - true if the master server is running
|
||||
*/
|
||||
public boolean isMasterRunning() {
|
||||
if(this.master == null) {
|
||||
try {
|
||||
checkMaster();
|
||||
|
||||
} catch(MasterNotRunningException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new table
|
||||
*
|
||||
|
@ -305,10 +319,6 @@ public class HClient implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
public synchronized void mergeRegions(Text regionName1, Text regionName2) throws IOException {
|
||||
|
||||
}
|
||||
|
||||
public synchronized void enableTable(Text tableName) throws IOException {
|
||||
checkReservedTableName(tableName);
|
||||
checkMaster();
|
||||
|
|
|
@ -576,13 +576,17 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
if(! fs.exists(rootRegionDir)) {
|
||||
LOG.info("bootstrap: creating ROOT and first META regions");
|
||||
try {
|
||||
HRegion root = createNewHRegion(HGlobals.rootTableDesc, 0L);
|
||||
HRegion meta = createNewHRegion(HGlobals.metaTableDesc, 1L);
|
||||
HRegion root = HRegion.createNewHRegion(fs, dir, conf,
|
||||
HGlobals.rootTableDesc, 0L, null, null);
|
||||
HRegion meta = HRegion.createNewHRegion(fs, dir, conf,
|
||||
HGlobals.metaTableDesc, 1L, null, null);
|
||||
|
||||
addTableToMeta(root, meta);
|
||||
HRegion.addRegionToMeta(root, meta);
|
||||
|
||||
root.close();
|
||||
root.getLog().close();
|
||||
meta.close();
|
||||
meta.getLog().close();
|
||||
|
||||
} catch(IOException e) {
|
||||
LOG.error(e);
|
||||
|
@ -1621,7 +1625,8 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
|
||||
// 2. Create the HRegion
|
||||
|
||||
HRegion r = createNewHRegion(desc, newRegion.regionId);
|
||||
HRegion r = HRegion.createNewHRegion(fs, dir, conf, desc,
|
||||
newRegion.regionId, null, null);
|
||||
|
||||
// 3. Insert into meta
|
||||
|
||||
|
@ -1659,53 +1664,6 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal method to create a new HRegion. Used by createTable and by the
|
||||
* bootstrap code in the HMaster constructor
|
||||
*
|
||||
* @param desc - table descriptor
|
||||
* @param regionId - region id
|
||||
* @return - new HRegion
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private HRegion createNewHRegion(HTableDescriptor desc, long regionId)
|
||||
throws IOException {
|
||||
|
||||
HRegionInfo info = new HRegionInfo(regionId, desc, null, null);
|
||||
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
|
||||
fs.mkdirs(regionDir);
|
||||
|
||||
return new HRegion(dir,
|
||||
new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
|
||||
fs, conf, info, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts a new table's meta information into the meta table. Used by
|
||||
* the HMaster bootstrap code.
|
||||
*
|
||||
* @param meta - HRegion to be updated
|
||||
* @param table - HRegion of new table
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void addTableToMeta(HRegion meta, HRegion table) throws IOException {
|
||||
|
||||
// The row key is the region name
|
||||
|
||||
long writeid = meta.startUpdate(table.getRegionName());
|
||||
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(bytes);
|
||||
|
||||
table.getRegionInfo().write(s);
|
||||
|
||||
meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray()));
|
||||
|
||||
meta.commit(writeid);
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.hadoop.hbase.HMasterInterface#deleteTable(org.apache.hadoop.io.Text)
|
||||
*/
|
||||
|
@ -1730,13 +1688,6 @@ public class HMaster implements HConstants, HMasterInterface,
|
|||
new DeleteColumn(tableName, HStoreKey.extractFamily(columnName)).process();
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.hadoop.hbase.HMasterInterface#mergeRegions(org.apache.hadoop.io.Text, org.apache.hadoop.io.Text)
|
||||
*/
|
||||
public void mergeRegions(Text regionName1, Text regionName2) throws IOException {
|
||||
//TODO
|
||||
}
|
||||
|
||||
/* (non-Javadoc)
|
||||
* @see org.apache.hadoop.hbase.HMasterInterface#enableTable(org.apache.hadoop.io.Text)
|
||||
*/
|
||||
|
|
|
@ -44,8 +44,6 @@ public interface HMasterInterface extends VersionedProtocol {
|
|||
public void addColumn(Text tableName, HColumnDescriptor column) throws IOException;
|
||||
public void deleteColumn(Text tableName, Text columnName) throws IOException;
|
||||
|
||||
public void mergeRegions(Text regionName1, Text regionName2) throws IOException;
|
||||
|
||||
public void enableTable(Text tableName) throws IOException;
|
||||
public void disableTable(Text tableName) throws IOException;
|
||||
|
||||
|
|
|
@ -0,0 +1,418 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.DataInputBuffer;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
public class HMerge implements HConstants {
|
||||
private static final Log LOG = LogFactory.getLog(HMerge.class);
|
||||
private static final Text[] META_COLS = {COL_REGIONINFO};
|
||||
|
||||
private HMerge() {} // Not instantiable
|
||||
|
||||
/**
|
||||
* Scans the table and merges two adjacent regions if they are small. This
|
||||
* only happens when a lot of rows are deleted.
|
||||
*
|
||||
* When merging the META region, the HBase instance must be offline.
|
||||
* When merging a normal table, the HBase instance must be online, but the
|
||||
* table must be disabled.
|
||||
*
|
||||
* @param conf - configuration object for HBase
|
||||
* @param fs - FileSystem where regions reside
|
||||
* @param tableName - Table to be compacted
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void merge(Configuration conf, FileSystem fs, Text tableName)
|
||||
throws IOException {
|
||||
|
||||
HClient client = new HClient(conf);
|
||||
boolean masterIsRunning = client.isMasterRunning();
|
||||
if(tableName.equals(META_TABLE_NAME)) {
|
||||
if(masterIsRunning) {
|
||||
throw new IllegalStateException(
|
||||
"Can not compact META table if instance is on-line");
|
||||
}
|
||||
new OfflineMerger(conf, fs, META_TABLE_NAME).process();
|
||||
|
||||
} else {
|
||||
if(!masterIsRunning) {
|
||||
throw new IllegalStateException(
|
||||
"HBase instance must be running to merge a normal table");
|
||||
}
|
||||
new OnlineMerger(conf, fs, client, tableName).process();
|
||||
}
|
||||
}
|
||||
|
||||
private static abstract class Merger {
|
||||
protected Configuration conf;
|
||||
protected FileSystem fs;
|
||||
protected Text tableName;
|
||||
protected Path dir;
|
||||
protected Path basedir;
|
||||
protected HLog hlog;
|
||||
protected DataInputBuffer in;
|
||||
protected boolean more;
|
||||
protected HStoreKey key;
|
||||
protected HRegionInfo info;
|
||||
|
||||
protected Merger(Configuration conf, FileSystem fs, Text tableName)
|
||||
throws IOException {
|
||||
|
||||
this.conf = conf;
|
||||
this.fs = fs;
|
||||
this.tableName = tableName;
|
||||
this.in = new DataInputBuffer();
|
||||
this.more = true;
|
||||
this.key = new HStoreKey();
|
||||
this.info = new HRegionInfo();
|
||||
this.dir = new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR));
|
||||
this.basedir = new Path(dir, "merge_" + System.currentTimeMillis());
|
||||
fs.mkdirs(basedir);
|
||||
this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf);
|
||||
}
|
||||
|
||||
public void process() throws IOException {
|
||||
try {
|
||||
while(more) {
|
||||
TreeSet<HRegionInfo> regionsToMerge = next();
|
||||
if(regionsToMerge == null) {
|
||||
break;
|
||||
}
|
||||
merge(regionsToMerge.toArray(new HRegionInfo[regionsToMerge.size()]));
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
hlog.close();
|
||||
|
||||
} catch(IOException e) {
|
||||
LOG.error(e);
|
||||
}
|
||||
try {
|
||||
fs.delete(basedir);
|
||||
|
||||
} catch(IOException e) {
|
||||
LOG.error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void merge(HRegionInfo[] regions) throws IOException {
|
||||
if(regions.length < 2) {
|
||||
LOG.info("only one region - nothing to merge");
|
||||
return;
|
||||
}
|
||||
|
||||
HRegion currentRegion = null;
|
||||
long currentSize = 0;
|
||||
HRegion nextRegion = null;
|
||||
long nextSize = 0;
|
||||
for(int i = 0; i < regions.length - 1; i++) {
|
||||
if(currentRegion == null) {
|
||||
currentRegion =
|
||||
new HRegion(dir, hlog, fs, conf, regions[i], null, null);
|
||||
|
||||
currentSize = currentRegion.largestHStore();
|
||||
}
|
||||
nextRegion =
|
||||
new HRegion(dir, hlog, fs, conf, regions[i + 1], null, null);
|
||||
|
||||
nextSize = nextRegion.largestHStore();
|
||||
|
||||
if((currentSize + nextSize) <= (DESIRED_MAX_FILE_SIZE / 2)) {
|
||||
// We merge two adjacent regions if their total size is less than
|
||||
// one half of the desired maximum size
|
||||
|
||||
LOG.info("merging regions " + currentRegion.getRegionName()
|
||||
+ " and " + nextRegion.getRegionName());
|
||||
|
||||
HRegion mergedRegion = HRegion.closeAndMerge(currentRegion, nextRegion);
|
||||
|
||||
updateMeta(currentRegion.getRegionName(), nextRegion.getRegionName(),
|
||||
mergedRegion);
|
||||
|
||||
currentRegion = null;
|
||||
i++;
|
||||
continue;
|
||||
|
||||
} else {
|
||||
LOG.info("not merging regions " + currentRegion.getRegionName()
|
||||
+ " and " + nextRegion.getRegionName());
|
||||
}
|
||||
|
||||
currentRegion.close();
|
||||
currentRegion = nextRegion;
|
||||
currentSize = nextSize;
|
||||
}
|
||||
if(currentRegion != null) {
|
||||
currentRegion.close();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract TreeSet<HRegionInfo> next() throws IOException;
|
||||
|
||||
protected abstract void updateMeta(Text oldRegion1, Text oldRegion2,
|
||||
HRegion newRegion) throws IOException;
|
||||
|
||||
}
|
||||
|
||||
private static class OnlineMerger extends Merger {
|
||||
private HClient client;
|
||||
private HScannerInterface metaScanner;
|
||||
private HRegionInfo latestRegion;
|
||||
|
||||
public OnlineMerger(Configuration conf, FileSystem fs, HClient client,
|
||||
Text tableName) throws IOException {
|
||||
|
||||
super(conf, fs, tableName);
|
||||
this.client = client;
|
||||
client.openTable(META_TABLE_NAME);
|
||||
this.metaScanner = client.obtainScanner(META_COLS, new Text());
|
||||
this.latestRegion = null;
|
||||
}
|
||||
|
||||
private HRegionInfo nextRegion() throws IOException {
|
||||
try {
|
||||
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
|
||||
if(! metaScanner.next(key, results)) {
|
||||
more = false;
|
||||
return null;
|
||||
}
|
||||
byte[] bytes = results.get(COL_REGIONINFO);
|
||||
if(bytes == null || bytes.length == 0) {
|
||||
throw new NoSuchElementException("meta region entry missing "
|
||||
+ COL_REGIONINFO);
|
||||
}
|
||||
HRegionInfo region = new HRegionInfo(bytes);
|
||||
if(!region.offLine) {
|
||||
throw new TableNotDisabledException("region " + region.regionName
|
||||
+ " is not disabled");
|
||||
}
|
||||
return region;
|
||||
|
||||
} catch(IOException e) {
|
||||
try {
|
||||
metaScanner.close();
|
||||
|
||||
} catch(IOException ex) {
|
||||
LOG.error(ex);
|
||||
}
|
||||
more = false;
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
protected TreeSet<HRegionInfo> next() throws IOException {
|
||||
TreeSet<HRegionInfo> regions = new TreeSet<HRegionInfo>();
|
||||
if(latestRegion == null) {
|
||||
latestRegion = nextRegion();
|
||||
}
|
||||
if(latestRegion != null) {
|
||||
regions.add(latestRegion);
|
||||
}
|
||||
latestRegion = nextRegion();
|
||||
if(latestRegion != null) {
|
||||
regions.add(latestRegion);
|
||||
}
|
||||
return regions;
|
||||
}
|
||||
|
||||
protected void updateMeta(Text oldRegion1, Text oldRegion2,
|
||||
HRegion newRegion) throws IOException {
|
||||
Text[] regionsToDelete = {
|
||||
oldRegion1,
|
||||
oldRegion2
|
||||
};
|
||||
for(int r = 0; r < regionsToDelete.length; r++) {
|
||||
if(regionsToDelete[r].equals(latestRegion.regionName)) {
|
||||
latestRegion = null;
|
||||
}
|
||||
long lockid = -1L;
|
||||
try {
|
||||
lockid = client.startUpdate(regionsToDelete[r]);
|
||||
client.delete(lockid, COL_REGIONINFO);
|
||||
client.delete(lockid, COL_SERVER);
|
||||
client.delete(lockid, COL_STARTCODE);
|
||||
client.commit(lockid);
|
||||
lockid = -1L;
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated columns in row: " + regionsToDelete[r]);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if(lockid != -1L) {
|
||||
client.abort(lockid);
|
||||
}
|
||||
|
||||
} catch(IOException iex) {
|
||||
LOG.error(iex);
|
||||
}
|
||||
}
|
||||
}
|
||||
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(byteValue);
|
||||
newRegion.getRegionInfo().offLine = true;
|
||||
newRegion.getRegionInfo().write(s);
|
||||
long lockid = -1L;
|
||||
try {
|
||||
lockid = client.startUpdate(newRegion.getRegionName());
|
||||
client.put(lockid, COL_REGIONINFO, byteValue.toByteArray());
|
||||
client.commit(lockid);
|
||||
lockid = -1L;
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated columns in row: "
|
||||
+ newRegion.getRegionName());
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if(lockid != -1L) {
|
||||
client.abort(lockid);
|
||||
}
|
||||
|
||||
} catch(IOException iex) {
|
||||
LOG.error(iex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class OfflineMerger extends Merger {
|
||||
private Path dir;
|
||||
private TreeSet<HRegionInfo> metaRegions;
|
||||
private TreeMap<Text, BytesWritable> results;
|
||||
|
||||
public OfflineMerger(Configuration conf, FileSystem fs, Text tableName)
|
||||
throws IOException {
|
||||
|
||||
super(conf, fs, tableName);
|
||||
this.dir = new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR));
|
||||
this.metaRegions = new TreeSet<HRegionInfo>();
|
||||
this.results = new TreeMap<Text, BytesWritable>();
|
||||
|
||||
// Scan root region to find all the meta regions
|
||||
|
||||
HRegion root = new HRegion(dir, hlog,fs, conf, HGlobals.rootRegionInfo,
|
||||
null, null);
|
||||
|
||||
HInternalScannerInterface rootScanner =
|
||||
root.getScanner(META_COLS, new Text());
|
||||
|
||||
try {
|
||||
while(rootScanner.next(key, results)) {
|
||||
for(BytesWritable b: results.values()) {
|
||||
byte[] bytes = new byte[b.getSize()];
|
||||
System.arraycopy(b.get(), 0, bytes, 0, bytes.length);
|
||||
in.reset(bytes, bytes.length);
|
||||
info.readFields(in);
|
||||
metaRegions.add(info);
|
||||
results.clear();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
rootScanner.close();
|
||||
try {
|
||||
root.close();
|
||||
|
||||
} catch(IOException e) {
|
||||
LOG.error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected TreeSet<HRegionInfo> next() throws IOException {
|
||||
more = false;
|
||||
return metaRegions;
|
||||
}
|
||||
|
||||
protected void updateMeta(Text oldRegion1, Text oldRegion2,
|
||||
HRegion newRegion) throws IOException {
|
||||
|
||||
HRegion root =
|
||||
new HRegion(dir, hlog, fs, conf, HGlobals.rootRegionInfo, null, null);
|
||||
|
||||
Text[] regionsToDelete = {
|
||||
oldRegion1,
|
||||
oldRegion2
|
||||
};
|
||||
for(int r = 0; r < regionsToDelete.length; r++) {
|
||||
long lockid = -1L;
|
||||
try {
|
||||
lockid = root.startUpdate(regionsToDelete[r]);
|
||||
root.delete(lockid, COL_REGIONINFO);
|
||||
root.delete(lockid, COL_SERVER);
|
||||
root.delete(lockid, COL_STARTCODE);
|
||||
root.commit(lockid);
|
||||
lockid = -1L;
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated columns in row: " + regionsToDelete[r]);
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if(lockid != -1L) {
|
||||
root.abort(lockid);
|
||||
}
|
||||
|
||||
} catch(IOException iex) {
|
||||
LOG.error(iex);
|
||||
}
|
||||
}
|
||||
}
|
||||
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(byteValue);
|
||||
newRegion.getRegionInfo().offLine = true;
|
||||
newRegion.getRegionInfo().write(s);
|
||||
long lockid = -1L;
|
||||
try {
|
||||
lockid = root.startUpdate(newRegion.getRegionName());
|
||||
root.put(lockid, COL_REGIONINFO,
|
||||
new BytesWritable(byteValue.toByteArray()));
|
||||
root.commit(lockid);
|
||||
lockid = -1L;
|
||||
|
||||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("updated columns in row: "
|
||||
+ newRegion.getRegionName());
|
||||
}
|
||||
} finally {
|
||||
try {
|
||||
if(lockid != -1L) {
|
||||
root.abort(lockid);
|
||||
}
|
||||
|
||||
} catch(IOException iex) {
|
||||
LOG.error(iex);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -142,8 +142,7 @@ public class HRegion implements HConstants {
|
|||
|
||||
TreeSet<HStoreFile> alreadyMerged = new TreeSet<HStoreFile>();
|
||||
TreeMap<Text, Vector<HStoreFile>> filesToMerge = new TreeMap<Text, Vector<HStoreFile>>();
|
||||
for(Iterator<HStoreFile> it = srcA.flushcache(true).iterator(); it.hasNext(); ) {
|
||||
HStoreFile src = it.next();
|
||||
for(HStoreFile src: srcA.flushcache(true)) {
|
||||
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
|
||||
if(v == null) {
|
||||
v = new Vector<HStoreFile>();
|
||||
|
@ -156,8 +155,7 @@ public class HRegion implements HConstants {
|
|||
LOG.debug("flushing and getting file names for region " + srcB.getRegionName());
|
||||
}
|
||||
|
||||
for(Iterator<HStoreFile> it = srcB.flushcache(true).iterator(); it.hasNext(); ) {
|
||||
HStoreFile src = it.next();
|
||||
for(HStoreFile src: srcB.flushcache(true)) {
|
||||
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
|
||||
if(v == null) {
|
||||
v = new Vector<HStoreFile>();
|
||||
|
@ -189,9 +187,7 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
|
||||
filesToMerge.clear();
|
||||
for(Iterator<HStoreFile> it = srcA.close().iterator(); it.hasNext(); ) {
|
||||
HStoreFile src = it.next();
|
||||
|
||||
for(HStoreFile src: srcA.close()) {
|
||||
if(! alreadyMerged.contains(src)) {
|
||||
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
|
||||
if(v == null) {
|
||||
|
@ -207,9 +203,7 @@ public class HRegion implements HConstants {
|
|||
+ srcB.getRegionName());
|
||||
}
|
||||
|
||||
for(Iterator<HStoreFile> it = srcB.close().iterator(); it.hasNext(); ) {
|
||||
HStoreFile src = it.next();
|
||||
|
||||
for(HStoreFile src: srcB.close()) {
|
||||
if(! alreadyMerged.contains(src)) {
|
||||
Vector<HStoreFile> v = filesToMerge.get(src.getColFamily());
|
||||
if(v == null) {
|
||||
|
@ -246,6 +240,59 @@ public class HRegion implements HConstants {
|
|||
return dstRegion;
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal method to create a new HRegion. Used by createTable and by the
|
||||
* bootstrap code in the HMaster constructor
|
||||
*
|
||||
* @param fs - file system to create region in
|
||||
* @param dir - base directory
|
||||
* @param conf - configuration object
|
||||
* @param desc - table descriptor
|
||||
* @param regionId - region id
|
||||
* @param startKey - first key in region
|
||||
* @param endKey - last key in region
|
||||
* @return - new HRegion
|
||||
* @throws IOException
|
||||
*/
|
||||
public static HRegion createNewHRegion(FileSystem fs, Path dir,
|
||||
Configuration conf, HTableDescriptor desc, long regionId, Text startKey,
|
||||
Text endKey) throws IOException {
|
||||
|
||||
HRegionInfo info = new HRegionInfo(regionId, desc, startKey, endKey);
|
||||
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
|
||||
fs.mkdirs(regionDir);
|
||||
|
||||
return new HRegion(dir,
|
||||
new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
|
||||
fs, conf, info, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Inserts a new table's meta information into the meta table. Used by
|
||||
* the HMaster bootstrap code.
|
||||
*
|
||||
* @param meta - HRegion to be updated
|
||||
* @param table - HRegion of new table
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
public static void addRegionToMeta(HRegion meta, HRegion table)
|
||||
throws IOException {
|
||||
|
||||
// The row key is the region name
|
||||
|
||||
long writeid = meta.startUpdate(table.getRegionName());
|
||||
|
||||
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
|
||||
DataOutputStream s = new DataOutputStream(bytes);
|
||||
|
||||
table.getRegionInfo().write(s);
|
||||
|
||||
meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray()));
|
||||
|
||||
meta.commit(writeid);
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// Members
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -627,6 +674,28 @@ public class HRegion implements HConstants {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return - returns the size of the largest HStore
|
||||
*/
|
||||
public long largestHStore() {
|
||||
long maxsize = 0;
|
||||
lock.obtainReadLock();
|
||||
try {
|
||||
Text key = new Text();
|
||||
for(HStore h: stores.values()) {
|
||||
long size = h.getLargestFileSize(key);
|
||||
|
||||
if(size > maxsize) { // Largest so far
|
||||
maxsize = size;
|
||||
}
|
||||
}
|
||||
return maxsize;
|
||||
|
||||
} finally {
|
||||
lock.releaseReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the region should be compacted.
|
||||
*/
|
||||
|
|
|
@ -22,20 +22,20 @@ import java.io.DataOutput;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableComparable;
|
||||
|
||||
/**
|
||||
* HRegion information.
|
||||
* Contains HRegion id, start and end keys, a reference to this
|
||||
* HRegions' table descriptor, etc.
|
||||
*/
|
||||
public class HRegionInfo implements Writable {
|
||||
public class HRegionInfo implements WritableComparable {
|
||||
public Text regionName;
|
||||
public long regionId;
|
||||
public HTableDescriptor tableDesc;
|
||||
public Text startKey;
|
||||
public Text endKey;
|
||||
public Text regionName;
|
||||
public boolean offLine;
|
||||
public HTableDescriptor tableDesc;
|
||||
|
||||
public HRegionInfo() {
|
||||
this.regionId = 0;
|
||||
|
@ -87,6 +87,22 @@ public class HRegionInfo implements Writable {
|
|||
this.tableDesc.toString() + "}";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
return this.compareTo(o) == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = this.regionName.hashCode();
|
||||
result ^= Long.valueOf(this.regionId).hashCode();
|
||||
result ^= this.startKey.hashCode();
|
||||
result ^= this.endKey.hashCode();
|
||||
result ^= Boolean.valueOf(this.offLine).hashCode();
|
||||
result ^= this.tableDesc.hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// Writable
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -108,4 +124,13 @@ public class HRegionInfo implements Writable {
|
|||
this.regionName.readFields(in);
|
||||
this.offLine = in.readBoolean();
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// Comparable
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
public int compareTo(Object o) {
|
||||
HRegionInfo other = (HRegionInfo)o;
|
||||
return regionName.compareTo(other.regionName);
|
||||
}
|
||||
}
|
|
@ -824,33 +824,6 @@ public class HRegionServer
|
|||
}
|
||||
}
|
||||
|
||||
/*****************************************************************************
|
||||
* TODO - Figure out how the master is to determine when regions should be
|
||||
* merged. It once it makes this determination, it needs to ensure that
|
||||
* the regions to be merged are first being served by the same
|
||||
* HRegionServer and if not, move them so they are.
|
||||
*
|
||||
* For now, we do not do merging. Splits are driven by the HRegionServer.
|
||||
****************************************************************************/
|
||||
/*
|
||||
private void mergeRegions(Text regionNameA, Text regionNameB) throws IOException {
|
||||
locking.writeLock().lock();
|
||||
try {
|
||||
HRegion srcA = regions.remove(regionNameA);
|
||||
HRegion srcB = regions.remove(regionNameB);
|
||||
HRegion newRegion = HRegion.closeAndMerge(srcA, srcB);
|
||||
regions.put(newRegion.getRegionName(), newRegion);
|
||||
|
||||
reportClose(srcA);
|
||||
reportClose(srcB);
|
||||
reportOpen(newRegion);
|
||||
|
||||
} finally {
|
||||
locking.writeLock().unlock();
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// HRegionInterface
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -20,7 +20,6 @@ import java.io.DataOutput;
|
|||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.TreeMap;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
|
|
@ -40,10 +40,31 @@ public class MiniHBaseCluster implements HConstants {
|
|||
private HRegionServer[] regionServers;
|
||||
private Thread[] regionThreads;
|
||||
|
||||
/**
|
||||
* Starts a MiniHBaseCluster on top of a new MiniDFSCluster
|
||||
*
|
||||
* @param conf
|
||||
* @param nRegionNodes
|
||||
*/
|
||||
public MiniHBaseCluster(Configuration conf, int nRegionNodes) {
|
||||
this(conf, nRegionNodes, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts a MiniHBaseCluster on top of an existing HDFSCluster
|
||||
*
|
||||
* @param conf
|
||||
* @param nRegionNodes
|
||||
* @param dfsCluster
|
||||
*/
|
||||
public MiniHBaseCluster(Configuration conf, int nRegionNodes,
|
||||
MiniDFSCluster dfsCluster) {
|
||||
|
||||
this.conf = conf;
|
||||
this.cluster = dfsCluster;
|
||||
init(nRegionNodes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param conf
|
||||
|
@ -56,21 +77,22 @@ public class MiniHBaseCluster implements HConstants {
|
|||
final boolean miniHdfsFilesystem) {
|
||||
this.conf = conf;
|
||||
|
||||
try {
|
||||
try {
|
||||
if(System.getProperty(StaticTestEnvironment.TEST_DIRECTORY_KEY) == null) {
|
||||
File testDir = new File(new File("").getAbsolutePath(),
|
||||
"build/contrib/hbase/test");
|
||||
|
||||
String dir = testDir.getAbsolutePath();
|
||||
LOG.info("Setting test.build.data to " + dir);
|
||||
System.setProperty(StaticTestEnvironment.TEST_DIRECTORY_KEY, dir);
|
||||
}
|
||||
|
||||
if (miniHdfsFilesystem) {
|
||||
this.cluster =
|
||||
new MiniDFSCluster(this.conf, 2, true, (String[])null);
|
||||
try {
|
||||
this.cluster = new MiniDFSCluster(this.conf, 2, true, (String[])null);
|
||||
|
||||
} catch(Throwable t) {
|
||||
LOG.error("Failed setup of mini dfs cluster", t);
|
||||
t.printStackTrace();
|
||||
return;
|
||||
}
|
||||
}
|
||||
init(nRegionNodes);
|
||||
}
|
||||
|
||||
private void init(int nRegionNodes) {
|
||||
try {
|
||||
try {
|
||||
this.fs = FileSystem.get(conf);
|
||||
this.parentdir = new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR));
|
||||
fs.mkdirs(parentdir);
|
||||
|
|
|
@ -0,0 +1,170 @@
|
|||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.dfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.BytesWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
|
||||
/** Tests region merging */
|
||||
public class TestMerge extends HBaseTestCase {
|
||||
private static final Text COLUMN_NAME = new Text("contents:");
|
||||
private Random rand;
|
||||
private HTableDescriptor desc;
|
||||
private BytesWritable value;
|
||||
|
||||
private MiniDFSCluster dfsCluster;
|
||||
private FileSystem fs;
|
||||
private Path dir;
|
||||
|
||||
private MiniHBaseCluster hCluster;
|
||||
|
||||
public void testMerge() {
|
||||
setup();
|
||||
startMiniDFSCluster();
|
||||
createRegions();
|
||||
try {
|
||||
HMerge.merge(conf, fs, HConstants.META_TABLE_NAME);
|
||||
|
||||
hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
|
||||
try {
|
||||
HMerge.merge(conf, fs, desc.getName());
|
||||
|
||||
} finally {
|
||||
hCluster.shutdown();
|
||||
}
|
||||
|
||||
} catch(Throwable t) {
|
||||
t.printStackTrace();
|
||||
fail();
|
||||
|
||||
} finally {
|
||||
dfsCluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void setup() {
|
||||
rand = new Random();
|
||||
desc = new HTableDescriptor("test");
|
||||
desc.addFamily(new HColumnDescriptor(COLUMN_NAME.toString()));
|
||||
|
||||
// We will use the same value for the rows as that is not really important here
|
||||
|
||||
String partialValue = String.valueOf(System.currentTimeMillis());
|
||||
StringBuilder val = new StringBuilder();
|
||||
while(val.length() < 1024) {
|
||||
val.append(partialValue);
|
||||
}
|
||||
try {
|
||||
value = new BytesWritable(val.toString().getBytes(HConstants.UTF8_ENCODING));
|
||||
|
||||
} catch(UnsupportedEncodingException e) {
|
||||
fail();
|
||||
}
|
||||
}
|
||||
|
||||
private void startMiniDFSCluster() {
|
||||
try {
|
||||
dfsCluster = new MiniDFSCluster(conf, 2, true, (String[])null);
|
||||
fs = dfsCluster.getFileSystem();
|
||||
dir = new Path("/hbase");
|
||||
fs.mkdirs(dir);
|
||||
|
||||
} catch(Throwable t) {
|
||||
t.printStackTrace();
|
||||
fail();
|
||||
}
|
||||
}
|
||||
|
||||
private void createRegions() {
|
||||
// We create three data regions: The first is too large to merge since it
|
||||
// will be > 64 MB in size. The second two will be smaller and will be
|
||||
// selected for merging.
|
||||
|
||||
// To ensure that the first region is larger than 64MB we need to write at
|
||||
// least 65536 rows. We will make certain by writing 70000
|
||||
|
||||
try {
|
||||
Text row_70001 = new Text("row_70001");
|
||||
Text row_80001 = new Text("row_80001");
|
||||
|
||||
HRegion[] regions = {
|
||||
createAregion(null, row_70001, 1, 70000),
|
||||
createAregion(row_70001, row_80001, 70001, 10000),
|
||||
createAregion(row_80001, null, 80001, 10000)
|
||||
};
|
||||
|
||||
// Now create the root and meta regions and insert the data regions
|
||||
// created above into the meta
|
||||
|
||||
HRegion root = HRegion.createNewHRegion(fs, dir, conf,
|
||||
HGlobals.rootTableDesc, 0L, null, null);
|
||||
HRegion meta = HRegion.createNewHRegion(fs, dir, conf,
|
||||
HGlobals.metaTableDesc, 1L, null, null);
|
||||
|
||||
HRegion.addRegionToMeta(root, meta);
|
||||
|
||||
for(int i = 0; i < regions.length; i++) {
|
||||
HRegion.addRegionToMeta(meta, regions[i]);
|
||||
}
|
||||
|
||||
root.close();
|
||||
root.getLog().close();
|
||||
fs.delete(new Path(root.getRegionDir(), HConstants.HREGION_LOGDIR_NAME));
|
||||
meta.close();
|
||||
meta.getLog().close();
|
||||
fs.delete(new Path(meta.getRegionDir(), HConstants.HREGION_LOGDIR_NAME));
|
||||
|
||||
} catch(Throwable t) {
|
||||
t.printStackTrace();
|
||||
fail();
|
||||
}
|
||||
}
|
||||
|
||||
private HRegion createAregion(Text startKey, Text endKey, int firstRow, int nrows)
|
||||
throws IOException {
|
||||
HRegion region = HRegion.createNewHRegion(fs, dir, conf, desc,
|
||||
rand.nextLong(), startKey, endKey);
|
||||
|
||||
System.out.println("created region " + region.getRegionName());
|
||||
|
||||
for(int i = firstRow; i < firstRow + nrows; i++) {
|
||||
long lockid = region.startUpdate(new Text("row_"
|
||||
+ String.format("%1$05d", i)));
|
||||
|
||||
region.put(lockid, COLUMN_NAME, value);
|
||||
region.commit(lockid);
|
||||
if(i % 10000 == 0) {
|
||||
System.out.println("Flushing write #" + i);
|
||||
region.flushcache(false);
|
||||
}
|
||||
}
|
||||
System.out.println("Rolling log...");
|
||||
region.log.rollWriter();
|
||||
region.compactStores();
|
||||
region.close();
|
||||
region.getLog().close();
|
||||
fs.delete(new Path(region.getRegionDir(), HConstants.HREGION_LOGDIR_NAME));
|
||||
region.getRegionInfo().offLine = true;
|
||||
return region;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue