HADOOP-2587 Splits blocked by compactions cause region to be offline for duration of compaction.
Patch verified by Billy Pearson

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@612161 13f79535-47bb-0310-9956-ffa450edef68
parent ea6f5071da
commit b78012df61
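For orientation: the patch makes HRegion.close() wait out any in-flight compaction or cache flush on the shared writestate monitor before it disables writes and takes the region's write lock, so the region keeps serving while it waits. A minimal, self-contained sketch of that wait/notifyAll handshake; WriteState, finishCompaction, and the other names here are simplified stand-ins, not HBase's actual classes:

public class GuardedCloseSketch {
  // Simplified analogue of HRegion's writestate: the flags are only read or
  // written while holding this object's monitor.
  static final class WriteState {
    boolean compacting = false;
    boolean flushing = false;
    boolean writesEnabled = true;
  }

  private final WriteState writestate = new WriteState();

  // Background worker: announce completion so a pending close() can proceed.
  void finishCompaction() {
    synchronized (writestate) {
      writestate.compacting = false;
      writestate.notifyAll(); // wake any thread blocked in close()
    }
  }

  // close(): wait for in-flight work to drain, then disable further writes.
  // Crucially, the region is still serving reads and writes while it waits.
  void close() throws InterruptedException {
    synchronized (writestate) {
      while (writestate.compacting || writestate.flushing) {
        writestate.wait(); // releases the monitor atomically while waiting
      }
      writestate.writesEnabled = false; // no new flushes/compactions start
    }
    // Only now take the exclusive lock that stops updates and scanners.
  }
}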
@@ -123,13 +123,12 @@ Trunk (unreleased changes)
   HADOOP-2490 Failure in nightly #346 (Added debugging of hudson failures).
   HADOOP-2558 fixes for build up on hudson (part 1, part 2, part 3, part 4)
   HADOOP-2500 Unreadable region kills region servers
+  HADOOP-2587 Splits blocked by compactions cause region to be offline for
+              duration of compaction.
+              (Bryan Duxbury via Stack)
   HADOOP-2579 Initializing a new HTable object against a nonexistent table
               throws a NoServerForRegionException instead of a
               TableNotFoundException when a different table has been created
               previously (Bryan Duxbury via Stack)
-  HADOOP-2587 Splits blocked by compactions cause region to be offline for
-              duration of compaction.
-
 
 IMPROVEMENTS
   HADOOP-2401 Add convenience put method that takes writable
@@ -373,31 +373,45 @@ public class HRegion implements HConstants {
    */
   List<HStoreFile> close(boolean abort,
       final RegionUnavailableListener listener) throws IOException {
+    Text regionName = this.regionInfo.getRegionName();
     if (isClosed()) {
-      LOG.info("region " + this.regionInfo.getRegionName() + " already closed");
+      LOG.info("region " + regionName + " already closed");
       return null;
     }
     synchronized (splitLock) {
-      lock.writeLock().lock();
-      try {
-        synchronized (writestate) {
-          while (writestate.compacting || writestate.flushing) {
-            try {
-              writestate.wait();
-            } catch (InterruptedException iex) {
-              // continue
-            }
+      synchronized (writestate) {
+        while (writestate.compacting || writestate.flushing) {
+          LOG.debug("waiting for" +
+              (writestate.compacting ? " compaction" : "") +
+              (writestate.flushing ?
+                  (writestate.compacting ? "," : "") + " cache flush" :
+                    ""
+              ) + " to complete for region " + regionName
+          );
+          try {
+            writestate.wait();
+          } catch (InterruptedException iex) {
+            // continue
           }
-          // Disable compacting and flushing by background threads for this
-          // region.
-          writestate.writesEnabled = false;
         }
+        // Disable compacting and flushing by background threads for this
+        // region.
+        writestate.writesEnabled = false;
+        LOG.debug("compactions and cache flushes disabled for region " +
+            regionName);
       }
+      lock.writeLock().lock();
+      LOG.debug("new updates and scanners for region " + regionName +
+          " disabled");
+
+      try {
         // Wait for active scanners to finish. The write lock we hold will prevent
         // new scanners from being created.
         synchronized (activeScannerCount) {
           while (activeScannerCount.get() != 0) {
             LOG.debug("waiting for " + activeScannerCount.get() +
                 " scanners to finish");
             try {
               activeScannerCount.wait();
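The essential reordering in the hunk above: the old close() took the write lock first and then waited out the compaction, keeping updates and scanners blocked for the whole wait; the new code waits first, with no region lock held, and takes the write lock only for the short final shutdown. A hedged illustration of the two shapes, where regionLock, writestate, and compacting are stand-ins for HRegion's fields, not the real code:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockOrderingSketch {
  private final ReentrantReadWriteLock regionLock = new ReentrantReadWriteLock();
  private final Object writestate = new Object();
  private boolean compacting = true;

  // Called by the compaction thread when it finishes.
  void compactionDone() {
    synchronized (writestate) {
      compacting = false;
      writestate.notifyAll();
    }
  }

  // Old shape: write lock first, then wait. Every reader and writer is
  // blocked on regionLock for as long as the compaction runs.
  void closeOldShape() throws InterruptedException {
    regionLock.writeLock().lock();
    try {
      synchronized (writestate) {
        while (compacting) {
          writestate.wait();
        }
      }
    } finally {
      regionLock.writeLock().unlock();
    }
  }

  // New shape: wait first with no region lock held, then take the write
  // lock only for the brief final teardown.
  void closeNewShape() throws InterruptedException {
    synchronized (writestate) {
      while (compacting) {
        writestate.wait();
      }
    }
    regionLock.writeLock().lock();
    try {
      // disable updates and scanners, drain row locks, etc.
    } finally {
      regionLock.writeLock().unlock();
    }
  }
}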
@@ -406,12 +420,14 @@ public class HRegion implements HConstants {
           }
         }
       }
+      LOG.debug("no more active scanners for region " + regionName);
 
       // Write lock means no more row locks can be given out. Wait on
       // outstanding row locks to come in before we close so we do not drop
       // outstanding updates.
       waitOnRowLocks();
+      LOG.debug("no more write locks outstanding on region " + regionName);
 
       if (listener != null) {
         // If there is a listener, let them know that we have now
         // acquired all the necessary locks and are starting to
@@ -551,8 +567,6 @@ public class HRegion implements HConstants {
     if (closed.get() || !needsSplit(midKey)) {
       return null;
     }
-
-    long startTime = System.currentTimeMillis();
     Path splits = new Path(this.regiondir, SPLITDIR);
     if(!this.fs.exists(splits)) {
       this.fs.mkdirs(splits);
@@ -618,10 +632,6 @@ public class HRegion implements HConstants {
       LOG.debug("Cleaned up " + splits.toString() + " " + deleted);
     }
     HRegion regions[] = new HRegion [] {regionA, regionB};
-    LOG.info("Region split of " + this.regionInfo.getRegionName() +
-        " complete; " + "new regions: " + regions[0].getRegionName() + ", " +
-        regions[1].getRegionName() + ". Split took " +
-        StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
     return regions;
   }
 }
@@ -771,39 +781,39 @@ public class HRegion implements HConstants {
    * because a Snapshot was not properly persisted.
    */
   boolean flushcache() throws IOException {
-    lock.readLock().lock(); // Prevent splits and closes
-    try {
-      if (this.closed.get()) {
-        return false;
-      }
-      synchronized (writestate) {
-        if ((!writestate.flushing) && writestate.writesEnabled) {
-          writestate.flushing = true;
-        } else {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("NOT flushing memcache for region " +
-                this.regionInfo.getRegionName() + ", flushing=" +
-                writestate.flushing + ", writesEnabled=" +
-                writestate.writesEnabled);
-          }
-          return false;
-        }
-      }
-      long startTime = -1;
-      synchronized (updateLock) {// Stop updates while we snapshot the memcaches
-        startTime = snapshotMemcaches();
-      }
-      return internalFlushcache(startTime);
+    if (this.closed.get()) {
+      return false;
+    }
+    synchronized (writestate) {
+      if ((!writestate.flushing) && writestate.writesEnabled) {
+        writestate.flushing = true;
+
+      } else {
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("NOT flushing memcache for region " +
+              this.regionInfo.getRegionName() + ", flushing=" +
+              writestate.flushing + ", writesEnabled=" +
+              writestate.writesEnabled);
+        }
+        return false;
+      }
+    }
+    try {
+      lock.readLock().lock(); // Prevent splits and closes
+      try {
+        long startTime = -1;
+        synchronized (updateLock) {// Stop updates while we snapshot the memcaches
+          startTime = snapshotMemcaches();
+        }
+        return internalFlushcache(startTime);
+      } finally {
+        lock.readLock().unlock();
+      }
     } finally {
-      lock.readLock().unlock();
       synchronized (writestate) {
         writestate.flushing = false;
         writestate.notifyAll();
       }
     }
   }
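flushcache() is restructured to claim the flushing flag under the writestate monitor before touching the region read lock, and to clear the flag and notifyAll in an outer finally so a waiting close() can never hang on a flag that will not be reset. A minimal sketch of that shape, assuming simplified stand-in fields rather than HRegion's real ones:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class FlushOrderingSketch {
  private final ReentrantReadWriteLock regionLock = new ReentrantReadWriteLock();
  private final Object writestate = new Object();
  private boolean flushing = false;
  private boolean writesEnabled = true;

  boolean flushcache() {
    synchronized (writestate) {
      if (flushing || !writesEnabled) {
        return false;  // someone else is flushing, or the region is closing
      }
      flushing = true; // claimed without holding the region lock
    }
    try {
      regionLock.readLock().lock(); // now block splits/closes, but only briefly
      try {
        doFlush();                  // snapshot the memcache and write it out
        return true;
      } finally {
        regionLock.readLock().unlock();
      }
    } finally {
      synchronized (writestate) {
        flushing = false;
        writestate.notifyAll();     // unblock a close() waiting on the flag
      }
    }
  }

  private void doFlush() { /* elided */ }
}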
@@ -1043,6 +1053,7 @@ public class HRegion implements HConstants {
    * <i>ts</i>.
    *
    * @param row row key
+   * @param ts
    * @return map of values
    * @throws IOException
    */
@@ -1537,6 +1548,7 @@ public class HRegion implements HConstants {
   private void waitOnRowLocks() {
     synchronized (rowsToLocks) {
       while (this.rowsToLocks.size() > 0) {
+        LOG.debug("waiting for " + this.rowsToLocks.size() + " row locks");
         try {
           this.rowsToLocks.wait();
         } catch (InterruptedException e) {
@@ -232,6 +232,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
 
     private HTable root = null;
     private HTable meta = null;
+    private long startTime;
 
     /** constructor */
     public Splitter() {
@@ -240,6 +241,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
 
     /** {@inheritDoc} */
     public void closing(final Text regionName) {
+      startTime = System.currentTimeMillis();
       lock.writeLock().lock();
       try {
         // Remove region from regions Map and add it to the Map of retiring
@@ -367,10 +369,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         }
         reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
             newRegions[1].getRegionInfo());
-        LOG.info("region split, META update, and report to master all" +
+        LOG.info("region split, META updated, and report to master all" +
            " successful. Old region=" + oldRegionInfo.getRegionName() +
            ", new regions: " + newRegions[0].getRegionName() + ", " +
-           newRegions[1].getRegionName());
+           newRegions[1].getRegionName() + ". Split took " +
+           StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
 
         // Do not serve the new regions. Let the Master assign them.
       }
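With this pair of hunks, the split timing moves from HRegion.splitRegion() to the region server's Splitter, which stamps startTime when closing() fires and reports the elapsed time once the META update and master report succeed, so the reported duration covers the whole offline window. Hypothetical usage of the Hadoop helper the patch relies on:

import org.apache.hadoop.util.StringUtils;

class SplitTimingSketch {
  public static void main(String[] args) throws InterruptedException {
    long startTime = System.currentTimeMillis(); // set when closing() fires
    Thread.sleep(1500);                          // stand-in for the split work
    // Prints a human-readable duration such as "1sec"
    System.out.println("Split took " +
        StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
  }
}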
@@ -1105,17 +1105,17 @@ public class HTable implements HConstants {
               // No more tries
               throw e;
             }
+            try {
+              Thread.sleep(pause);
+            } catch (InterruptedException ie) {
+              // continue
+            }
             if (LOG.isDebugEnabled()) {
               LOG.debug("reloading table servers because: " + e.getMessage());
             }
             currentRegionLocation = getRegionLocation(localStartKey, true);
           }
         }
-        try {
-          Thread.sleep(pause);
-        } catch (InterruptedException e) {
-          // continue
-        }
       } catch (IOException e) {
         close();
         if (e instanceof RemoteException) {
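The HTable change moves the between-retries sleep so it happens before the region location is re-resolved, giving the cluster time to redeploy the region rather than sleeping after the whole attempt. A generic sketch of that retry shape; locate() and call() are illustrative stand-ins, not the HTable API:

import java.io.IOException;

class RetrySketch {
  private final int numRetries = 5;
  private final long pause = 2000L;

  void withRetries() throws IOException {
    for (int tries = 0; tries < numRetries; tries++) {
      try {
        call(locate()); // attempt against the freshly resolved location
        return;
      } catch (IOException e) {
        if (tries == numRetries - 1) {
          throw e; // no more tries
        }
        try {
          Thread.sleep(pause); // let the master reassign the region first
        } catch (InterruptedException ie) {
          // continue retrying
        }
        // the loop re-resolves the location before the next attempt
      }
    }
  }

  private String locate() { return "region-server-address"; } // stand-in
  private void call(String location) throws IOException { /* stand-in RPC */ }
}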
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseAdmin;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegion;
 import org.apache.hadoop.hbase.HScannerInterface;
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.HTable;
@@ -249,13 +250,12 @@ public class TestTableIndex extends MultiRegionTable {
   }
 
   private void verify() throws IOException {
-    // Sleep before we start the verify to ensure that when the scanner takes
-    // its snapshot, all the updates have made it into the cache.
-    try {
-      Thread.sleep(conf.getLong("hbase.regionserver.optionalcacheflushinterval",
-          60L * 1000L));
-    } catch (InterruptedException e) {
-      // ignore
+    // Force a cache flush for every online region to ensure that when the
+    // scanner takes its snapshot, all the updates have made it into the cache.
+    for (HRegion r : hCluster.getRegionThreads().get(0).getRegionServer().
+        getOnlineRegions().values()) {
+      HRegionIncommon region = new HRegionIncommon(r);
+      region.flushcache();
     }
 
     Path localDir = new Path(getUnitTestdir(getName()), "index_" +
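The test no longer sleeps for hbase.regionserver.optionalcacheflushinterval and hopes the flush has happened; it forces a flush on every online region so the scanner's snapshot is deterministic. A minimal sketch of the pattern, with a stand-in Region interface rather than the test's HRegionIncommon wrapper:

import java.io.IOException;
import java.util.Collection;

class FlushAllSketch {
  interface Region {
    void flushcache() throws IOException;
  }

  // Deterministic: once this returns, every pending update is in the store
  // files, so a scanner snapshot taken next cannot miss cached-only edits.
  static void flushAll(Collection<Region> regions) throws IOException {
    for (Region r : regions) {
      r.flushcache();
    }
  }
}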