HBASE-728, HBASE-956, HBASE-955 Address thread naming, which threads are Chores vs. Threads; make HLog the manager of the write ahead log and do not extend it; provide optional HLog sync operations.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@708322 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 2008-10-27 21:25:35 +00:00
parent 7e1d49358d
commit cf7ae9adcb
10 changed files with 124 additions and 21 deletions


@@ -46,6 +46,9 @@ Release 0.19.0 - Unreleased
    HBASE-954  Don't reassign root region until ProcessServerShutdown has split
               the former region server's log
    HBASE-953  PerformanceEvaluation tests if table exists by comparing descriptors
+   HBASE-728, HBASE-956, HBASE-955  Address thread naming, which threads are
+              Chores vs. Threads; make HLog the manager of the write ahead log
+              and do not extend it; provide optional HLog sync operations.

   IMPROVEMENTS
    HBASE-901  Add a limit to key length, check key and value length on client side


@@ -31,6 +31,9 @@ import org.apache.hadoop.hbase.util.Sleeper;
  * If an unhandled exception, the threads exit is logged.
  * Implementers just need to add checking if there is work to be done and if
  * so, do it.  Its the base of most of the chore threads in hbase.
+ *
+ * Don't subclass Chore if the task relies on being woken up for something to
+ * do, such as an entry being added to a queue, etc.
  */
 public abstract class Chore extends Thread {
   private final Log LOG = LogFactory.getLog(this.getClass());
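For illustration, here is a minimal sketch of what a Chore subclass looks like, modeled on the constructor and chore() hook used by the LogFlusher class added later in this patch; the HeartbeatChore name and its body are hypothetical, not part of this commit.

    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.Chore;

    /** Hypothetical example: wakes once per period and checks for work until stop is set. */
    public class HeartbeatChore extends Chore {
      static final Log LOG = LogFactory.getLog(HeartbeatChore.class);

      public HeartbeatChore(final int period, final AtomicBoolean stop) {
        super(period, stop);   // wake period and shared stop flag, as in LogFlusher below
      }

      @Override
      protected void chore() {
        // Invoked each time the Chore wakes; do the periodic work here.
        LOG.info("heartbeat");
      }
    }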


@@ -44,6 +44,11 @@ import java.io.IOException;
  * <p>The Leases class is a general reusable class for this kind of pattern.
  * An instance of the Leases class will create a thread to do its dirty work.
  * You should close() the instance if you want to clean up the thread properly.
+ *
+ * <p>
+ * NOTE: This class extends Thread rather than Chore because the sleep time
+ * can be interrupted when there is something to do, unlike a Chore's sleep
+ * time, which is invariant.
  */
 public class Leases extends Thread {
   private static final Log LOG = LogFactory.getLog(Leases.class.getName());
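To make the distinction in the NOTE concrete: a Thread-based worker can have its sleep cut short with interrupt() when work shows up, whereas a Chore always sleeps out its fixed period. A generic sketch of the interruptible pattern (illustrative only, not code from this commit):

    /** Illustrative only: a worker whose sleep ends early when it is interrupted. */
    public class InterruptibleWorker extends Thread {
      private volatile boolean stopped = false;

      /** Called by a producer when there is something to do right away. */
      public void wake() {
        interrupt();
      }

      public void shutdown() {
        stopped = true;
        interrupt();
      }

      @Override
      public void run() {
        while (!stopped) {
          try {
            Thread.sleep(60 * 1000);   // nominal period; interrupt() ends it early
          } catch (InterruptedException e) {
            // fall through and check for work immediately
          }
          // ... check for work and do it ...
        }
      }
    }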


@@ -79,6 +79,10 @@ import org.apache.hadoop.ipc.Server;
 /**
  * HMaster is the "master server" for a HBase.
  * There is only one HMaster for a single HBase deployment.
+ *
+ * NOTE: This class extends Thread rather than Chore because the sleep time
+ * can be interrupted when there is something to do, unlike a Chore's sleep
+ * time, which is invariant.
  */
 public class HMaster extends Thread implements HConstants, HMasterInterface,
     HMasterRegionInterface {


@@ -40,6 +40,10 @@ import org.apache.hadoop.hbase.util.Writables;
 /**
  * Compact region on request and then run split if appropriate
+ *
+ * NOTE: This class extends Thread rather than Chore because the sleep time
+ * can be interrupted when there is something to do, unlike a Chore's sleep
+ * time, which is invariant.
  */
 class CompactSplitThread extends Thread implements HConstants {
   static final Log LOG = LogFactory.getLog(CompactSplitThread.class);


@@ -201,12 +201,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   final CompactSplitThread compactSplitThread;

   // Cache flushing
-  final Flusher cacheFlusher;
+  final MemcacheFlusher cacheFlusher;

   // HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
-  protected HLog log;
+  protected volatile HLog log;
   final LogRoller logRoller;
+  final LogFlusher logFlusher;

   // flag set after we're done setting up server threads (used for testing)
   protected volatile boolean isOnline;
@@ -244,7 +245,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       conf.getInt("hbase.master.lease.period", 120 * 1000);

     // Cache flushing thread.
-    this.cacheFlusher = new Flusher(conf, this);
+    this.cacheFlusher = new MemcacheFlusher(conf, this);

     // Compaction thread
     this.compactSplitThread = new CompactSplitThread(this);
@@ -252,6 +253,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     // Log rolling thread
     this.logRoller = new LogRoller(this);

+    // Log flushing thread
+    this.logFlusher =
+      new LogFlusher(this.threadWakeFrequency, this.stopRequested);
+
     // Task thread to process requests from Master
     this.worker = new Worker();
     this.workerThread = new Thread(worker);
@@ -350,6 +355,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     try {
       serverInfo.setStartCode(System.currentTimeMillis());
       log = setupHLog();
+      this.logFlusher.setHLog(log);
     } catch (IOException e) {
       this.abortRequested = true;
       this.stopRequested.set(true);
@@ -439,8 +445,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive? If OOME could have exited already
     cacheFlusher.interruptIfNecessary();
+    logFlusher.interrupt();
     compactSplitThread.interruptIfNecessary();
-    this.logRoller.interruptIfNecessary();
+    logRoller.interruptIfNecessary();

     if (abortRequested) {
       if (this.fsOk) {
@@ -523,6 +530,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       this.fs = FileSystem.get(this.conf);
       this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
       this.log = setupHLog();
+      this.logFlusher.setHLog(log);
       startServiceThreads();
       isOnline = true;
     } catch (IOException e) {
@@ -562,14 +570,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         " because logdir " + logdir.toString() + " exists");
     }
     HLog newlog = new HLog(fs, logdir, conf, logRoller);
-    newlog.start();
     return newlog;
   }

   /*
-   * Start Chore Threads, Server, Worker and lease checker threads. Install an
-   * UncaughtExceptionHandler that calls abort of RegionServer if we get
-   * an unhandled exception. We cannot set the handler on all threads.
+   * Start maintenance Threads, Server, Worker and lease checker threads.
+   * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
+   * get an unhandled exception. We cannot set the handler on all threads.
    * Server's internal Listener thread is off limits. For Server, if an OOME,
    * it waits a while then retries. Meantime, a flush or a compaction that
    * tries to run should trigger same critical condition and the shutdown will
@@ -588,6 +595,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     };
     Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
         handler);
+    Threads.setDaemonThreadRunning(this.logFlusher, n + ".logFlusher",
+        handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
         handler);
     Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
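The Threads helper used above is not shown in this diff; as an assumption, a setDaemonThreadRunning-style helper presumably names the thread, marks it as a daemon, installs the handler, and starts it, roughly like this sketch:

    import java.lang.Thread.UncaughtExceptionHandler;

    /** Sketch of the assumed behavior of a setDaemonThreadRunning-style helper. */
    public final class ThreadsSketch {
      public static Thread setDaemonThreadRunning(final Thread t, final String name,
          final UncaughtExceptionHandler handler) {
        t.setName(name);
        t.setDaemon(true);                       // service threads should not block JVM exit
        if (handler != null) {
          t.setUncaughtExceptionHandler(handler);
        }
        t.start();
        return t;
      }
    }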
@@ -1138,7 +1147,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       }
     }

-  public void batchUpdate(final byte [] regionName, BatchUpdate b, long lockId)
+  public void batchUpdate(final byte [] regionName, BatchUpdate b,
+      @SuppressWarnings("unused") long lockId)
   throws IOException {
     if (b.getRow() == null)
       throw new IllegalArgumentException("update has null row");


@@ -286,6 +286,7 @@ public class HStore implements HConstants {
    * lower than maxSeqID.  (Because we know such log messages are already
    * reflected in the MapFiles.)
    */
+  @SuppressWarnings("unchecked")
   private void doReconstructionLog(final Path reconstructionLog,
     final long maxSeqID, final Progressable reporter)
   throws UnsupportedEncodingException, IOException {
@@ -769,6 +770,7 @@ public class HStore implements HConstants {
    */
   StoreSize compact(boolean majorCompaction) throws IOException {
     boolean forceSplit = this.info.shouldSplit(false);
+    boolean doMajorCompaction = majorCompaction;
     synchronized (compactLock) {
       long maxId = -1;
       int nrows = -1;
@@ -785,10 +787,10 @@ public class HStore implements HConstants {
         maxId = this.storefiles.lastKey().longValue();
       }
       // Check to see if we need to do a major compaction on this region.
-      // If so, change majorCompaction to true to skip the incremental compacting below.
-      // Only check if majorCompaction is not true.
+      // If so, change doMajorCompaction to true to skip the incremental
+      // compacting below.  Only check if doMajorCompaction is not true.
       long lastMajorCompaction = 0L;
-      if (!majorCompaction) {
+      if (!doMajorCompaction) {
         Path mapdir = HStoreFile.getMapDir(basedir, info.getEncodedName(), family.getName());
         long lowTimestamp = getLowestTimestamp(fs, mapdir);
         lastMajorCompaction = System.currentTimeMillis() - lowTimestamp;
@@ -799,10 +801,10 @@ public class HStore implements HConstants {
             ". Time since last major compaction: " +
             ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
         }
-        majorCompaction = true;
+        doMajorCompaction = true;
       }
     }
-    if (!majorCompaction && !hasReferences(filesToCompact) &&
+    if (!doMajorCompaction && !hasReferences(filesToCompact) &&
         filesToCompact.size() < compactionThreshold) {
       return checkSplit(forceSplit);
     }
@@ -828,7 +830,7 @@ public class HStore implements HConstants {
         fileSizes[i] = len;
         totalSize += len;
       }
-      if (!majorCompaction && !hasReferences(filesToCompact)) {
+      if (!doMajorCompaction && !hasReferences(filesToCompact)) {
         // Here we select files for incremental compaction.
         // The rule is: if the largest(oldest) one is more than twice the
         // size of the second, skip the largest, and continue to next...,
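The selection comment above is cut off by the hunk boundary; one plausible reading of the skip-the-largest rule it describes is the following sketch, which is an interpretation rather than the method's actual body:

    /** Interpretation only: skip leading (oldest, largest) files that dwarf the next one. */
    static int firstFileToCompact(final long[] fileSizes) {
      int start = 0;
      while (start < fileSizes.length - 1 && fileSizes[start] > 2 * fileSizes[start + 1]) {
        start++;   // this file is more than twice the next one's size, so leave it out
      }
      return start;   // compact files from this index to the end
    }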
@@ -895,7 +897,7 @@ public class HStore implements HConstants {
           this.compression, this.family.isBloomfilter(), nrows);
       writer.setIndexInterval(family.getMapFileIndexInterval());
       try {
-        compact(writer, rdrs, majorCompaction);
+        compact(writer, rdrs, doMajorCompaction);
       } finally {
         writer.close();
       }
@@ -908,7 +910,7 @@ public class HStore implements HConstants {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Completed compaction of " + this.storeNameStr +
           " store size is " + StringUtils.humanReadableInt(storeSize) +
-          (majorCompaction? "": "; time since last major compaction: " +
+          (doMajorCompaction? "": "; time since last major compaction: " +
           (lastMajorCompaction/1000) + " seconds"));
       }
     }
@@ -1450,6 +1452,7 @@ public class HStore implements HConstants {
    * @return Found row
    * @throws IOException
    */
+  @SuppressWarnings("unchecked")
   byte [] getRowKeyAtOrBefore(final byte [] row)
   throws IOException{
     // Map of HStoreKeys that are candidates for holding the row key that


@@ -0,0 +1,60 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Chore;
+
+/**
+ * LogFlusher is a Chore that wakes every threadWakeInterval and calls
+ * the HLog to do an optional sync if there are unflushed entries, and the
+ * optionalFlushInterval has passed since the last flush.
+ */
+public class LogFlusher extends Chore {
+  static final Log LOG = LogFactory.getLog(LogFlusher.class);
+
+  private final AtomicReference<HLog> log =
+    new AtomicReference<HLog>(null);
+
+  LogFlusher(final int period, final AtomicBoolean stop) {
+    super(period, stop);
+  }
+
+  void setHLog(HLog log) {
+    synchronized (log) {
+      this.log.set(log);
+    }
+  }
+
+  @Override
+  protected void chore() {
+    synchronized (log) {
+      HLog hlog = log.get();
+      if (hlog != null) {
+        hlog.optionalSync();
+      }
+    }
+  }
+}
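Pulling together the HRegionServer changes earlier in this commit, the chore's life cycle is: construct it with the wake period and stop flag, hand it the HLog once the log exists, run it as a named daemon thread, and interrupt it at shutdown. A condensed sketch using only names that appear in this diff (the Threads import path is assumed):

    package org.apache.hadoop.hbase.regionserver;

    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.hadoop.hbase.util.Threads;   // assumed location of the Threads helper

    public class LogFlusherWiringSketch {
      static LogFlusher startLogFlusher(final int threadWakeFrequency,
          final AtomicBoolean stopRequested, final HLog log, final String serverName,
          final Thread.UncaughtExceptionHandler handler) {
        LogFlusher logFlusher = new LogFlusher(threadWakeFrequency, stopRequested);
        logFlusher.setHLog(log);                   // give it the write-ahead log to optionally sync
        Threads.setDaemonThreadRunning(logFlusher, serverName + ".logFlusher", handler);
        return logFlusher;                         // caller interrupts it at shutdown
      }
    }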


@@ -27,7 +27,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;

-/** Runs periodically to determine if the HLog should be rolled */
+/**
+ * Runs periodically to determine if the HLog should be rolled
+ *
+ * NOTE: This class extends Thread rather than Chore because the sleep time
+ * can be interrupted when there is something to do, unlike a Chore's sleep
+ * time, which is invariant.
+ */
 class LogRoller extends Thread implements LogRollListener {
   static final Log LOG = LogFactory.getLog(LogRoller.class);
   private final ReentrantLock rollLock = new ReentrantLock();


@@ -37,10 +37,15 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 /**
  * Thread that flushes cache on request
+ *
+ * NOTE: This class extends Thread rather than Chore because the sleep time
+ * can be interrupted when there is something to do, unlike a Chore's sleep
+ * time, which is invariant.
+ *
  * @see FlushRequester
  */
-class Flusher extends Thread implements FlushRequester {
-  static final Log LOG = LogFactory.getLog(Flusher.class);
+class MemcacheFlusher extends Thread implements FlushRequester {
+  static final Log LOG = LogFactory.getLog(MemcacheFlusher.class);
   private final BlockingQueue<HRegion> flushQueue =
     new LinkedBlockingQueue<HRegion>();
@@ -57,7 +62,7 @@ class Flusher extends Thread implements FlushRequester {
    * @param conf
    * @param server
    */
-  public Flusher(final HBaseConfiguration conf, final HRegionServer server) {
+  public MemcacheFlusher(final HBaseConfiguration conf, final HRegionServer server) {
     super();
     this.server = server;
     threadWakeFrequency = conf.getLong(