HBASE-2461 Split doesn't handle IOExceptions when creating new region reference files
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@982089 13f79535-47bb-0310-9956-ffa450edef68
parent 77aef8f799
commit f7b8d9cf7a
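In outline, this change replaces the inline split-and-edit-META logic in CompactSplitThread.split() with a SplitTransaction that is first prepared, then executed, and rolled back if any step (such as writing the daughter-region reference files) throws an IOException; if the rollback itself fails, the region server aborts rather than leave a hole in the table. The following is a minimal sketch of that control flow, not the actual HBase code: the *Sketch types are stand-ins invented here, and only prepare(), execute(), rollback(), abort() and their ordering come from the diff below (in the real class, execute() also returns the daughter regions, as the TestHRegion helper further down shows).

import java.io.IOException;

// Stand-in for SplitTransaction; only the calls visible in this diff.
interface SplitTransactionSketch {
  boolean prepare();                                            // false means "not ready to split"; caller just returns
  void execute(RegionServerSketch server) throws IOException;   // create reference files, edit META
  void rollback(RegionServerSketch server);                     // undo partial work after an IOException
}

// Stand-in for the HRegionServer methods used here.
interface RegionServerSketch {
  void abort(String why);                                       // last resort when rollback also fails
}

final class SplitFlowSketch {
  static void split(SplitTransactionSketch st, RegionServerSketch server) {
    if (!st.prepare()) {
      return;                                                   // reasons are logged inside prepare()
    }
    try {
      st.execute(server);                                       // the step that previously leaked IOExceptions
    } catch (IOException ioe) {
      try {
        st.rollback(server);                                    // clean up half-written reference files
      } catch (RuntimeException e) {
        server.abort("Failed split");                           // a failed rollback would leave a hole in the table
      }
    }
  }
}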
@@ -464,6 +464,8 @@ Release 0.21.0 - Unreleased
   HBASE-2755  Duplicate assignment of a region after region server recovery
               (Kannan Muthukkaruppan via Stack)
   HBASE-2892  Replication metrics aren't updated
   HBASE-2461  Split doesn't handle IOExceptions when creating new region
               reference files

IMPROVEMENTS
   HBASE-1760  Cleanup TODOs in HTable

@@ -19,17 +19,6 @@
 */
package org.apache.hadoop.hbase.regionserver;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.util.StringUtils;

import java.io.IOException;
import java.util.HashSet;
import java.util.concurrent.BlockingQueue;
@@ -37,14 +26,18 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.util.StringUtils;

/**
 * Compact region on request and then run split if appropriate
 */
class CompactSplitThread extends Thread {
  static final Log LOG = LogFactory.getLog(CompactSplitThread.class);

  private HTable root = null;
  private HTable meta = null;
  private final long frequency;
  private final ReentrantLock lock = new ReentrantLock();

@@ -68,7 +61,6 @@ class CompactSplitThread extends Thread {

  @Override
  public void run() {
    int count = 0;
    while (!this.server.isStopRequested()) {
      HRegion r = null;
      try {
@@ -144,78 +136,39 @@
    }
  }

  private void split(final HRegion region, final byte [] midKey)
  private void split(final HRegion parent, final byte [] midKey)
  throws IOException {
    final HRegionInfo oldRegionInfo = region.getRegionInfo();
    final long startTime = System.currentTimeMillis();
    final HRegion[] newRegions = region.splitRegion(midKey);
    if (newRegions == null) {
      // Didn't need to be split
    SplitTransaction st = new SplitTransaction(parent, midKey);
    // If prepare does not return true, for some reason -- logged inside in
    // the prepare call -- we are not ready to split just now. Just return.
    if (!st.prepare()) return;
    try {
      st.execute(this.server);
    } catch (IOException ioe) {
      try {
        LOG.info("Running rollback of failed split of " +
          parent.getRegionNameAsString() + "; " + ioe.getMessage());
        st.rollback(this.server);
        LOG.info("Successful rollback of failed split of " +
          parent.getRegionNameAsString());
      } catch (RuntimeException e) {
        // If failed rollback, kill this server to avoid having a hole in table.
        LOG.info("Failed rollback of failed split of " +
          parent.getRegionNameAsString() + " -- aborting server", e);
        this.server.abort("Failed split");
      }
      return;
    }

    // When a region is split, the META table needs to updated if we're
    // splitting a 'normal' region, and the ROOT table needs to be
    // updated if we are splitting a META region.
    HTable t = null;
    if (region.getRegionInfo().isMetaTable()) {
      // We need to update the root region
      if (this.root == null) {
        this.root = new HTable(conf, HConstants.ROOT_TABLE_NAME);
      }
      t = root;
    } else {
      // For normal regions we need to update the meta region
      if (meta == null) {
        meta = new HTable(conf, HConstants.META_TABLE_NAME);
      }
      t = meta;
    }

    // Mark old region as offline and split in META.
    // NOTE: there is no need for retry logic here. HTable does it for us.
    oldRegionInfo.setOffline(true);
    oldRegionInfo.setSplit(true);
    // Inform the HRegionServer that the parent HRegion is no-longer online.
    this.server.removeFromOnlineRegions(oldRegionInfo);

    Put put = new Put(oldRegionInfo.getRegionName());
    put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
      Writables.getBytes(oldRegionInfo));
    put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
      HConstants.EMPTY_BYTE_ARRAY);
    put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
      HConstants.EMPTY_BYTE_ARRAY);
    put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITA_QUALIFIER,
      Writables.getBytes(newRegions[0].getRegionInfo()));
    put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER,
      Writables.getBytes(newRegions[1].getRegionInfo()));
    t.put(put);

    // If we crash here, then the daughters will not be added and we'll have
    // and offlined parent but no daughters to take up the slack. hbase-2244
    // adds fixup to the metascanners.

    // Add new regions to META
    for (int i = 0; i < newRegions.length; i++) {
      put = new Put(newRegions[i].getRegionName());
      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
        Writables.getBytes(newRegions[i].getRegionInfo()));
      t.put(put);
    }

    // If we crash here, the master will not know of the new daughters and they
    // will not be assigned. The metascanner when it runs will notice and take
    // care of assigning the new daughters.

    // Now tell the master about the new regions
    server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
      newRegions[1].getRegionInfo());

    LOG.info("region split, META updated, and report to master all" +
      " successful. Old region=" + oldRegionInfo.toString() +
      ", new regions: " + newRegions[0].toString() + ", " +
      newRegions[1].toString() + ". Split took " +
    // Now tell the master about the new regions. If we fail here, its OK.
    // Basescanner will do fix up. And reporting split to master is going away.
    // TODO: Verify this still holds in new master rewrite.
    this.server.reportSplit(parent.getRegionInfo(), st.getFirstDaughter(),
      st.getSecondDaughter());
    LOG.info("Region split, META updated, and report to master. Parent=" +
      parent.getRegionInfo() + ", new regions: " +
      st.getFirstDaughter() + ", " + st.getSecondDaughter() + ". Split took " +
      StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
  }

@@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.Reference.Range;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -82,6 +81,7 @@ import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.eclipse.jdt.core.dom.ThisExpression;

import com.google.common.collect.Lists;

@@ -123,7 +123,6 @@ import com.google.common.collect.Lists;
 */
public class HRegion implements HeapSize { // , Writable{
  public static final Log LOG = LogFactory.getLog(HRegion.class);
  static final String SPLITDIR = "splits";
  static final String MERGEDIR = "merges";

  final AtomicBoolean closed = new AtomicBoolean(false);
@@ -218,7 +217,7 @@ public class HRegion implements HeapSize { // , Writable{
  private final long blockingMemStoreSize;
  final long threadWakeFrequency;
  // Used to guard splits and closes
  private final ReentrantReadWriteLock splitsAndClosesLock =
  final ReentrantReadWriteLock splitsAndClosesLock =
    new ReentrantReadWriteLock();
  private final ReentrantReadWriteLock newScannerLock =
    new ReentrantReadWriteLock();
@@ -226,7 +225,6 @@ public class HRegion implements HeapSize { // , Writable{
  // Stop updates lock
  private final ReentrantReadWriteLock updatesLock =
    new ReentrantReadWriteLock();
  private final Object splitLock = new Object();
  private boolean splitRequest;

  private final ReadWriteConsistencyControl rwcc =
@@ -291,7 +289,7 @@ public class HRegion implements HeapSize { // , Writable{
    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
      10 * 1000);
    String encodedNameStr = this.regionInfo.getEncodedName();
    this.regiondir = new Path(tableDir, encodedNameStr);
    this.regiondir = getRegionDir(this.tableDir, encodedNameStr);
    if (LOG.isDebugEnabled()) {
      // Write out region name as string and its encoded name.
      LOG.debug("Creating region " + this);
@@ -324,12 +322,16 @@ public class HRegion implements HeapSize { // , Writable{
   */
  public long initialize(final Progressable reporter)
  throws IOException {
    // A region can be reopened if failed a split; reset flags
    this.closing.set(false);
    this.closed.set(false);

    // Write HRI to a file in case we need to recover .META.
    checkRegioninfoOnFilesystem();

    // Remove temporary data left over from old regions
    cleanupTmpDir();

    // Load in all the HStores. Get maximum seqid.
    long maxSeqId = -1;
    for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
@@ -346,7 +348,7 @@ public class HRegion implements HeapSize { // , Writable{
    // Get rid of any splits or merges that were lost in-progress. Clean out
    // these directories here on open. We may be opening a region that was
    // being split but we crashed in the middle of it all.
    FSUtils.deleteDirectory(this.fs, new Path(regiondir, SPLITDIR));
    SplitTransaction.cleanupAnySplitDetritus(this);
    FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));

    // See if region is meant to run read-only.
@@ -369,7 +371,7 @@ public class HRegion implements HeapSize { // , Writable{
   * @param initialFiles
   * @throws IOException
   */
  private static void moveInitialFilesIntoPlace(final FileSystem fs,
  static void moveInitialFilesIntoPlace(final FileSystem fs,
    final Path initialFiles, final Path regiondir)
  throws IOException {
    if (initialFiles != null && fs.exists(initialFiles)) {
@@ -468,71 +470,69 @@ public class HRegion implements HeapSize { // , Writable{
   *
   * @throws IOException e
   */
  public List<StoreFile> close(final boolean abort) throws IOException {
  public synchronized List<StoreFile> close(final boolean abort)
  throws IOException {
    if (isClosed()) {
      LOG.warn("region " + this + " already closed");
      LOG.warn("Region " + this + " already closed");
      return null;
    }
    synchronized (splitLock) {
      boolean wasFlushing = false;
      synchronized (writestate) {
        // Disable compacting and flushing by background threads for this
        // region.
        writestate.writesEnabled = false;
        wasFlushing = writestate.flushing;
        LOG.debug("Closing " + this + ": disabling compactions & flushes");
        while (writestate.compacting || writestate.flushing) {
          LOG.debug("waiting for" +
            (writestate.compacting ? " compaction" : "") +
            (writestate.flushing ?
              (writestate.compacting ? "," : "") + " cache flush" :
              "") + " to complete for region " + this);
          try {
            writestate.wait();
          } catch (InterruptedException iex) {
            // continue
          }
        }
      }
      // If we were not just flushing, is it worth doing a preflush...one
      // that will clear out of the bulk of the memstore before we put up
      // the close flag?
      if (!abort && !wasFlushing && worthPreFlushing()) {
        LOG.info("Running close preflush of " + this.getRegionNameAsString());
        internalFlushcache();
      }
      newScannerLock.writeLock().lock();
      this.closing.set(true);
      try {
        splitsAndClosesLock.writeLock().lock();
        LOG.debug("Updates disabled for region, no outstanding scanners on " +
          this);
    boolean wasFlushing = false;
    synchronized (writestate) {
      // Disable compacting and flushing by background threads for this
      // region.
      writestate.writesEnabled = false;
      wasFlushing = writestate.flushing;
      LOG.debug("Closing " + this + ": disabling compactions & flushes");
      while (writestate.compacting || writestate.flushing) {
        LOG.debug("waiting for" +
          (writestate.compacting ? " compaction" : "") +
          (writestate.flushing ?
            (writestate.compacting ? "," : "") + " cache flush" :
            "") + " to complete for region " + this);
        try {
          // Write lock means no more row locks can be given out. Wait on
          // outstanding row locks to come in before we close so we do not drop
          // outstanding updates.
          waitOnRowLocks();
          LOG.debug("No more row locks outstanding on region " + this);

          // Don't flush the cache if we are aborting
          if (!abort) {
            internalFlushcache();
          }

          List<StoreFile> result = new ArrayList<StoreFile>();
          for (Store store: stores.values()) {
            result.addAll(store.close());
          }
          this.closed.set(true);
          LOG.info("Closed " + this);
          return result;
        } finally {
          splitsAndClosesLock.writeLock().unlock();
          writestate.wait();
        } catch (InterruptedException iex) {
          // continue
        }
      } finally {
        newScannerLock.writeLock().unlock();
      }
    }
    // If we were not just flushing, is it worth doing a preflush...one
    // that will clear out of the bulk of the memstore before we put up
    // the close flag?
    if (!abort && !wasFlushing && worthPreFlushing()) {
      LOG.info("Running close preflush of " + this.getRegionNameAsString());
      internalFlushcache();
    }
    newScannerLock.writeLock().lock();
    this.closing.set(true);
    try {
      splitsAndClosesLock.writeLock().lock();
      LOG.debug("Updates disabled for region, no outstanding scanners on " + this);
      try {
        // Write lock means no more row locks can be given out. Wait on
        // outstanding row locks to come in before we close so we do not drop
        // outstanding updates.
        waitOnRowLocks();
        LOG.debug("No more row locks outstanding on region " + this);

        // Don't flush the cache if we are aborting
        if (!abort) {
          internalFlushcache();
        }

        List<StoreFile> result = new ArrayList<StoreFile>();
        for (Store store: stores.values()) {
          result.addAll(store.close());
        }
        this.closed.set(true);
        LOG.info("Closed " + this);
        return result;
      } finally {
        splitsAndClosesLock.writeLock().unlock();
      }
    } finally {
      newScannerLock.writeLock().unlock();
    }
  }

  /**
@@ -592,6 +592,17 @@ public class HRegion implements HeapSize { // , Writable{
    return this.regiondir;
  }

  /**
   * Computes the Path of the HRegion
   *
   * @param tabledir qualified path for table
   * @param name ENCODED region name
   * @return Path of HRegion directory
   */
  public static Path getRegionDir(final Path tabledir, final String name) {
    return new Path(tabledir, name);
  }

  /** @return FileSystem being used by this region */
  public FileSystem getFilesystem() {
    return this.fs;
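The getRegionDir(Path, String) helper that the hunk above moves near the top of HRegion simply joins the table directory and the ENCODED region name. A tiny hedged sketch of the path it produces follows; the concrete directory and encoded name below are invented for illustration and are not taken from this diff.

import org.apache.hadoop.fs.Path;

// Sketch only: mirrors what HRegion.getRegionDir(tabledir, name) returns.
public class RegionDirExample {
  public static void main(String[] args) {
    Path tableDir = new Path("/hbase/testtable");     // hypothetical table directory
    String encodedName = "1028785192";                // hypothetical ENCODED region name
    Path regionDir = new Path(tableDir, encodedName); // same join getRegionDir performs
    System.out.println(regionDir);                    // prints /hbase/testtable/1028785192
  }
}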
@@ -621,113 +632,6 @@ public class HRegion implements HeapSize { // , Writable{
    return size;
  }

  /*
   * Split the HRegion to create two brand-new ones. This also closes
   * current HRegion. Split should be fast since we don't rewrite store files
   * but instead create new 'reference' store files that read off the top and
   * bottom ranges of parent store files.
   * @param splitRow row on which to split region
   * @return two brand-new HRegions or null if a split is not needed
   * @throws IOException
   */
  HRegion [] splitRegion(final byte [] splitRow) throws IOException {
    prepareToSplit();
    synchronized (splitLock) {
      if (closed.get()) {
        return null;
      }
      // Add start/end key checking: hbase-428.
      byte [] startKey = this.regionInfo.getStartKey();
      byte [] endKey = this.regionInfo.getEndKey();
      if (this.comparator.matchingRows(startKey, 0, startKey.length,
          splitRow, 0, splitRow.length)) {
        LOG.debug("Startkey and midkey are same, not splitting");
        return null;
      }
      if (this.comparator.matchingRows(splitRow, 0, splitRow.length,
          endKey, 0, endKey.length)) {
        LOG.debug("Endkey and midkey are same, not splitting");
        return null;
      }
      LOG.info("Starting split of region " + this);
      Path splits = new Path(this.regiondir, SPLITDIR);
      if(!this.fs.exists(splits)) {
        this.fs.mkdirs(splits);
      }
      // Calculate regionid to use. Can't be less than that of parent else
      // it'll insert into wrong location over in .META. table: HBASE-710.
      long rid = EnvironmentEdgeManager.currentTimeMillis();
      if (rid < this.regionInfo.getRegionId()) {
        LOG.warn("Clock skew; parent regions id is " +
          this.regionInfo.getRegionId() + " but current time here is " + rid);
        rid = this.regionInfo.getRegionId() + 1;
      }
      HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
        startKey, splitRow, false, rid);
      Path dirA = getSplitDirForDaughter(splits, regionAInfo);
      HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(),
        splitRow, endKey, false, rid);
      Path dirB = getSplitDirForDaughter(splits, regionBInfo);

      // Now close the HRegion. Close returns all store files or null if not
      // supposed to close (? What to do in this case? Implement abort of close?)
      // Close also does wait on outstanding rows and calls a flush just-in-case.
      List<StoreFile> hstoreFilesToSplit = close(false);
      if (hstoreFilesToSplit == null) {
        LOG.warn("Close came back null (Implement abort of close?)");
        throw new RuntimeException("close returned empty vector of HStoreFiles");
      }

      // Split each store file.
      for(StoreFile h: hstoreFilesToSplit) {
        StoreFile.split(fs,
          Store.getStoreHomedir(splits, regionAInfo.getEncodedName(),
            h.getFamily()),
          h, splitRow, Range.bottom);
        StoreFile.split(fs,
          Store.getStoreHomedir(splits, regionBInfo.getEncodedName(),
            h.getFamily()),
          h, splitRow, Range.top);
      }

      // Create a region instance and then move the splits into place under
      // regionA and regionB.
      HRegion regionA =
        HRegion.newHRegion(tableDir, log, fs, conf, regionAInfo, null);
      moveInitialFilesIntoPlace(this.fs, dirA, regionA.getRegionDir());
      HRegion regionB =
        HRegion.newHRegion(tableDir, log, fs, conf, regionBInfo, null);
      moveInitialFilesIntoPlace(this.fs, dirB, regionB.getRegionDir());

      return new HRegion [] {regionA, regionB};
    }
  }

  /*
   * Get the daughter directories in the splits dir. The splits dir is under
   * the parent regions' directory.
   * @param splits
   * @param hri
   * @return Path to split dir.
   * @throws IOException
   */
  private Path getSplitDirForDaughter(final Path splits, final HRegionInfo hri)
  throws IOException {
    Path d =
      new Path(splits, hri.getEncodedName());
    if (fs.exists(d)) {
      // This should never happen; the splits dir will be newly made when we
      // come in here. Even if we crashed midway through a split, the reopen
      // of the parent region clears out the dir in its initialize method.
      throw new IOException("Cannot split; target file collision at " + d);
    }
    return d;
  }

  protected void prepareToSplit() {
    // nothing
  }

  /*
   * Do preparation for pending compaction.
   * @throws IOException
@@ -2686,17 +2590,6 @@ public class HRegion implements HeapSize { // , Writable{
    }
  }

  /**
   * Computes the Path of the HRegion
   *
   * @param tabledir qualified path for table
   * @param name ENCODED region name
   * @return Path of HRegion directory
   */
  public static Path getRegionDir(final Path tabledir, final String name) {
    return new Path(tabledir, name);
  }

  /**
   * Computes the Path of the HRegion
   *
@@ -3086,7 +2979,7 @@ public class HRegion implements HeapSize { // , Writable{

  public static final long FIXED_OVERHEAD = ClassSize.align(
    (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
    (20 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
    (19 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);

  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
    ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +

@@ -122,7 +122,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState;
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
public class HRegionServer implements HRegionInterface,
    HBaseRPCErrorHandler, Runnable, Watcher, Stoppable {
    HBaseRPCErrorHandler, Runnable, Watcher, Stoppable, OnlineRegions {
  public static final Log LOG = LogFactory.getLog(HRegionServer.class);
  private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
  private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
@@ -1464,12 +1464,7 @@ public class HRegionServer implements HRegionInterface,
      }
      return;
    }
    this.lock.writeLock().lock();
    try {
      this.onlineRegions.put(mapKey, region);
    } finally {
      this.lock.writeLock().unlock();
    }
    addToOnlineRegions(region);
  }
  try {
    HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo);
@@ -2146,13 +2141,16 @@ public class HRegionServer implements HRegionInterface,
    return result;
  }

  /**
   * This method removes HRegion corresponding to hri from the Map of onlineRegions.
   *
   * @param hri the HRegionInfo corresponding to the HRegion to-be-removed.
   * @return the removed HRegion, or null if the HRegion was not in onlineRegions.
   */
  HRegion removeFromOnlineRegions(HRegionInfo hri) {
  public void addToOnlineRegions(final HRegion r) {
    this.lock.writeLock().lock();
    try {
      this.onlineRegions.put(Bytes.mapKey(r.getRegionInfo().getRegionName()), r);
    } finally {
      this.lock.writeLock().unlock();
    }
  }

  public HRegion removeFromOnlineRegions(HRegionInfo hri) {
    this.lock.writeLock().lock();
    HRegion toReturn = null;
    try {

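HRegionServer now also declares that it implements OnlineRegions, presumably so that SplitTransaction can register daughter regions and deregister the parent without depending on the whole HRegionServer type. The diff only shows the two methods involved; a plausible shape for such an interface, written here as an assumption rather than copied from the HBase source, would be:

import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;

// Hedged sketch of the OnlineRegions contract implied by the hunks above; the
// real interface may declare more methods or name them differently.
interface OnlineRegionsSketch {
  /** Register a region as being served by this server. */
  void addToOnlineRegions(HRegion r);

  /** Deregister a region; returns the removed HRegion, or null if it was not online. */
  HRegion removeFromOnlineRegions(HRegionInfo hri);
}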
@@ -31,7 +31,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.UUID;

import org.apache.commons.logging.Log;
@@ -51,6 +50,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
@@ -60,14 +60,14 @@ import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.zookeeper.ZooKeeper;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.zookeeper.ZooKeeper;

import com.google.common.base.Preconditions;

@@ -500,6 +500,33 @@ public class HBaseTestingUtility {
    t.flushCommits();
    return rowCount;
  }
  /**
   * Load region with rows from 'aaa' to 'zzz'.
   * @param r Region
   * @param f Family
   * @return Count of rows loaded.
   * @throws IOException
   */
  public int loadRegion(final HRegion r, final byte[] f)
  throws IOException {
    byte[] k = new byte[3];
    int rowCount = 0;
    for (byte b1 = 'a'; b1 <= 'z'; b1++) {
      for (byte b2 = 'a'; b2 <= 'z'; b2++) {
        for (byte b3 = 'a'; b3 <= 'z'; b3++) {
          k[0] = b1;
          k[1] = b2;
          k[2] = b3;
          Put put = new Put(k);
          put.add(f, null, k);
          if (r.getLog() == null) put.setWriteToWAL(false);
          r.put(put);
          rowCount++;
        }
      }
    }
    return rowCount;
  }

  /**
   * Return the number of rows in the given table.
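The new HBaseTestingUtility.loadRegion(HRegion, byte[]) above writes one row for every three-letter key from 'aaa' to 'zzz', i.e. 26 * 26 * 26 = 17,576 rows, directly against an HRegion, skipping the WAL when the region has no log. Below is a hedged sketch of how a test might drive it; the family name, the assertion, and the surrounding setup are assumptions made for illustration, not part of this diff.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import junit.framework.Assert;

// Illustration only: exercises the loadRegion(...) helper against a region
// that the surrounding test is assumed to have created elsewhere.
class LoadRegionExample {
  static void loadAndFlush(HBaseTestingUtility util, HRegion region) throws IOException {
    byte[] family = Bytes.toBytes("cf");          // hypothetical column family
    int rows = util.loadRegion(region, family);   // writes keys 'aaa' .. 'zzz'
    Assert.assertEquals(26 * 26 * 26, rows);      // 17,576 rows expected
    region.flushcache();                          // flush so the data lands in store files
  }
}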
@@ -40,6 +40,13 @@ public class TestImmutableBytesWritable extends TestCase {
      new ImmutableBytesWritable(Bytes.toBytes("xxabc"), 2, 2).hashCode());
  }

  public void testSpecificCompare() {
    ImmutableBytesWritable ibw1 = new ImmutableBytesWritable(new byte[]{0x0f});
    ImmutableBytesWritable ibw2 = new ImmutableBytesWritable(new byte[]{0x00, 0x00});
    ImmutableBytesWritable.Comparator c = new ImmutableBytesWritable.Comparator();
    assertFalse("ibw1 < ibw2", c.compare( ibw1, ibw2 ) < 0 );
  }

  public void testComparison() throws Exception {
    runTests("aa", "b", -1);
    runTests("aa", "aa", 0);

@@ -50,9 +50,11 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;

import com.google.common.base.Joiner;
@@ -1247,54 +1249,38 @@ public class TestHRegion extends HBaseTestCase {
  public void testMerge() throws IOException {
    byte [] tableName = Bytes.toBytes("testtable");
    byte [][] families = {fam1, fam2, fam3};

    HBaseConfiguration hc = initSplit();
    //Setting up region
    String method = this.getName();
    initHRegion(tableName, method, hc, families);

    try {
      LOG.info("" + addContent(region, fam3));
      region.flushcache();
      byte [] splitRow = region.compactStores();
      assertNotNull(splitRow);
      LOG.info("SplitRow: " + Bytes.toString(splitRow));
      HRegion [] regions = split(region, splitRow);
      HRegion [] subregions = splitRegion(region, splitRow);
      try {
        // Need to open the regions.
        // TODO: Add an 'open' to HRegion... don't do open by constructing
        // instance.
        for (int i = 0; i < regions.length; i++) {
          regions[i] = openClosedRegion(regions[i]);
        for (int i = 0; i < subregions.length; i++) {
          openClosedRegion(subregions[i]);
          subregions[i].compactStores();
        }
        Path oldRegionPath = region.getRegionDir();
        Path oldRegion1 = subregions[0].getRegionDir();
        Path oldRegion2 = subregions[1].getRegionDir();
        long startTime = System.currentTimeMillis();
        HRegion subregions [] = region.splitRegion(splitRow);
        if (subregions != null) {
          LOG.info("Split region elapsed time: "
            + ((System.currentTimeMillis() - startTime) / 1000.0));
          assertEquals("Number of subregions", subregions.length, 2);
          for (int i = 0; i < subregions.length; i++) {
            subregions[i] = openClosedRegion(subregions[i]);
            subregions[i].compactStores();
          }

          // Now merge it back together
          Path oldRegion1 = subregions[0].getRegionDir();
          Path oldRegion2 = subregions[1].getRegionDir();
          startTime = System.currentTimeMillis();
          region = HRegion.mergeAdjacent(subregions[0], subregions[1]);
          LOG.info("Merge regions elapsed time: " +
            ((System.currentTimeMillis() - startTime) / 1000.0));
          fs.delete(oldRegion1, true);
          fs.delete(oldRegion2, true);
          fs.delete(oldRegionPath, true);
        }
        region = HRegion.mergeAdjacent(subregions[0], subregions[1]);
        LOG.info("Merge regions elapsed time: " +
          ((System.currentTimeMillis() - startTime) / 1000.0));
        fs.delete(oldRegion1, true);
        fs.delete(oldRegion2, true);
        fs.delete(oldRegionPath, true);
        LOG.info("splitAndMerge completed.");
      } finally {
        for (int i = 0; i < regions.length; i++) {
        for (int i = 0; i < subregions.length; i++) {
          try {
            regions[i].close();
            subregions[i].close();
          } catch (IOException e) {
            // Ignore.
          }
@@ -1308,6 +1294,38 @@ public class TestHRegion extends HBaseTestCase {
    }
  }

  /**
   * @param parent Region to split.
   * @param midkey Key to split around.
   * @return The Regions we created.
   * @throws IOException
   */
  HRegion [] splitRegion(final HRegion parent, final byte [] midkey)
  throws IOException {
    PairOfSameType<HRegion> result = null;
    SplitTransaction st = new SplitTransaction(parent, midkey);
    // If prepare does not return true, for some reason -- logged inside in
    // the prepare call -- we are not ready to split just now. Just return.
    if (!st.prepare()) return null;
    try {
      result = st.execute(null);
    } catch (IOException ioe) {
      try {
        LOG.info("Running rollback of failed split of " +
          parent.getRegionNameAsString() + "; " + ioe.getMessage());
        st.rollback(null);
        LOG.info("Successful rollback of failed split of " +
          parent.getRegionNameAsString());
        return null;
      } catch (RuntimeException e) {
        // If failed rollback, kill this server to avoid having a hole in table.
        LOG.info("Failed rollback of failed split of " +
          parent.getRegionNameAsString() + " -- aborting server", e);
      }
    }
    return new HRegion [] {result.getFirst(), result.getSecond()};
  }

  //////////////////////////////////////////////////////////////////////////////
  // Scanner tests
  //////////////////////////////////////////////////////////////////////////////
@@ -2140,7 +2158,7 @@ public class TestHRegion extends HBaseTestCase {
      byte [] splitRow = region.compactStores();
      assertNotNull(splitRow);
      LOG.info("SplitRow: " + Bytes.toString(splitRow));
      HRegion [] regions = split(region, splitRow);
      HRegion [] regions = splitRegion(region, splitRow);
      try {
        // Need to open the regions.
        // TODO: Add an 'open' to HRegion... don't do open by constructing
@@ -2180,7 +2198,7 @@ public class TestHRegion extends HBaseTestCase {
        for (int i = 0; i < regions.length; i++) {
          HRegion[] rs = null;
          if (midkeys[i] != null) {
            rs = split(regions[i], midkeys[i]);
            rs = splitRegion(regions[i], midkeys[i]);
            for (int j = 0; j < rs.length; j++) {
              sortedMap.put(Bytes.toString(rs[j].getRegionName()),
                openClosedRegion(rs[j]));
@@ -2233,7 +2251,7 @@ public class TestHRegion extends HBaseTestCase {

    HRegion [] regions = null;
    try {
      regions = region.splitRegion(Bytes.toBytes("" + splitRow));
      regions = splitRegion(region, Bytes.toBytes("" + splitRow));
      //Opening the regions returned.
      for (int i = 0; i < regions.length; i++) {
        regions[i] = openClosedRegion(regions[i]);
@@ -2784,15 +2802,6 @@ public class TestHRegion extends HBaseTestCase {
    }
  }

  protected HRegion [] split(final HRegion r, final byte [] splitRow)
  throws IOException {
    // Assert can get mid key from passed region.
    assertGet(r, fam3, splitRow);
    HRegion [] regions = r.splitRegion(splitRow);
    assertEquals(regions.length, 2);
    return regions;
  }

  private HBaseConfiguration initSplit() {
    HBaseConfiguration conf = new HBaseConfiguration();
    // Always compact if there is more than one store file.
@@ -2827,6 +2836,11 @@ public class TestHRegion extends HBaseTestCase {
    }
    HRegionInfo info = new HRegionInfo(htd, null, null, false);
    Path path = new Path(DIR + callingMethod);
    if (fs.exists(path)) {
      if (!fs.delete(path, true)) {
        throw new IOException("Failed delete of " + path);
      }
    }
    region = HRegion.createHRegion(info, path, conf);
  }
