HBASE-442 Move internal classes out of HRegionServer

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@630968 13f79535-47bb-0310-9956-ffa450edef68
Bryan Duxbury 2008-02-25 19:50:02 +00:00
parent d280d4d0ec
commit f48c1d7893
6 changed files with 565 additions and 408 deletions


@ -65,7 +65,8 @@ Hbase Change Log
HBASE-457 Factor Master into Master, RegionManager, and ServerManager
HBASE-464 HBASE-419 introduced javadoc errors
HBASE-468 Move HStoreKey back to o.a.h.h
HBASE-442 Move internal classes out of HRegionServer
Branch 0.1
INCOMPATIBLE CHANGES


@ -0,0 +1,201 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Writables;
/**
* Compact region on request and then run split if appropriate
*/
class CompactSplitThread extends Thread
implements RegionUnavailableListener, HConstants {
static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
private HTable root = null;
private HTable meta = null;
private long startTime;
private final long frequency;
private HRegionServer server;
private HBaseConfiguration conf;
private final BlockingQueue<QueueEntry> compactionQueue =
new LinkedBlockingQueue<QueueEntry>();
/** constructor */
public CompactSplitThread(HRegionServer server) {
super();
this.server = server;
this.conf = server.conf;
this.frequency =
conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
20 * 1000);
}
/** {@inheritDoc} */
@Override
public void run() {
while (!server.isStopRequested()) {
QueueEntry e = null;
try {
e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
}
e.getRegion().compactIfNeeded();
split(e.getRegion());
} catch (InterruptedException ex) {
continue;
} catch (IOException ex) {
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
break;
}
} catch (Exception ex) {
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
ex);
if (!server.checkFileSystem()) {
break;
}
}
}
LOG.info(getName() + " exiting");
}
/**
* @param e QueueEntry for region to be compacted
*/
public void compactionRequested(QueueEntry e) {
compactionQueue.add(e);
}
void compactionRequested(final HRegion r) {
compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
}
private void split(final HRegion region) throws IOException {
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.splitRegion(this);
if (newRegions == null) {
// Didn't need to be split
return;
}
// When a region is split, the META table needs to be updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
HTable t = null;
if (region.getRegionInfo().isMetaTable()) {
// We need to update the root region
if (this.root == null) {
this.root = new HTable(conf, ROOT_TABLE_NAME);
}
t = root;
} else {
// For normal regions we need to update the meta region
if (meta == null) {
meta = new HTable(conf, META_TABLE_NAME);
}
t = meta;
}
LOG.info("Updating " + t.getTableName() + " with region split info");
// Mark old region as offline and split in META.
// NOTE: there is no need for retry logic here. HTable does it for us.
oldRegionInfo.setOffline(true);
oldRegionInfo.setSplit(true);
BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName());
update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo()));
update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo()));
t.commit(update);
// Add new regions to META
for (int i = 0; i < newRegions.length; i++) {
update = new BatchUpdate(newRegions[i].getRegionName());
update.put(COL_REGIONINFO, Writables.getBytes(
newRegions[i].getRegionInfo()));
t.commit(update);
}
// Now tell the master about the new regions
if (LOG.isDebugEnabled()) {
LOG.debug("Reporting region split to master");
}
server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
newRegions[1].getRegionInfo());
LOG.info("region split, META updated, and report to master all" +
" successful. Old region=" + oldRegionInfo.toString() +
", new regions: " + newRegions[0].toString() + ", " +
newRegions[1].toString() + ". Split took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
// Do not serve the new regions. Let the Master assign them.
}
/** {@inheritDoc} */
public void closing(final Text regionName) {
startTime = System.currentTimeMillis();
server.getWriteLock().lock();
try {
// Remove region from regions Map and add it to the Map of retiring
// regions.
server.setRegionClosing(regionName);
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + " closing (" +
"Adding to retiringRegions)");
}
} finally {
server.getWriteLock().unlock();
}
}
/** {@inheritDoc} */
public void closed(final Text regionName) {
server.getWriteLock().lock();
try {
server.setRegionClosed(regionName);
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + " closed");
}
} finally {
server.getWriteLock().unlock();
}
}
}
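As a rough orientation for how this extracted thread gets used, here is a hedged sketch; the regionServer and region handles are illustrative stand-ins and the wiring is assumed, not taken from this commit:

// Hypothetical wiring sketch; regionServer and region are stand-ins.
CompactSplitThread compactor = new CompactSplitThread(regionServer);
compactor.start();
// Once a flush makes a region a compaction candidate, hand it over; the
// HRegion overload wraps it in a QueueEntry stamped with the current time.
compactor.compactionRequested(region);
// At shutdown the server interrupts the thread; run() drains out once
// isStopRequested() returns true or the interrupt lands in poll().
compactor.interrupt();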


@ -0,0 +1,148 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import java.util.Set;
import java.util.Iterator;
import java.util.ConcurrentModificationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
/** Flush cache upon request */
class Flusher extends Thread implements CacheFlushListener {
static final Log LOG = LogFactory.getLog(Flusher.class);
private final DelayQueue<QueueEntry> flushQueue =
new DelayQueue<QueueEntry>();
private final long optionalFlushPeriod;
private final HRegionServer server;
private final HBaseConfiguration conf;
private final Integer lock = new Integer(0);
/** constructor */
public Flusher(final HRegionServer server) {
super();
this.server = server;
conf = server.conf;
this.optionalFlushPeriod = conf.getLong(
"hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
}
/** {@inheritDoc} */
@Override
public void run() {
while (!server.isStopRequested()) {
QueueEntry e = null;
try {
e = flushQueue.poll(server.threadWakeFrequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
}
synchronized(lock) { // Don't interrupt while we're working
if (e.getRegion().flushcache()) {
server.compactionRequested(e);
}
e.setExpirationTime(System.currentTimeMillis() +
optionalFlushPeriod);
flushQueue.add(e);
}
// Now ensure that all the active regions are in the queue
Set<HRegion> regions = server.getRegionsToCheck();
for (HRegion r: regions) {
e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
synchronized (flushQueue) {
if (!flushQueue.contains(e)) {
flushQueue.add(e);
}
}
}
// Now make sure that the queue only contains active regions
synchronized (flushQueue) {
for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) {
e = i.next();
if (!regions.contains(e.getRegion())) {
i.remove();
}
}
}
} catch (InterruptedException ex) {
continue;
} catch (ConcurrentModificationException ex) {
continue;
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of hlog
// is required. Currently the only way to do this is a restart of
// the server.
LOG.fatal("Replay of hlog required. Forcing server restart", ex);
if (!server.checkFileSystem()) {
break;
}
server.stop();
} catch (IOException ex) {
LOG.error("Cache flush failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
break;
}
} catch (Exception ex) {
LOG.error("Cache flush failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
ex);
if (!server.checkFileSystem()) {
break;
}
}
}
flushQueue.clear();
LOG.info(getName() + " exiting");
}
/** {@inheritDoc} */
public void flushRequested(HRegion region) {
QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
synchronized (flushQueue) {
if (flushQueue.contains(e)) {
flushQueue.remove(e);
}
flushQueue.add(e);
}
}
/**
* Only interrupt once it's done with a run through the work loop.
*/
void interruptPolitely() {
synchronized (lock) {
interrupt();
}
}
}
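The interruptPolitely() method above is worth a closer look: the flush work and the shutdown interrupt are serialized on the same monitor, so an interrupt can never land in the middle of a cache flush. A minimal, self-contained sketch of that idiom follows; the class and names are assumptions for illustration, not commit code.

public class PoliteInterruptDemo {
  private static final Object workLock = new Object();

  public static void main(String[] args) throws InterruptedException {
    Thread worker = new Thread(new Runnable() {
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          synchronized (workLock) {
            // one unit of work; the polite interrupt below blocks on workLock,
            // so it can only be delivered between units, never inside one
          }
        }
      }
    });
    worker.start();
    Thread.sleep(50);
    // interruptPolitely(): take the work monitor first, then interrupt.
    synchronized (workLock) {
      worker.interrupt();
    }
    worker.join();
  }
}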


@ -185,415 +185,18 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
/** Queue entry passed to flusher, compactor and splitter threads */
class QueueEntry implements Delayed {
private final HRegion region;
private long expirationTime;
QueueEntry(HRegion region, long expirationTime) {
this.region = region;
this.expirationTime = expirationTime;
}
/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
QueueEntry other = (QueueEntry) o;
return this.hashCode() == other.hashCode();
}
/** {@inheritDoc} */
@Override
public int hashCode() {
return this.region.getRegionInfo().hashCode();
}
/** {@inheritDoc} */
public long getDelay(TimeUnit unit) {
return unit.convert(this.expirationTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
public int compareTo(Delayed o) {
long delta = this.getDelay(TimeUnit.MILLISECONDS) -
o.getDelay(TimeUnit.MILLISECONDS);
int value = 0;
if (delta > 0) {
value = 1;
} else if (delta < 0) {
value = -1;
}
return value;
}
/** @return the region */
public HRegion getRegion() {
return region;
}
/** @param expirationTime the expirationTime to set */
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
// Compactions
final CompactSplitThread compactSplitThread;
// Needed during shutdown so we send an interrupt after completion of a
// compaction, not in the midst.
final Integer compactSplitLock = new Integer(0);
/** Compact region on request and then run split if appropriate
*/
private class CompactSplitThread extends Thread
implements RegionUnavailableListener {
private HTable root = null;
private HTable meta = null;
private long startTime;
private final long frequency;
private final BlockingQueue<QueueEntry> compactionQueue =
new LinkedBlockingQueue<QueueEntry>();
/** constructor */
public CompactSplitThread() {
super();
this.frequency =
conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
20 * 1000);
}
/** {@inheritDoc} */
@Override
public void run() {
while (!stopRequested.get()) {
QueueEntry e = null;
try {
e = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
}
e.getRegion().compactIfNeeded();
split(e.getRegion());
} catch (InterruptedException ex) {
continue;
} catch (IOException ex) {
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!checkFileSystem()) {
break;
}
} catch (Exception ex) {
LOG.error("Compaction failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
ex);
if (!checkFileSystem()) {
break;
}
}
}
LOG.info(getName() + " exiting");
}
/**
* @param e QueueEntry for region to be compacted
*/
public void compactionRequested(QueueEntry e) {
compactionQueue.add(e);
}
void compactionRequested(final HRegion r) {
compactionRequested(new QueueEntry(r, System.currentTimeMillis()));
}
private void split(final HRegion region) throws IOException {
final HRegionInfo oldRegionInfo = region.getRegionInfo();
final HRegion[] newRegions = region.splitRegion(this);
if (newRegions == null) {
// Didn't need to be split
return;
}
// When a region is split, the META table needs to be updated if we're
// splitting a 'normal' region, and the ROOT table needs to be
// updated if we are splitting a META region.
HTable t = null;
if (region.getRegionInfo().isMetaTable()) {
// We need to update the root region
if (this.root == null) {
this.root = new HTable(conf, ROOT_TABLE_NAME);
}
t = root;
} else {
// For normal regions we need to update the meta region
if (meta == null) {
meta = new HTable(conf, META_TABLE_NAME);
}
t = meta;
}
LOG.info("Updating " + t.getTableName() + " with region split info");
// Mark old region as offline and split in META.
// NOTE: there is no need for retry logic here. HTable does it for us.
oldRegionInfo.setOffline(true);
oldRegionInfo.setSplit(true);
BatchUpdate update = new BatchUpdate(oldRegionInfo.getRegionName());
update.put(COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
update.put(COL_SPLITA, Writables.getBytes(newRegions[0].getRegionInfo()));
update.put(COL_SPLITB, Writables.getBytes(newRegions[1].getRegionInfo()));
t.commit(update);
// Add new regions to META
for (int i = 0; i < newRegions.length; i++) {
update = new BatchUpdate(newRegions[i].getRegionName());
update.put(COL_REGIONINFO, Writables.getBytes(
newRegions[i].getRegionInfo()));
t.commit(update);
}
// Now tell the master about the new regions
if (LOG.isDebugEnabled()) {
LOG.debug("Reporting region split to master");
}
reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
newRegions[1].getRegionInfo());
LOG.info("region split, META updated, and report to master all" +
" successful. Old region=" + oldRegionInfo.toString() +
", new regions: " + newRegions[0].toString() + ", " +
newRegions[1].toString() + ". Split took " +
StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
// Do not serve the new regions. Let the Master assign them.
}
/** {@inheritDoc} */
public void closing(final Text regionName) {
startTime = System.currentTimeMillis();
lock.writeLock().lock();
try {
// Remove region from regions Map and add it to the Map of retiring
// regions.
retiringRegions.put(regionName, onlineRegions.remove(regionName));
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + " closing (" +
"Adding to retiringRegions)");
}
} finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
public void closed(final Text regionName) {
lock.writeLock().lock();
try {
retiringRegions.remove(regionName);
if (LOG.isDebugEnabled()) {
LOG.debug(regionName.toString() + " closed");
}
} finally {
lock.writeLock().unlock();
}
}
}
// Cache flushing
final Flusher cacheFlusher;
// Needed during shutdown so we send an interrupt after completion of a
// flush, not in the midst.
final Integer cacheFlusherLock = new Integer(0);
/** Flush cache upon request */
class Flusher extends Thread implements CacheFlushListener {
private final DelayQueue<QueueEntry> flushQueue =
new DelayQueue<QueueEntry>();
private final long optionalFlushPeriod;
/** constructor */
public Flusher() {
super();
this.optionalFlushPeriod = conf.getLong(
"hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
}
/** {@inheritDoc} */
@Override
public void run() {
while (!stopRequested.get()) {
QueueEntry e = null;
try {
e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (e == null) {
continue;
}
synchronized(cacheFlusherLock) { // Don't interrupt while we're working
if (e.getRegion().flushcache()) {
compactSplitThread.compactionRequested(e);
}
e.setExpirationTime(System.currentTimeMillis() +
optionalFlushPeriod);
flushQueue.add(e);
}
// Now insure that all the active regions are in the queue
Set<HRegion> regions = getRegionsToCheck();
for (HRegion r: regions) {
e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
synchronized (flushQueue) {
if (!flushQueue.contains(e)) {
flushQueue.add(e);
}
}
}
// Now make sure that the queue only contains active regions
synchronized (flushQueue) {
for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) {
e = i.next();
if (!regions.contains(e.getRegion())) {
i.remove();
}
}
}
} catch (InterruptedException ex) {
continue;
} catch (ConcurrentModificationException ex) {
continue;
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of hlog
// is required. Currently the only way to do this is a restart of
// the server.
LOG.fatal("Replay of hlog required. Forcing server restart", ex);
if (!checkFileSystem()) {
break;
}
HRegionServer.this.stop();
} catch (IOException ex) {
LOG.error("Cache flush failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!checkFileSystem()) {
break;
}
} catch (Exception ex) {
LOG.error("Cache flush failed" +
(e != null ? (" for region " + e.getRegion().getRegionName()) : ""),
ex);
if (!checkFileSystem()) {
break;
}
}
}
flushQueue.clear();
LOG.info(getName() + " exiting");
}
/** {@inheritDoc} */
public void flushRequested(HRegion region) {
QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
synchronized (flushQueue) {
if (flushQueue.contains(e)) {
flushQueue.remove(e);
}
flushQueue.add(e);
}
}
}
// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected HLog log;
final LogRoller logRoller;
final Integer logRollerLock = new Integer(0);
/** Runs periodically to determine if the HLog should be rolled */
class LogRoller extends Thread implements LogRollListener {
private final Integer rollLock = new Integer(0);
private final long optionalLogRollInterval;
private long lastLogRollTime;
private volatile boolean rollLog;
/** constructor */
public LogRoller() {
super();
this.optionalLogRollInterval = conf.getLong(
"hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
this.rollLog = false;
lastLogRollTime = System.currentTimeMillis();
}
/** {@inheritDoc} */
@Override
public void run() {
while (!stopRequested.get()) {
while (!rollLog && !stopRequested.get()) {
long now = System.currentTimeMillis();
if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
rollLog = true;
this.lastLogRollTime = now;
} else {
synchronized (rollLock) {
try {
rollLock.wait(threadWakeFrequency);
} catch (InterruptedException e) {
continue;
}
}
}
}
if (!rollLog) {
// There's only two reasons to break out of the while loop.
// 1. Log roll requested
// 2. Stop requested
// so if a log roll was not requested, continue and break out of loop
continue;
}
synchronized (logRollerLock) {
try {
LOG.info("Rolling hlog. Number of entries: " + log.getNumEntries());
log.rollWriter();
} catch (IOException ex) {
LOG.error("Log rolling failed",
RemoteExceptionHandler.checkIOException(ex));
checkFileSystem();
} catch (Exception ex) {
LOG.error("Log rolling failed", ex);
checkFileSystem();
} finally {
rollLog = false;
}
}
}
}
/** {@inheritDoc} */
public void logRollRequested() {
synchronized (rollLock) {
rollLog = true;
rollLock.notifyAll();
}
}
}
/**
* Starts a HRegionServer at the default location
* @param conf
@ -624,13 +227,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
conf.getInt("hbase.master.lease.period", 30 * 1000); conf.getInt("hbase.master.lease.period", 30 * 1000);
// Cache flushing thread. // Cache flushing thread.
this.cacheFlusher = new Flusher(); this.cacheFlusher = new Flusher(this);
// Compaction thread // Compaction thread
this.compactSplitThread = new CompactSplitThread(); this.compactSplitThread = new CompactSplitThread(this);
// Log rolling thread // Log rolling thread
this.logRoller = new LogRoller(); this.logRoller = new LogRoller(this);
// Task thread to process requests from Master // Task thread to process requests from Master
this.worker = new Worker(); this.worker = new Worker();
@ -817,12 +420,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
synchronized(cacheFlusherLock) {
this.cacheFlusher.interrupt();
}
synchronized (compactSplitLock) {
this.compactSplitThread.interrupt();
}
cacheFlusher.interruptPolitely();
compactSplitThread.interrupt();
synchronized (logRollerLock) {
this.logRoller.interrupt();
}
@ -1592,9 +1191,28 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}
/** @return the info server */
/**
* Get the InfoServer this HRegionServer has put up.
*/
public InfoServer getInfoServer() {
return infoServer;
}
/**
* Check if a stop has been requested.
*/
public boolean isStopRequested() {
return stopRequested.get();
}
/** Get the write lock for the server */
ReentrantReadWriteLock.WriteLock getWriteLock() {
return lock.writeLock();
}
void compactionRequested(QueueEntry e) {
compactSplitThread.compactionRequested(e);
}
/**
* @return Immutable list of this servers regions.
@ -1624,6 +1242,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
return getRegion(regionName, false);
}
/** Move a region from online to closing. */
void setRegionClosing(final Text regionName) {
retiringRegions.put(regionName, onlineRegions.remove(regionName));
}
/** Set a region as closed. */
void setRegionClosed(final Text regionName) {
retiringRegions.remove(regionName);
}
/**
* Protected utility method for safely obtaining an HRegion handle.
* @param regionName Name of online {@link HRegion} to return
@ -1633,7 +1261,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* @throws NotServingRegionException
*/
protected HRegion getRegion(final Text regionName,
final boolean checkRetiringRegions)
throws NotServingRegionException {
HRegion region = null;
this.lock.readLock().lock();
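Taken together, the hunks above replace direct access to HRegionServer internals with a handful of small package-private accessors that the extracted threads call back into. A condensed recap, abbreviated from the diff itself (bodies shortened for readability):

class HRegionServer /* ... */ {
  boolean isStopRequested() { return stopRequested.get(); }
  ReentrantReadWriteLock.WriteLock getWriteLock() { return lock.writeLock(); }
  void setRegionClosing(final Text regionName) {
    retiringRegions.put(regionName, onlineRegions.remove(regionName));
  }
  void setRegionClosed(final Text regionName) {
    retiringRegions.remove(regionName);
  }
  void compactionRequested(QueueEntry e) {
    compactSplitThread.compactionRequested(e);
  }
}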


@ -0,0 +1,101 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
/** Runs periodically to determine if the HLog should be rolled */
class LogRoller extends Thread implements LogRollListener {
static final Log LOG = LogFactory.getLog(LogRoller.class);
private final Integer rollLock = new Integer(0);
private final long optionalLogRollInterval;
private long lastLogRollTime;
private volatile boolean rollLog;
private final HRegionServer server;
private final HBaseConfiguration conf;
/** constructor */
public LogRoller(final HRegionServer server) {
super();
this.server = server;
conf = server.conf;
this.optionalLogRollInterval = conf.getLong(
"hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
this.rollLog = false;
lastLogRollTime = System.currentTimeMillis();
}
/** {@inheritDoc} */
@Override
public void run() {
while (!server.isStopRequested()) {
while (!rollLog && !server.isStopRequested()) {
long now = System.currentTimeMillis();
if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
rollLog = true;
this.lastLogRollTime = now;
} else {
synchronized (rollLock) {
try {
rollLock.wait(server.threadWakeFrequency);
} catch (InterruptedException e) {
continue;
}
}
}
}
if (!rollLog) {
// There's only two reasons to break out of the while loop.
// 1. Log roll requested
// 2. Stop requested
// so if a log roll was not requested, continue and break out of loop
continue;
}
synchronized (server.logRollerLock) {
try {
LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries());
server.getLog().rollWriter();
} catch (IOException ex) {
LOG.error("Log rolling failed",
RemoteExceptionHandler.checkIOException(ex));
server.checkFileSystem();
} catch (Exception ex) {
LOG.error("Log rolling failed", ex);
server.checkFileSystem();
} finally {
rollLog = false;
}
}
}
}
/** {@inheritDoc} */
public void logRollRequested() {
synchronized (rollLock) {
rollLog = true;
rollLock.notifyAll();
}
}
}
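The roll trigger combines a wait/notify handshake with a timed wait, so a roll can be requested immediately via logRollRequested() or happen on the optional interval. A small self-contained sketch of that signaling pattern, with assumed names rather than commit code:

public class RollSignalDemo {
  private final Object rollLock = new Object();
  private volatile boolean rollLog = false;

  // Caller side: ask for an immediate roll and wake the waiting roller.
  public void logRollRequested() {
    synchronized (rollLock) {
      rollLog = true;
      rollLock.notifyAll();
    }
  }

  // Roller side: wait for a request, but wake up periodically so the
  // time-based (optional) roll can also be evaluated.
  public boolean awaitRollRequest(long wakeFrequencyMs)
      throws InterruptedException {
    synchronized (rollLock) {
      if (!rollLog) {
        rollLock.wait(wakeFrequencyMs);
      }
      boolean requested = rollLog;
      rollLog = false;   // consume the request before returning
      return requested;
    }
  }
}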


@ -0,0 +1,78 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Delayed;
/** Queue entry passed to flusher, compactor and splitter threads */
class QueueEntry implements Delayed {
private final HRegion region;
private long expirationTime;
QueueEntry(HRegion region, long expirationTime) {
this.region = region;
this.expirationTime = expirationTime;
}
/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
QueueEntry other = (QueueEntry) o;
return this.hashCode() == other.hashCode();
}
/** {@inheritDoc} */
@Override
public int hashCode() {
return this.region.getRegionInfo().hashCode();
}
/** {@inheritDoc} */
public long getDelay(TimeUnit unit) {
return unit.convert(this.expirationTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
public int compareTo(Delayed o) {
long delta = this.getDelay(TimeUnit.MILLISECONDS) -
o.getDelay(TimeUnit.MILLISECONDS);
int value = 0;
if (delta > 0) {
value = 1;
} else if (delta < 0) {
value = -1;
}
return value;
}
/** @return the region */
public HRegion getRegion() {
return region;
}
/** @param expirationTime the expirationTime to set */
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
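QueueEntry exists mainly so regions can ride in the Flusher's DelayQueue: getDelay() counts down to expirationTime, and the queue only hands back entries whose delay has expired. A minimal, self-contained demonstration of that Delayed contract (the Entry class and values below are stand-ins, not commit code):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {
  static class Entry implements Delayed {
    final String name;
    final long expirationTime;
    Entry(String name, long expirationTime) {
      this.name = name;
      this.expirationTime = expirationTime;
    }
    public long getDelay(TimeUnit unit) {
      return unit.convert(expirationTime - System.currentTimeMillis(),
          TimeUnit.MILLISECONDS);
    }
    public int compareTo(Delayed o) {
      return Long.signum(getDelay(TimeUnit.MILLISECONDS)
          - o.getDelay(TimeUnit.MILLISECONDS));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<Entry> queue = new DelayQueue<Entry>();
    queue.add(new Entry("region-a", System.currentTimeMillis() + 500));
    // Nothing has expired yet, so a non-blocking poll returns null ...
    System.out.println(queue.poll());                         // null
    // ... while a timed poll blocks until the 500 ms delay elapses.
    System.out.println(queue.poll(1, TimeUnit.SECONDS).name); // region-a
  }
}

This is why the Flusher re-adds an entry with expirationTime set to now plus optionalFlushPeriod after a successful flush: the region simply becomes eligible for the next optional flush once that interval has passed.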