HBASE-722 Shutdown and Compactions

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@712720 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2008-11-10 18:56:05 +00:00
parent d2e77503f8
commit 5ec924f50b
4 changed files with 258 additions and 222 deletions

View File

@ -66,6 +66,7 @@ Release 0.19.0 - Unreleased
HBASE-984 Fix javadoc warnings
HBASE-985 Fix javadoc warnings
HBASE-951 Either shut down master or let it finish cleanup
HBASE-964 Startup stuck "waiting for root region"
IMPROVEMENTS
HBASE-901 Add a limit to key length, check key and value length on client side
@ -104,7 +105,7 @@ Release 0.19.0 - Unreleased
HBASE-975 Improve MapFile performance for start and end key
HBASE-961 Delete multiple columns by regular expression
(Samuel Guo via Stack)
HBASE-964 Startup stuck "waiting for root region"
HBASE-722 Shutdown and Compactions
NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]

View File

@ -73,11 +73,11 @@ class CompactSplitThread extends Thread implements HConstants {
@Override
public void run() {
while (!server.isStopRequested()) {
while (!this.server.isStopRequested()) {
HRegion r = null;
try {
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (r != null) {
if (r != null && !this.server.isStopRequested()) {
synchronized (regionsInQueue) {
regionsInQueue.remove(r);
}
@ -85,7 +85,7 @@ class CompactSplitThread extends Thread implements HConstants {
try {
// Don't interrupt us while we are working
byte [] midKey = r.compactStores();
if (midKey != null) {
if (midKey != null && !this.server.isStopRequested()) {
split(r, midKey);
}
} finally {
@ -119,6 +119,9 @@ class CompactSplitThread extends Thread implements HConstants {
* @param r HRegion store belongs to
*/
public synchronized void compactionRequested(HRegion r) {
if (this.server.stopRequested.get()) {
return;
}
LOG.debug("Compaction requested for region: " +
Bytes.toString(r.getRegionName()));
synchronized (regionsInQueue) {

View File

@ -112,213 +112,6 @@ public class HRegion implements HConstants {
final AtomicBoolean closed = new AtomicBoolean(false);
private final RegionHistorian historian;
/**
* Merge two HRegions. The regions must be adjacent andmust not overlap.
*
* @param srcA
* @param srcB
* @return new merged HRegion
* @throws IOException
*/
public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
throws IOException {
HRegion a = srcA;
HRegion b = srcB;
// Make sure that srcA comes first; important for key-ordering during
// write of the merged file.
if (srcA.getStartKey() == null) {
if (srcB.getStartKey() == null) {
throw new IOException("Cannot merge two regions with null start key");
}
// A's start key is null but B's isn't. Assume A comes before B
} else if ((srcB.getStartKey() == null) // A is not null but B is
|| (HStoreKey.compareTwoRowKeys(srcA.getRegionInfo(),
srcA.getStartKey(), srcB.getStartKey()) > 0)) { // A > B
a = srcB;
b = srcA;
}
if (!HStoreKey.equalsTwoRowKeys(srcA.getRegionInfo(),
a.getEndKey(), b.getStartKey())) {
throw new IOException("Cannot merge non-adjacent regions");
}
return merge(a, b);
}
/**
* Merge two regions whether they are adjacent or not.
*
* @param a region a
* @param b region b
* @return new merged region
* @throws IOException
*/
public static HRegion merge(HRegion a, HRegion b) throws IOException {
if (!a.getRegionInfo().getTableDesc().getNameAsString().equals(
b.getRegionInfo().getTableDesc().getNameAsString())) {
throw new IOException("Regions do not belong to the same table");
}
FileSystem fs = a.getFilesystem();
// Make sure each region's cache is empty
a.flushcache();
b.flushcache();
// Compact each region so we only have one store file per family
a.compactStores(true);
if (LOG.isDebugEnabled()) {
LOG.debug("Files for region: " + a);
listPaths(fs, a.getRegionDir());
}
b.compactStores(true);
if (LOG.isDebugEnabled()) {
LOG.debug("Files for region: " + b);
listPaths(fs, b.getRegionDir());
}
HBaseConfiguration conf = a.getConf();
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
Path basedir = a.getBaseDir();
final byte [] startKey = HStoreKey.equalsTwoRowKeys(a.getRegionInfo(),
a.getStartKey(), EMPTY_BYTE_ARRAY) ||
HStoreKey.equalsTwoRowKeys(a.getRegionInfo(),
b.getStartKey(), EMPTY_BYTE_ARRAY) ? EMPTY_BYTE_ARRAY :
HStoreKey.compareTwoRowKeys(a.getRegionInfo(), a.getStartKey(),
b.getStartKey()) <= 0 ?
a.getStartKey() : b.getStartKey();
final byte [] endKey = HStoreKey.equalsTwoRowKeys(a.getRegionInfo(),
a.getEndKey(), EMPTY_BYTE_ARRAY) ||
HStoreKey.equalsTwoRowKeys(b.getRegionInfo(), b.getEndKey(),
EMPTY_BYTE_ARRAY) ? EMPTY_BYTE_ARRAY :
HStoreKey.compareTwoRowKeys(a.getRegionInfo(), a.getEndKey(),
b.getEndKey()) <= 0 ?
b.getEndKey() : a.getEndKey();
HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
LOG.info("Creating new region " + newRegionInfo.toString());
int encodedName = newRegionInfo.getEncodedName();
Path newRegionDir = HRegion.getRegionDir(a.getBaseDir(), encodedName);
if(fs.exists(newRegionDir)) {
throw new IOException("Cannot merge; target file collision at " +
newRegionDir);
}
fs.mkdirs(newRegionDir);
LOG.info("starting merge of regions: " + a + " and " + b +
" into new region " + newRegionInfo.toString() +
" with start key <" + startKey + "> and end key <" + endKey + ">");
// Move HStoreFiles under new region directory
Map<byte [], List<HStoreFile>> byFamily =
new TreeMap<byte [], List<HStoreFile>>(Bytes.BYTES_COMPARATOR);
byFamily = filesByFamily(byFamily, a.close());
byFamily = filesByFamily(byFamily, b.close());
for (Map.Entry<byte [], List<HStoreFile>> es : byFamily.entrySet()) {
byte [] colFamily = es.getKey();
makeColumnFamilyDirs(fs, basedir, encodedName, colFamily, tabledesc);
// Because we compacted the source regions we should have no more than two
// HStoreFiles per family and there will be no reference store
List<HStoreFile> srcFiles = es.getValue();
if (srcFiles.size() == 2) {
long seqA = srcFiles.get(0).loadInfo(fs);
long seqB = srcFiles.get(1).loadInfo(fs);
if (seqA == seqB) {
// We can't have duplicate sequence numbers
if (LOG.isDebugEnabled()) {
LOG.debug("Adjusting sequence id of storeFile " + srcFiles.get(1) +
" down by one; sequence id A=" + seqA + ", sequence id B=" +
seqB);
}
srcFiles.get(1).writeInfo(fs, seqB - 1);
}
}
for (HStoreFile hsf: srcFiles) {
HStoreFile dst = new HStoreFile(conf, fs, basedir,
newRegionInfo, colFamily, -1, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + hsf + " to " + dst);
}
hsf.rename(fs, dst);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
listPaths(fs, newRegionDir);
}
HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
dstRegion.initialize(null, null);
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
listPaths(fs, dstRegion.getRegionDir());
}
deleteRegion(fs, a.getRegionDir());
deleteRegion(fs, b.getRegionDir());
LOG.info("merge completed. New region is " + dstRegion);
return dstRegion;
}
/*
* Fills a map with a vector of store files keyed by column family.
* @param byFamily Map to fill.
* @param storeFiles Store files to process.
* @return Returns <code>byFamily</code>
*/
private static Map<byte [], List<HStoreFile>> filesByFamily(
Map<byte [], List<HStoreFile>> byFamily, List<HStoreFile> storeFiles) {
for (HStoreFile src: storeFiles) {
List<HStoreFile> v = byFamily.get(src.getColFamily());
if (v == null) {
v = new ArrayList<HStoreFile>();
byFamily.put(src.getColFamily(), v);
}
v.add(src);
}
return byFamily;
}
/*
* Method to list files in use by region
*/
static void listFiles(FileSystem fs, HRegion r) throws IOException {
listPaths(fs, r.getRegionDir());
}
/*
* List the files under the specified directory
*
* @param fs
* @param dir
* @throws IOException
*/
private static void listPaths(FileSystem fs, Path dir) throws IOException {
if (LOG.isDebugEnabled()) {
FileStatus[] stats = fs.listStatus(dir);
if (stats == null || stats.length == 0) {
return;
}
for (int i = 0; i < stats.length; i++) {
String path = stats[i].getPath().toString();
if (stats[i].isDir()) {
LOG.debug("d " + path);
listPaths(fs, stats[i].getPath());
} else {
LOG.debug("f " + path + " size=" + stats[i].getLen());
}
}
}
}
//////////////////////////////////////////////////////////////////////////////
// Members
//////////////////////////////////////////////////////////////////////////////
@ -911,7 +704,6 @@ public class HRegion implements HConstants {
String timeTaken = StringUtils.formatTimeDiff(System.currentTimeMillis(),
startTime);
LOG.info("compaction completed on region " + this + " in " + timeTaken);
this.historian.addRegionCompaction(regionInfo, timeTaken);
} finally {
synchronized (writestate) {
@ -2412,4 +2204,211 @@ public class HRegion implements HConstants {
fs.mkdirs(HStoreFile.getMapDir(basedir, encodedRegionName, colFamily));
fs.mkdirs(HStoreFile.getInfoDir(basedir, encodedRegionName, colFamily));
}
/**
* Merge two HRegions. The regions must be adjacent andmust not overlap.
*
* @param srcA
* @param srcB
* @return new merged HRegion
* @throws IOException
*/
public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB)
throws IOException {
HRegion a = srcA;
HRegion b = srcB;
// Make sure that srcA comes first; important for key-ordering during
// write of the merged file.
if (srcA.getStartKey() == null) {
if (srcB.getStartKey() == null) {
throw new IOException("Cannot merge two regions with null start key");
}
// A's start key is null but B's isn't. Assume A comes before B
} else if ((srcB.getStartKey() == null) // A is not null but B is
|| (HStoreKey.compareTwoRowKeys(srcA.getRegionInfo(),
srcA.getStartKey(), srcB.getStartKey()) > 0)) { // A > B
a = srcB;
b = srcA;
}
if (!HStoreKey.equalsTwoRowKeys(srcA.getRegionInfo(),
a.getEndKey(), b.getStartKey())) {
throw new IOException("Cannot merge non-adjacent regions");
}
return merge(a, b);
}
/**
* Merge two regions whether they are adjacent or not.
*
* @param a region a
* @param b region b
* @return new merged region
* @throws IOException
*/
public static HRegion merge(HRegion a, HRegion b) throws IOException {
if (!a.getRegionInfo().getTableDesc().getNameAsString().equals(
b.getRegionInfo().getTableDesc().getNameAsString())) {
throw new IOException("Regions do not belong to the same table");
}
FileSystem fs = a.getFilesystem();
// Make sure each region's cache is empty
a.flushcache();
b.flushcache();
// Compact each region so we only have one store file per family
a.compactStores(true);
if (LOG.isDebugEnabled()) {
LOG.debug("Files for region: " + a);
listPaths(fs, a.getRegionDir());
}
b.compactStores(true);
if (LOG.isDebugEnabled()) {
LOG.debug("Files for region: " + b);
listPaths(fs, b.getRegionDir());
}
HBaseConfiguration conf = a.getConf();
HTableDescriptor tabledesc = a.getTableDesc();
HLog log = a.getLog();
Path basedir = a.getBaseDir();
final byte [] startKey = HStoreKey.equalsTwoRowKeys(a.getRegionInfo(),
a.getStartKey(), EMPTY_BYTE_ARRAY) ||
HStoreKey.equalsTwoRowKeys(a.getRegionInfo(),
b.getStartKey(), EMPTY_BYTE_ARRAY) ? EMPTY_BYTE_ARRAY :
HStoreKey.compareTwoRowKeys(a.getRegionInfo(), a.getStartKey(),
b.getStartKey()) <= 0 ?
a.getStartKey() : b.getStartKey();
final byte [] endKey = HStoreKey.equalsTwoRowKeys(a.getRegionInfo(),
a.getEndKey(), EMPTY_BYTE_ARRAY) ||
HStoreKey.equalsTwoRowKeys(b.getRegionInfo(), b.getEndKey(),
EMPTY_BYTE_ARRAY) ? EMPTY_BYTE_ARRAY :
HStoreKey.compareTwoRowKeys(a.getRegionInfo(), a.getEndKey(),
b.getEndKey()) <= 0 ?
b.getEndKey() : a.getEndKey();
HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey);
LOG.info("Creating new region " + newRegionInfo.toString());
int encodedName = newRegionInfo.getEncodedName();
Path newRegionDir = HRegion.getRegionDir(a.getBaseDir(), encodedName);
if(fs.exists(newRegionDir)) {
throw new IOException("Cannot merge; target file collision at " +
newRegionDir);
}
fs.mkdirs(newRegionDir);
LOG.info("starting merge of regions: " + a + " and " + b +
" into new region " + newRegionInfo.toString() +
" with start key <" + startKey + "> and end key <" + endKey + ">");
// Move HStoreFiles under new region directory
Map<byte [], List<HStoreFile>> byFamily =
new TreeMap<byte [], List<HStoreFile>>(Bytes.BYTES_COMPARATOR);
byFamily = filesByFamily(byFamily, a.close());
byFamily = filesByFamily(byFamily, b.close());
for (Map.Entry<byte [], List<HStoreFile>> es : byFamily.entrySet()) {
byte [] colFamily = es.getKey();
makeColumnFamilyDirs(fs, basedir, encodedName, colFamily, tabledesc);
// Because we compacted the source regions we should have no more than two
// HStoreFiles per family and there will be no reference store
List<HStoreFile> srcFiles = es.getValue();
if (srcFiles.size() == 2) {
long seqA = srcFiles.get(0).loadInfo(fs);
long seqB = srcFiles.get(1).loadInfo(fs);
if (seqA == seqB) {
// We can't have duplicate sequence numbers
if (LOG.isDebugEnabled()) {
LOG.debug("Adjusting sequence id of storeFile " + srcFiles.get(1) +
" down by one; sequence id A=" + seqA + ", sequence id B=" +
seqB);
}
srcFiles.get(1).writeInfo(fs, seqB - 1);
}
}
for (HStoreFile hsf: srcFiles) {
HStoreFile dst = new HStoreFile(conf, fs, basedir,
newRegionInfo, colFamily, -1, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + hsf + " to " + dst);
}
hsf.rename(fs, dst);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
listPaths(fs, newRegionDir);
}
HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
dstRegion.initialize(null, null);
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
listPaths(fs, dstRegion.getRegionDir());
}
deleteRegion(fs, a.getRegionDir());
deleteRegion(fs, b.getRegionDir());
LOG.info("merge completed. New region is " + dstRegion);
return dstRegion;
}
/*
* Fills a map with a vector of store files keyed by column family.
* @param byFamily Map to fill.
* @param storeFiles Store files to process.
* @return Returns <code>byFamily</code>
*/
private static Map<byte [], List<HStoreFile>> filesByFamily(
Map<byte [], List<HStoreFile>> byFamily, List<HStoreFile> storeFiles) {
for (HStoreFile src: storeFiles) {
List<HStoreFile> v = byFamily.get(src.getColFamily());
if (v == null) {
v = new ArrayList<HStoreFile>();
byFamily.put(src.getColFamily(), v);
}
v.add(src);
}
return byFamily;
}
/*
* Method to list files in use by region
*/
static void listFiles(FileSystem fs, HRegion r) throws IOException {
listPaths(fs, r.getRegionDir());
}
/*
* List the files under the specified directory
*
* @param fs
* @param dir
* @throws IOException
*/
private static void listPaths(FileSystem fs, Path dir) throws IOException {
if (LOG.isDebugEnabled()) {
FileStatus[] stats = fs.listStatus(dir);
if (stats == null || stats.length == 0) {
return;
}
for (int i = 0; i < stats.length; i++) {
String path = stats[i].getPath().toString();
if (stats[i].isDir()) {
LOG.debug("d " + path);
listPaths(fs, stats[i].getPath());
} else {
LOG.debug("f " + path + " size=" + stats[i].getLen());
}
}
}
}
}

View File

@ -1056,6 +1056,31 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return regionsToClose;
}
/*
* Thread to run close of a region.
*/
private static class RegionCloserThread extends Thread {
private final HRegion r;
public RegionCloserThread(final HRegion r) {
super(Thread.currentThread().getName() + ".regionCloser." + r.toString());
this.r = r;
}
@Override
public void run() {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Closing region " + r.toString());
}
r.close();
} catch (IOException e) {
LOG.error("Error closing region " + r.toString(),
RemoteExceptionHandler.checkIOException(e));
}
}
}
/** Called as the first stage of cluster shutdown. */
void closeUserRegions() {
ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
@ -1075,15 +1100,23 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} finally {
this.lock.writeLock().unlock();
}
for(HRegion region: regionsToClose) {
if (LOG.isDebugEnabled()) {
LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
}
// Run region closes in parallel.
Set<Thread> threads = new HashSet<Thread>();
try {
region.close();
} catch (IOException e) {
LOG.error("error closing region " + region.getRegionName(),
RemoteExceptionHandler.checkIOException(e));
for (final HRegion r : regionsToClose) {
RegionCloserThread t = new RegionCloserThread(r);
t.start();
threads.add(t);
}
} finally {
for (Thread t : threads) {
while (t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
this.quiesced.set(true);
@ -1620,7 +1653,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Copy over all regions. Regions are sorted by size with biggest first.
synchronized (this.onlineRegions) {
for (HRegion region : this.onlineRegions.values()) {
sortedRegions.put(region.memcacheSize.get(), region);
sortedRegions.put(Long.valueOf(region.memcacheSize.get()), region);
}
}
return sortedRegions;