HADOOP-2139 (phase 2) Make region server more event driven

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@597959 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 2007-11-25 07:17:38 +00:00
parent 9052780e2d
commit cbe167c981
25 changed files with 1192 additions and 802 deletions

View File: CHANGES.txt

@@ -42,6 +42,7 @@ Trunk (unreleased changes)
    HADOOP-2176 Htable.deleteAll documentation is ambiguous
    HADOOP-2139 (phase 1) Increase parallelism in region servers.
    HADOOP-2267 [Hbase Shell] Change the prompt's title from 'hbase' to 'hql'.
+   HADOOP-2139 (phase 2) Make region server more event driven
 
 Release 0.15.1
 Branch 0.15

View File: hbase-default.xml

@@ -143,6 +143,14 @@
     hbase.server.thread.wakefrequency.
   </description>
 </property>
+<property>
+  <name>hbase.regionserver.optionalcacheflushinterval</name>
+  <value>60000</value>
+  <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+  </description>
+</property>
 <property>
   <name>hbase.hregion.memcache.flush.size</name>
   <value>16777216</value>

View File: CacheFlushListener.java (new file)

@@ -0,0 +1,36 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+/**
+ * Implementors of this interface want to be notified when an HRegion
+ * determines that a cache flush is needed. A CacheFlushListener (or null)
+ * must be passed to the HRegion constructor.
+ */
+public interface CacheFlushListener {
+
+  /**
+   * Tell the listener the cache needs to be flushed.
+   *
+   * @param region the HRegion requesting the cache flush
+   */
+  void flushRequested(HRegion region);
+}
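For orientation, a minimal implementor of this interface might look like the sketch below. The class is hypothetical and not part of this commit; in the new event-driven design, the natural implementor is a flusher thread in the region server that drains a queue of regions asking to be flushed.

  package org.apache.hadoop.hbase;

  // Hypothetical example, not part of this commit: record the region and let
  // another thread perform the flush, keeping the calling region non-blocking.
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.LinkedBlockingQueue;

  class QueueingFlushListener implements CacheFlushListener {
    final BlockingQueue<HRegion> toFlush = new LinkedBlockingQueue<HRegion>();

    public void flushRequested(HRegion region) {
      toFlush.offer(region);  // a flusher thread drains this queue
    }
  }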

View File: HLog.java

@ -22,10 +22,13 @@ package org.apache.hadoop.hbase;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -89,7 +92,9 @@ public class HLog implements HConstants {
final FileSystem fs; final FileSystem fs;
final Path dir; final Path dir;
final Configuration conf; final Configuration conf;
final LogRollListener listener;
final long threadWakeFrequency; final long threadWakeFrequency;
private final int maxlogentries;
/* /*
* Current log file. * Current log file.
@ -99,12 +104,13 @@ public class HLog implements HConstants {
/* /*
* Map of all log files but the current one. * Map of all log files but the current one.
*/ */
final TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>(); final SortedMap<Long, Path> outputfiles =
Collections.synchronizedSortedMap(new TreeMap<Long, Path>());
/* /*
* Map of region to last sequence/edit id. * Map of region to last sequence/edit id.
*/ */
final Map<Text, Long> lastSeqWritten = new HashMap<Text, Long>(); final Map<Text, Long> lastSeqWritten = new ConcurrentHashMap<Text, Long>();
volatile boolean closed = false; volatile boolean closed = false;
@ -119,6 +125,10 @@ public class HLog implements HConstants {
// synchronized is insufficient because a cache flush spans two method calls. // synchronized is insufficient because a cache flush spans two method calls.
private final Lock cacheFlushLock = new ReentrantLock(); private final Lock cacheFlushLock = new ReentrantLock();
// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
private final Integer updateLock = new Integer(0);
/** /**
* Split up a bunch of log files, that are no longer being written to, into * Split up a bunch of log files, that are no longer being written to, into
* new files, one per region. Delete the old log files when finished. * new files, one per region. Delete the old log files when finished.
@ -207,12 +217,15 @@ public class HLog implements HConstants {
* @param conf * @param conf
* @throws IOException * @throws IOException
*/ */
HLog(final FileSystem fs, final Path dir, final Configuration conf) HLog(final FileSystem fs, final Path dir, final Configuration conf,
throws IOException { final LogRollListener listener) throws IOException {
this.fs = fs; this.fs = fs;
this.dir = dir; this.dir = dir;
this.conf = conf; this.conf = conf;
this.listener = listener;
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.maxlogentries =
conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
if (fs.exists(dir)) { if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir); throw new IOException("Target HLog directory already exists: " + dir);
} }
@ -256,28 +269,13 @@ public class HLog implements HConstants {
* *
* @throws IOException * @throws IOException
*/ */
synchronized void rollWriter() throws IOException { void rollWriter() throws IOException {
boolean locked = false; this.cacheFlushLock.lock();
while (!locked && !closed) {
if (this.cacheFlushLock.tryLock()) {
locked = true;
break;
}
try { try {
this.wait(threadWakeFrequency);
} catch (InterruptedException e) {
// continue
}
}
if (closed) { if (closed) {
if (locked) { return;
this.cacheFlushLock.unlock();
} }
throw new IOException("Cannot roll log; log is closed"); synchronized (updateLock) {
}
// If we get here we have locked out both cache flushes and appends
try {
if (this.writer != null) { if (this.writer != null) {
// Close the current writer, get a new one. // Close the current writer, get a new one.
this.writer.close(); this.writer.close();
@ -311,14 +309,13 @@ public class HLog implements HConstants {
} else { } else {
// Get oldest edit/sequence id. If logs are older than this id, // Get oldest edit/sequence id. If logs are older than this id,
// then safe to remove. // then safe to remove.
Long oldestOutstandingSeqNum =
Collections.min(this.lastSeqWritten.values());
// Get the set of all log files whose final ID is older than or
// equal to the oldest pending region operation
TreeSet<Long> sequenceNumbers = TreeSet<Long> sequenceNumbers =
new TreeSet<Long>(this.lastSeqWritten.values()); new TreeSet<Long>(this.outputfiles.headMap(
long oldestOutstandingSeqNum = sequenceNumbers.first().longValue(); (oldestOutstandingSeqNum + Long.valueOf(1L))).keySet());
// Get the set of all log files whose final ID is older than the
// oldest pending region operation
sequenceNumbers.clear();
sequenceNumbers.addAll(this.outputfiles.headMap(
Long.valueOf(oldestOutstandingSeqNum)).keySet());
// Now remove old log files (if any) // Now remove old log files (if any)
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
// Find region associated with oldest key -- helps debugging. // Find region associated with oldest key -- helps debugging.
@ -330,8 +327,8 @@ public class HLog implements HConstants {
} }
} }
LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " + LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
"using oldest outstanding seqnum of " + oldestOutstandingSeqNum + "using oldest outstanding seqnum of " +
" from region " + oldestRegion); oldestOutstandingSeqNum + " from region " + oldestRegion);
} }
if (sequenceNumbers.size() > 0) { if (sequenceNumbers.size() > 0) {
for (Long seq : sequenceNumbers) { for (Long seq : sequenceNumbers) {
@ -341,13 +338,13 @@ public class HLog implements HConstants {
} }
} }
this.numEntries = 0; this.numEntries = 0;
}
} finally { } finally {
this.cacheFlushLock.unlock(); this.cacheFlushLock.unlock();
} }
} }
private void deleteLogFile(final Path p, final Long seqno) private void deleteLogFile(final Path p, final Long seqno) throws IOException {
throws IOException {
LOG.info("removing old log file " + p.toString() + LOG.info("removing old log file " + p.toString() +
" whose highest sequence/edit id is " + seqno); " whose highest sequence/edit id is " + seqno);
this.fs.delete(p); this.fs.delete(p);
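The pruning logic in the rollWriter() hunk above is compact, so a stand-alone sketch may help: a log file is deletable once every region has flushed past that file's highest sequence number, which is exactly the headMap(oldestOutstandingSeqNum + 1) selection. All names below are illustrative stand-ins on plain collections, not HLog itself.

  import java.util.*;

  public class LogPruneSketch {
    public static void main(String[] args) {
      // Region -> oldest un-flushed sequence number (as in lastSeqWritten).
      Map<String, Long> lastSeqWritten = new HashMap<String, Long>();
      lastSeqWritten.put("regionA", 17L);
      lastSeqWritten.put("regionB", 42L);

      // Highest sequence number in a file -> file name (as in outputfiles).
      SortedMap<Long, String> outputfiles = new TreeMap<Long, String>();
      outputfiles.put(9L, "hlog.1");
      outputfiles.put(16L, "hlog.2");
      outputfiles.put(30L, "hlog.3");

      // A file is removable iff its highest edit id is <= the minimum
      // outstanding id (here 17): every region has already flushed past it.
      Long oldest = Collections.min(lastSeqWritten.values());
      Set<Long> removable = outputfiles.headMap(oldest + 1).keySet();
      System.out.println(removable); // prints [9, 16]; hlog.3 may hold seq 17
    }
  }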
@@ -367,7 +364,7 @@ public class HLog implements HConstants {
    *
    * @throws IOException
    */
-  synchronized void closeAndDelete() throws IOException {
+  void closeAndDelete() throws IOException {
     close();
     fs.delete(dir);
   }
@@ -377,13 +374,20 @@ public class HLog implements HConstants {
    *
    * @throws IOException
    */
-  synchronized void close() throws IOException {
+  void close() throws IOException {
+    cacheFlushLock.lock();
+    try {
+      synchronized (updateLock) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("closing log writer in " + this.dir.toString());
       }
       this.writer.close();
       this.closed = true;
       }
+    } finally {
+      cacheFlushLock.unlock();
+    }
+  }
 
   /**
    * Append a set of edits to the log. Log edits are keyed by regionName,
@@ -409,12 +413,13 @@ public class HLog implements HConstants {
    * @param timestamp
    * @throws IOException
    */
-  synchronized void append(Text regionName, Text tableName,
+  void append(Text regionName, Text tableName,
     TreeMap<HStoreKey, byte[]> edits) throws IOException {
     if (closed) {
       throw new IOException("Cannot append; log is closed");
     }
+    synchronized (updateLock) {
       long seqNum[] = obtainSeqNum(edits.size());
       // The 'lastSeqWritten' map holds the sequence number of the oldest
       // write for each region. When the cache is flushed, the entry for the
@@ -434,6 +439,12 @@ public class HLog implements HConstants {
         this.numEntries++;
       }
     }
+      if (this.numEntries > this.maxlogentries) {
+        if (listener != null) {
+          listener.logRollRequested();
+        }
+      }
+    }
 
   /** @return How many items have been added to the log */
   int getNumEntries() {
@@ -451,6 +462,11 @@ public class HLog implements HConstants {
     return value;
   }
 
+  /** @return the number of log files in use */
+  int getNumLogFiles() {
+    return outputfiles.size();
+  }
+
   /**
    * Obtain a specified number of sequence numbers
    *
@@ -487,20 +503,21 @@ public class HLog implements HConstants {
   /**
    * Complete the cache flush
    *
-   * Protected by this and cacheFlushLock
+   * Protected by cacheFlushLock
    *
    * @param regionName
    * @param tableName
    * @param logSeqId
    * @throws IOException
    */
-  synchronized void completeCacheFlush(final Text regionName,
-    final Text tableName, final long logSeqId)
-  throws IOException {
+  void completeCacheFlush(final Text regionName, final Text tableName,
+    final long logSeqId) throws IOException {
     try {
       if (this.closed) {
         return;
       }
+      synchronized (updateLock) {
       this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
         new HLogEdit(HLog.METACOLUMN, HLogEdit.completeCacheFlush.get(),
           System.currentTimeMillis()));
@@ -509,21 +526,20 @@ public class HLog implements HConstants {
       if (seq != null && logSeqId >= seq.longValue()) {
         this.lastSeqWritten.remove(regionName);
       }
+      }
     } finally {
       this.cacheFlushLock.unlock();
-      notifyAll(); // wake up the log roller if it is waiting
     }
   }
 
   /**
-   * Abort a cache flush. This method will clear waits on
-   * {@link #insideCacheFlush}. Call if the flush fails. Note that the only
-   * recovery for an aborted flush currently is a restart of the regionserver so
-   * the snapshot content dropped by the failure gets restored to the memcache.
+   * Abort a cache flush.
+   * Call if the flush fails. Note that the only recovery for an aborted flush
+   * currently is a restart of the regionserver so the snapshot content dropped
+   * by the failure gets restored to the memcache.
    */
-  synchronized void abortCacheFlush() {
+  void abortCacheFlush() {
     this.cacheFlushLock.unlock();
-    notifyAll();
   }
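Taken together, these HLog hunks replace method-level synchronization with a two-lock discipline: appends serialize on updateLock, while a roll (or close) first takes cacheFlushLock to wait out any in-progress cache flush and then takes updateLock to wait out in-progress appends. A condensed sketch of that nesting, with stub bodies and assuming only what the hunks above show:

  import java.util.concurrent.locks.*;

  class LogLockSketch {
    private final Lock cacheFlushLock = new ReentrantLock();
    private final Object updateLock = new Object();

    void append() {                    // many writer threads
      synchronized (updateLock) {      // excludes rolls, not cache flushes
        // write edits, bump sequence numbers
      }
    }

    void rollWriter() {                // log roller thread
      cacheFlushLock.lock();           // wait out any in-progress cache flush
      try {
        synchronized (updateLock) {    // wait out in-progress appends
          // close current file, open a new one, prune old files
        }
      } finally {
        cacheFlushLock.unlock();
      }
    }
  }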
   private static void usage() {

View File: HMaster.java

@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
+import java.util.SortedSet;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TreeMap;
@@ -70,7 +71,8 @@ import org.apache.hadoop.ipc.Server;
  * There is only one HMaster for a single HBase deployment.
  */
 public class HMaster extends Thread implements HConstants, HMasterInterface,
   HMasterRegionInterface {
 
   static final Log LOG = LogFactory.getLog(HMaster.class.getName());
 
   /** {@inheritDoc} */
@@ -100,8 +102,10 @@ HMasterRegionInterface {
   int numRetries;
   long maxRegionOpenTime;
 
-  DelayQueue<PendingServerShutdown> shutdownQueue;
-  BlockingQueue<PendingOperation> msgQueue;
+  DelayQueue<ProcessServerShutdown> shutdownQueue =
+    new DelayQueue<ProcessServerShutdown>();
+  BlockingQueue<RegionServerOperation> msgQueue =
+    new LinkedBlockingQueue<RegionServerOperation>();
 
   int leaseTimeout;
   private Leases serverLeases;
@@ -113,7 +117,7 @@ HMasterRegionInterface {
   int metaRescanInterval;
 
   final AtomicReference<HServerAddress> rootRegionLocation =
-    new AtomicReference<HServerAddress>();
+    new AtomicReference<HServerAddress>(null);
 
   Lock splitLogLock = new ReentrantLock();
@@ -409,6 +413,8 @@ HMasterRegionInterface {
   protected void checkAssigned(final HRegionInfo info,
     final String serverName, final long startCode) throws IOException {
 
+    synchronized (serversToServerInfo) {
+
     // Skip region - if ...
     if(info.isOffline() // offline
       || killedRegions.contains(info.getRegionName()) // queued for offline
@@ -431,13 +437,11 @@ HMasterRegionInterface {
       }
       return;
     }
-    synchronized (serversToServerInfo) {
       storedInfo = serversToServerInfo.get(serverName);
       if (deadServers.contains(serverName)) {
         deadServer = true;
       }
-    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("Checking " + info.getRegionName() + " is assigned");
     }
@@ -466,7 +470,7 @@ HMasterRegionInterface {
     // Recover the region server's log if there is one.
     // This is only done from here if we are restarting and there is stale
     // data in the meta region. Once we are on-line, dead server log
-    // recovery is handled by lease expiration and PendingServerShutdown
+    // recovery is handled by lease expiration and ProcessServerShutdown
     if (serverName.length() != 0) {
       StringBuilder dirName = new StringBuilder("log_");
       dirName.append(serverName.replace(":", "_"));
@@ -494,6 +498,7 @@ HMasterRegionInterface {
       }
     }
   }
+  }
 
   volatile boolean rootScanned;
@@ -505,7 +510,6 @@ HMasterRegionInterface {
   }
 
   private void scanRoot() {
-    boolean succeeded = false;
     int tries = 0;
     while (!closed.get() && tries < numRetries) {
       synchronized (rootRegionLocation) {
@@ -530,7 +534,6 @@ HMasterRegionInterface {
           scanRegion(new MetaRegion(rootRegionLocation.get(),
             HRegionInfo.rootRegionInfo.getRegionName(), null));
         }
-        succeeded = true;
         break;
       } catch (IOException e) {
         e = RemoteExceptionHandler.checkIOException(e);
@@ -554,12 +557,6 @@ HMasterRegionInterface {
       }
       sleeper.sleep();
     }
-    if (!succeeded) {
-      // We tried numretries to reach root and failed. Is it gone.
-      // Currently we just flounder. Should we reallocate root?
-      // This would be catastrophic?
-      // unassignRootRegion();
-    }
   }
 
   @Override
@@ -756,7 +753,9 @@ HMasterRegionInterface {
   @Override
   protected void maintenanceScan() {
     ArrayList<MetaRegion> regions = new ArrayList<MetaRegion>();
+    synchronized (onlineMetaRegions) {
       regions.addAll(onlineMetaRegions.values());
+    }
     for (MetaRegion r: regions) {
       scanOneMetaRegion(r);
     }
@@ -801,6 +800,26 @@ HMasterRegionInterface {
   MetaScanner metaScannerThread;
   Integer metaScannerLock = new Integer(0);
 
+  /////////////////////////////////////////////////////////////////////////////
+  //
+  // Access to all of the following objects MUST be synchronized on
+  // serversToServerInfo
+
+  /** The map of known server names to server info */
+  final Map<String, HServerInfo> serversToServerInfo =
+    new HashMap<String, HServerInfo>();
+
+  /** Set of known dead servers */
+  final Set<String> deadServers = new HashSet<String>();
+
+  /** SortedMap server load -> Set of server names */
+  final SortedMap<HServerLoad, Set<String>> loadToServers =
+    new TreeMap<HServerLoad, Set<String>>();
+
+  /** Map of server names -> server load */
+  final Map<String, HServerLoad> serversToLoad =
+    new HashMap<String, HServerLoad>();
+
   /**
    * The 'unassignedRegions' table maps from a region name to a HRegionInfo
    * record, which includes the region's table, its id, and its start/end keys.
@@ -812,55 +831,39 @@ HMasterRegionInterface {
    * the region has been deployed.
    */
   final SortedMap<Text, HRegionInfo> unassignedRegions =
-    Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
+    new TreeMap<Text, HRegionInfo>();
 
   /**
    * The 'assignAttempts' table maps from regions to a timestamp that indicates
    * the last time we *tried* to assign the region to a RegionServer. If the
    * timestamp is out of date, then we can try to reassign it.
    */
-  final Map<Text, Long> assignAttempts =
-    Collections.synchronizedMap(new HashMap<Text, Long>());
+  final Map<Text, Long> assignAttempts = new HashMap<Text, Long>();
 
   /**
    * Regions that have been assigned, and the server has reported that it has
    * started serving it, but that we have not yet recorded in the meta table.
    */
-  Set<Text> pendingRegions;
+  final Set<Text> pendingRegions = new HashSet<Text>();
 
   /**
    * The 'killList' is a list of regions that are going to be closed, but not
    * reopened.
    */
-  Map<String, HashMap<Text, HRegionInfo>> killList;
+  final Map<String, HashMap<Text, HRegionInfo>> killList =
+    new HashMap<String, HashMap<Text, HRegionInfo>>();
 
   /** 'killedRegions' contains regions that are in the process of being closed */
-  Set<Text> killedRegions;
+  final Set<Text> killedRegions = new HashSet<Text>();
 
   /**
    * 'regionsToDelete' contains regions that need to be deleted, but cannot be
    * until the region server closes it
    */
-  Set<Text> regionsToDelete;
+  final Set<Text> regionsToDelete = new HashSet<Text>();
 
-  /**
-   * The map of known server names to server info
-   *
-   * Access to this map and loadToServers and serversToLoad must be synchronized
-   * on this object
-   */
-  final Map<String, HServerInfo> serversToServerInfo =
-    new HashMap<String, HServerInfo>();
-
-  /** Set of known dead servers */
-  final Set<String> deadServers =
-    Collections.synchronizedSet(new HashSet<String>());
-
-  /** SortedMap server load -> Set of server names */
-  SortedMap<HServerLoad, Set<String>> loadToServers;
-
-  /** Map of server names -> server load */
-  Map<String, HServerLoad> serversToLoad;
+  //
+  /////////////////////////////////////////////////////////////////////////////
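The comment block above states the new convention: the master's bookkeeping maps become plain collections, and every access goes through a synchronized block on the single serversToServerInfo monitor, so related updates to several maps are atomic with respect to each other. The old Collections.synchronized* wrappers made each map individually thread-safe but not mutually consistent. A minimal sketch of the pattern, with illustrative types standing in for the master's state:

  import java.util.*;

  class MasterStateSketch {
    // One designated monitor guards all of these collections.
    private final Map<String, String> serversToServerInfo =
      new HashMap<String, String>();
    private final Map<String, Long> assignAttempts = new HashMap<String, Long>();

    void recordAssignment(String region) {
      synchronized (serversToServerInfo) {
        // Both maps are observed and updated under the same lock.
        assignAttempts.put(region, Long.valueOf(System.currentTimeMillis()));
      }
    }
  }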
 
   /** Build the HMaster out of a raw configuration item.
    *
@@ -883,6 +886,7 @@ HMasterRegionInterface {
    */
   public HMaster(Path dir, HServerAddress address, HBaseConfiguration conf)
   throws IOException {
+
     this.fsOk = true;
     this.dir = dir;
     this.conf = conf;
@@ -929,9 +933,6 @@ HMasterRegionInterface {
     this.maxRegionOpenTime =
       conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
 
-    this.shutdownQueue = new DelayQueue<PendingServerShutdown>();
-    this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
-
     this.leaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000);
     this.serverLeases = new Leases(this.leaseTimeout,
       conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
@@ -955,27 +956,10 @@ HMasterRegionInterface {
     // Scans the meta table
     this.initialMetaScanComplete = false;
     this.metaScannerThread = new MetaScanner();
 
     unassignRootRegion();
 
-    this.pendingRegions =
-      Collections.synchronizedSet(new HashSet<Text>());
-
-    this.killList =
-      Collections.synchronizedMap(
-        new HashMap<String, HashMap<Text, HRegionInfo>>());
-
-    this.killedRegions =
-      Collections.synchronizedSet(new HashSet<Text>());
-
-    this.regionsToDelete =
-      Collections.synchronizedSet(new HashSet<Text>());
-
-    this.loadToServers = new TreeMap<HServerLoad, Set<String>>();
-    this.serversToLoad = new HashMap<String, HServerLoad>();
-
     this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
 
     // We're almost open for business
@@ -989,6 +973,9 @@ HMasterRegionInterface {
    * without reporting in. Currently, we just flounder and never recover. We
    * could 'notice' dead region server in root scanner -- if we failed access
    * multiple times -- but reassigning root is catastrophic.
+   *
+   * Note: This method must be called from inside a synchronized block on
+   * serversToServerInfo
    */
   void unassignRootRegion() {
     this.rootRegionLocation.set(null);
@@ -996,7 +983,6 @@ HMasterRegionInterface {
       HRegionInfo.rootRegionInfo);
     this.assignAttempts.put(HRegionInfo.rootRegionInfo.getRegionName(),
       Long.valueOf(0L));
-    // TODO: If the old root region server had a log, it needs splitting.
   }
 
   /**
@@ -1065,8 +1051,12 @@ HMasterRegionInterface {
      * Main processing loop
      */
     try {
-      for (PendingOperation op = null; !closed.get(); ) {
+      for (RegionServerOperation op = null; !closed.get(); ) {
+        if (rootRegionLocation.get() != null) {
+          // We can't process server shutdowns unless the root region is online
           op = this.shutdownQueue.poll();
+        }
         if (op == null ) {
           try {
             op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
@@ -1239,32 +1229,36 @@ HMasterRegionInterface {
   @SuppressWarnings("unused")
   public MapWritable regionServerStartup(HServerInfo serverInfo)
   throws IOException {
     String s = serverInfo.getServerAddress().toString().trim();
-    HServerInfo storedInfo = null;
     LOG.info("received start message from: " + s);
 
-    // If we get the startup message but there's an old server by that
-    // name, then we can timeout the old one right away and register
-    // the new one.
     synchronized (serversToServerInfo) {
-      storedInfo = serversToServerInfo.remove(s);
       HServerLoad load = serversToLoad.remove(s);
       if (load != null) {
+        // The startup message was from a known server.
+        // Remove stale information about the server's load.
         Set<String> servers = loadToServers.get(load);
         if (servers != null) {
           servers.remove(s);
           loadToServers.put(load, servers);
         }
       }
-      serversToServerInfo.notifyAll();
-    }
-    if (storedInfo != null && !closed.get()) {
-      shutdownQueue.put(new PendingServerShutdown(storedInfo));
+      HServerInfo storedInfo = serversToServerInfo.remove(s);
+      if (storedInfo != null && !closed.get()) {
+        // The startup message was from a known server with the same name.
+        // Timeout the old one right away.
+        HServerAddress root = rootRegionLocation.get();
+        if (root != null && root.equals(storedInfo.getServerAddress())) {
+          unassignRootRegion();
+        }
+        shutdownQueue.put(new ProcessServerShutdown(storedInfo));
       }
 
-    // Either way, record the new server
-    synchronized (serversToServerInfo) {
-      HServerLoad load = new HServerLoad();
+      // record new server
+      load = new HServerLoad();
       serverInfo.setLoad(load);
       serversToServerInfo.put(s, serverInfo);
       serversToLoad.put(s, load);
@@ -1274,6 +1268,7 @@ HMasterRegionInterface {
       }
       servers.add(s);
       loadToServers.put(load, servers);
+      serversToServerInfo.notifyAll();
     }
 
     if (!closed.get()) {
@@ -1332,11 +1327,13 @@ HMasterRegionInterface {
             onlineMetaRegions.remove(info.getStartKey());
           }
 
+          synchronized (serversToServerInfo) {
             this.unassignedRegions.put(info.getRegionName(), info);
             this.assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
           }
         }
       }
+    }
 
     // We don't need to return anything to the server because it isn't
     // going to do any more work.
@@ -1431,6 +1428,10 @@ HMasterRegionInterface {
     boolean leaseCancelled = false;
     synchronized (serversToServerInfo) {
       HServerInfo info = serversToServerInfo.remove(serverName);
+      if (rootRegionLocation.get() != null &&
+          info.getServerAddress().equals(rootRegionLocation.get())) {
+        unassignRootRegion();
+      }
       if (info != null) {
         // Only cancel lease and update load information once.
         // This method can be called a couple of times during shutdown.
@@ -1464,13 +1465,17 @@ HMasterRegionInterface {
     ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
 
     String serverName = info.getServerAddress().toString();
-    HashMap<Text, HRegionInfo> regionsToKill = killList.remove(serverName);
+    HashMap<Text, HRegionInfo> regionsToKill = null;
+    synchronized (serversToServerInfo) {
+      regionsToKill = killList.remove(serverName);
+    }
 
     // Get reports on what the RegionServer did.
     for (int i = 0; i < incomingMsgs.length; i++) {
       HRegionInfo region = incomingMsgs[i].getRegionInfo();
 
+      synchronized (serversToServerInfo) {
       switch (incomingMsgs[i].getMsg()) {
 
       case HMsg.MSG_REPORT_OPEN:
@@ -1500,8 +1505,8 @@ HMasterRegionInterface {
             HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
           // Store the Root Region location (in memory)
           synchronized (rootRegionLocation) {
-            this.rootRegionLocation.
-              set(new HServerAddress(info.getServerAddress()));
+            this.rootRegionLocation.set(
+              new HServerAddress(info.getServerAddress()));
             this.rootRegionLocation.notifyAll();
           }
           break;
@@ -1515,7 +1520,7 @@ HMasterRegionInterface {
         // Queue up an update to note the region location.
         try {
-          msgQueue.put(new PendingOpenReport(info, region));
+          msgQueue.put(new ProcessRegionOpen(info, region));
         } catch (InterruptedException e) {
           throw new RuntimeException("Putting into msgQueue was interrupted.", e);
         }
@@ -1531,9 +1536,7 @@ HMasterRegionInterface {
           // Root region
-          rootRegionLocation.set(null);
-          unassignedRegions.put(region.getRegionName(), region);
-          assignAttempts.put(region.getRegionName(), Long.valueOf(0L));
+          unassignRootRegion();
 
         } else {
           boolean reassignRegion = true;
@@ -1556,7 +1559,7 @@ HMasterRegionInterface {
           assignAttempts.remove(region.getRegionName());
 
           try {
-            msgQueue.put(new PendingCloseReport(region, reassignRegion,
+            msgQueue.put(new ProcessRegionClose(region, reassignRegion,
               deleteRegion));
           } catch (InterruptedException e) {
@@ -1594,15 +1597,18 @@ HMasterRegionInterface {
           incomingMsgs[i].getMsg());
       }
     }
+    }
 
     // Process the kill list
+    synchronized (serversToServerInfo) {
       if (regionsToKill != null) {
         for (HRegionInfo i: regionsToKill.values()) {
           returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i));
           killedRegions.add(i.getRegionName());
         }
       }
+    }
 
     // Figure out what the RegionServer ought to do, and write back.
     assignRegions(info, serverName, returnMsgs);
@@ -1617,10 +1623,18 @@ HMasterRegionInterface {
    * @param serverName
    * @param returnMsgs
    */
-  private synchronized void assignRegions(HServerInfo info, String serverName,
+  private void assignRegions(HServerInfo info, String serverName,
     ArrayList<HMsg> returnMsgs) {
 
-    TreeSet<Text> regionsToAssign = getRegionsToAssign();
+    long now = System.currentTimeMillis();
+    SortedSet<Text> regionsToAssign = new TreeSet<Text>();
+    synchronized (serversToServerInfo) {
+      for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
+        long diff = now - e.getValue().longValue();
+        if (diff > this.maxRegionOpenTime) {
+          regionsToAssign.add(e.getKey());
+        }
+      }
     int nRegionsToAssign = regionsToAssign.size();
     if (nRegionsToAssign <= 0) {
       // No regions to assign. Return.
@@ -1636,7 +1650,6 @@ HMasterRegionInterface {
       // Multiple servers in play.
       // We need to allocate regions only to most lightly loaded servers.
       HServerLoad thisServersLoad = info.getLoad();
-      synchronized (this.serversToServerInfo) {
         int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
         nRegionsToAssign -= nregions;
         if (nRegionsToAssign > 0) {
@@ -1695,7 +1708,7 @@ HMasterRegionInterface {
             nregions = nRegionsToAssign;
           }
 
-          long now = System.currentTimeMillis();
+          now = System.currentTimeMillis();
           for (Text regionName: regionsToAssign) {
             HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
             LOG.info("assigning region " + regionName + " to server " +
@@ -1714,6 +1727,9 @@ HMasterRegionInterface {
    * @param nRegionsToAssign
    * @param thisServersLoad
    * @return How many regions we can assign to more lightly loaded servers
+   *
+   * Note: this method MUST be called from inside a synchronized block on
+   * serversToServerInfo
    */
   private int regionsPerServer(final int nRegionsToAssign,
     final HServerLoad thisServersLoad) {
@@ -1744,10 +1760,11 @@ HMasterRegionInterface {
    * @param serverName
    * @param returnMsgs
    */
-  private void assignRegionsToOneServer(final TreeSet<Text> regionsToAssign,
+  private void assignRegionsToOneServer(final SortedSet<Text> regionsToAssign,
     final String serverName, final ArrayList<HMsg> returnMsgs) {
     long now = System.currentTimeMillis();
     for (Text regionName: regionsToAssign) {
+      synchronized (serversToServerInfo) {
       HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
       LOG.info("assigning region " + regionName + " to the only server " +
         serverName);
@@ -1755,28 +1772,14 @@ HMasterRegionInterface {
       returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
     }
   }
 
-  /*
-   * @return List of regions to assign.
-   */
-  private TreeSet<Text> getRegionsToAssign() {
-    long now = System.currentTimeMillis();
-    TreeSet<Text> regionsToAssign = new TreeSet<Text>();
-    for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
-      long diff = now - e.getValue().longValue();
-      if (diff > this.maxRegionOpenTime) {
-        regionsToAssign.add(e.getKey());
-      }
-    }
-    return regionsToAssign;
-  }
 
   /*
-   * Some internal classes to manage msg-passing and client operations
+   * Some internal classes to manage msg-passing and region server operations
    */
-  private abstract class PendingOperation {
-    PendingOperation() {
+  private abstract class RegionServerOperation {
+    RegionServerOperation() {
       super();
     }
@@ -1788,9 +1791,9 @@ HMasterRegionInterface {
    * The region server's log file needs to be split up for each region it was
    * serving, and the regions need to get reassigned.
    */
-  private class PendingServerShutdown extends PendingOperation
+  private class ProcessServerShutdown extends RegionServerOperation
   implements Delayed {
-    private final long expire;
+    private long expire;
     private HServerAddress deadServer;
     private String deadServerName;
     private Path oldLogDir;
@@ -1812,7 +1815,7 @@ HMasterRegionInterface {
       }
     }
 
-    PendingServerShutdown(HServerInfo serverInfo) {
+    ProcessServerShutdown(HServerInfo serverInfo) {
       super();
       this.deadServer = serverInfo.getServerAddress();
       this.deadServerName = this.deadServer.toString();
@@ -1846,7 +1849,7 @@ HMasterRegionInterface {
     /** {@inheritDoc} */
     @Override
     public String toString() {
-      return "PendingServerShutdown of " + this.deadServer.toString();
+      return "ProcessServerShutdown of " + this.deadServer.toString();
     }
 
     /** Finds regions that the dead region server was serving */
@@ -1936,6 +1939,7 @@ HMasterRegionInterface {
           ToDoEntry todo = new ToDoEntry(row, info);
           toDoList.add(todo);
 
+          synchronized (serversToServerInfo) {
           if (killList.containsKey(deadServerName)) {
             HashMap<Text, HRegionInfo> regionsToKill =
               killList.get(deadServerName);
@@ -1964,6 +1968,7 @@ HMasterRegionInterface {
               pendingRegions.remove(info.getRegionName());
             }
           }
+          }
       } finally {
         if(scannerId != -1L) {
           try {
@@ -1994,10 +1999,12 @@ HMasterRegionInterface {
       for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
         Text region = e.getKey();
         HRegionInfo regionInfo = e.getValue();
+        synchronized (serversToServerInfo) {
           unassignedRegions.put(region, regionInfo);
           assignAttempts.put(region, Long.valueOf(0L));
         }
       }
+    }
 
     @Override
     boolean process() throws IOException {
@@ -2023,15 +2030,30 @@ HMasterRegionInterface {
       }
 
       if (!rootChecked) {
-        if (rootRegionLocation.get() != null &&
-            deadServer.equals(rootRegionLocation.get())) {
-          rootRegionLocation.set(null);
-          unassignedRegions.put(HRegionInfo.rootRegionInfo.getRegionName(),
-            HRegionInfo.rootRegionInfo);
-          assignAttempts.put(HRegionInfo.rootRegionInfo.getRegionName(),
-            Long.valueOf(0L));
+        boolean rootRegionUnavailable = false;
+        if (rootRegionLocation.get() == null) {
+          rootRegionUnavailable = true;
+        } else if (deadServer.equals(rootRegionLocation.get())) {
+          // We should never get here because whenever an object of this type
+          // is created, a check is made to see if it is the root server,
+          // and unassignRootRegion() is called then. However, in the
+          // unlikely event that we do end up here, let's do the right thing.
+          synchronized (serversToServerInfo) {
+            unassignRootRegion();
+          }
+          rootRegionUnavailable = true;
+        }
+        if (rootRegionUnavailable) {
+          // We can't do anything until the root region is on-line, put
+          // us back on the delay queue. Reset the future time at which
+          // we expect to be released from the DelayQueue we're inserted
+          // in on lease expiration.
+          this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+          shutdownQueue.put(this);
 
+          // Return true so run() does not put us back on the msgQueue
+          return true;
         }
         rootChecked = true;
       }
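The re-queue above leans on java.util.concurrent.DelayQueue: an element is only released once its getDelay() is non-positive, so resetting expire and re-inserting the operation defers the retry until the root region has had time to come back on-line. The getDelay()/compareTo() implementations of ProcessServerShutdown are not shown in this diff; the self-contained sketch below uses illustrative names for the same pattern.

  import java.util.concurrent.*;

  class RetryOp implements Delayed {
    volatile long expire;                       // absolute time, in ms

    RetryOp(long delayMs) { this.expire = System.currentTimeMillis() + delayMs; }

    public long getDelay(TimeUnit unit) {
      return unit.convert(expire - System.currentTimeMillis(),
        TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed o) {
      long diff =
        getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
      return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
    }
  }

  class RequeueSketch {
    public static void main(String[] args) throws InterruptedException {
      DelayQueue<RetryOp> queue = new DelayQueue<RetryOp>();
      queue.put(new RetryOp(100));
      RetryOp op = queue.take();                // released after ~100 ms
      op.expire = System.currentTimeMillis() + 100;
      queue.put(op);                            // try again later, as above
      System.out.println("re-queued: " + (queue.take() != null));
    }
  }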
@@ -2106,6 +2128,7 @@ HMasterRegionInterface {
       if (closed.get()) {
         return true;
       }
+      synchronized (onlineMetaRegions) {
       for (MetaRegion r: onlineMetaRegions.values()) {
         HRegionInterface server = null;
@@ -2130,7 +2153,10 @@ HMasterRegionInterface {
             Thread.currentThread().getName());
         }
       }
+      }
+      synchronized (serversToServerInfo) {
         deadServers.remove(deadServerName);
+      }
       break;
 
     } catch (IOException e) {
@@ -2144,16 +2170,16 @@ HMasterRegionInterface {
   }
 
   /**
-   * PendingCloseReport is instantiated when a region server reports that it
+   * ProcessRegionClose is instantiated when a region server reports that it
    * has closed a region.
    */
-  private class PendingCloseReport extends PendingOperation {
+  private class ProcessRegionClose extends RegionServerOperation {
     private HRegionInfo regionInfo;
     private boolean reassignRegion;
     private boolean deleteRegion;
     private boolean rootRegion;
 
-    PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion,
+    ProcessRegionClose(HRegionInfo regionInfo, boolean reassignRegion,
       boolean deleteRegion) {
 
       super();
@@ -2176,7 +2202,7 @@ HMasterRegionInterface {
     /** {@inheritDoc} */
     @Override
     public String toString() {
-      return "PendingCloseReport of " + this.regionInfo.getRegionName();
+      return "ProcessRegionClose of " + this.regionInfo.getRegionName();
     }
 
     @Override
@@ -2220,6 +2246,7 @@ HMasterRegionInterface {
           }
 
           MetaRegion r = null;
+          synchronized (onlineMetaRegions) {
           if (onlineMetaRegions.containsKey(regionInfo.getRegionName())) {
             r = onlineMetaRegions.get(regionInfo.getRegionName());
@@ -2227,6 +2254,7 @@ HMasterRegionInterface {
             r = onlineMetaRegions.get(onlineMetaRegions.headMap(
               regionInfo.getRegionName()).lastKey());
           }
+          }
           metaRegionName = r.getRegionName();
           server = connection.getHRegionConnection(r.getServer());
         }
@@ -2259,8 +2287,10 @@ HMasterRegionInterface {
       if (reassignRegion) {
         LOG.info("reassign region: " + regionInfo.getRegionName());
 
+        synchronized (serversToServerInfo) {
           unassignedRegions.put(regionInfo.getRegionName(), regionInfo);
           assignAttempts.put(regionInfo.getRegionName(), Long.valueOf(0L));
+        }
       } else if (deleteRegion) {
         try {
@@ -2277,17 +2307,17 @@ HMasterRegionInterface {
   }
 
   /**
-   * PendingOpenReport is instantiated when a region server reports that it is
+   * ProcessRegionOpen is instantiated when a region server reports that it is
    * serving a region. This applies to all meta and user regions except the
    * root region which is handled specially.
    */
-  private class PendingOpenReport extends PendingOperation {
+  private class ProcessRegionOpen extends RegionServerOperation {
     private final boolean rootRegion;
     private final HRegionInfo region;
    private final HServerAddress serverAddress;
    private final byte [] startCode;
 
-    PendingOpenReport(HServerInfo info, HRegionInfo region)
+    ProcessRegionOpen(HServerInfo info, HRegionInfo region)
     throws IOException {
       // If true, the region which just came on-line is a META region.
       // We need to look in the ROOT region for its information. Otherwise,
@@ -2345,10 +2375,10 @@ HMasterRegionInterface {
             return false;
           }
 
-          MetaRegion r = onlineMetaRegions.containsKey(region.getRegionName())?
-            onlineMetaRegions.get(region.getRegionName()):
-            onlineMetaRegions.get(onlineMetaRegions.
-              headMap(region.getRegionName()).lastKey());
+          MetaRegion r = onlineMetaRegions.containsKey(region.getRegionName()) ?
+            onlineMetaRegions.get(region.getRegionName()) :
+            onlineMetaRegions.get(onlineMetaRegions.headMap(
+              region.getRegionName()).lastKey());
           metaRegionName = r.getRegionName();
           server = connection.getHRegionConnection(r.getServer());
         }
@@ -2384,7 +2414,9 @@ HMasterRegionInterface {
           }
         }
         // If updated successfully, remove from pending list.
+        synchronized (serversToServerInfo) {
           pendingRegions.remove(region.getRegionName());
+        }
         break;
 
       } catch (IOException e) {
         if (tries == numRetries - 1) {
@@ -2413,6 +2445,7 @@ HMasterRegionInterface {
     closed.set(true);
     synchronized(msgQueue) {
       msgQueue.clear(); // Empty the queue
+      shutdownQueue.clear(); // Empty shut down queue
       msgQueue.notifyAll(); // Wake main thread
     }
   }
@@ -2467,12 +2500,15 @@ HMasterRegionInterface {
     // for the table we want to create already exists, then table already
     // created. Throw already-exists exception.
 
-    MetaRegion m = (onlineMetaRegions.size() == 1 ?
+    MetaRegion m = null;
+    synchronized (onlineMetaRegions) {
+      m = (onlineMetaRegions.size() == 1 ?
         onlineMetaRegions.get(onlineMetaRegions.firstKey()) :
         (onlineMetaRegions.containsKey(newRegion.getRegionName()) ?
           onlineMetaRegions.get(newRegion.getRegionName()) :
           onlineMetaRegions.get(onlineMetaRegions.headMap(
             newRegion.getTableDesc().getName()).lastKey())));
+    }
 
     Text metaRegionName = m.getRegionName();
     HRegionInterface server = connection.getHRegionConnection(m.getServer());
@@ -2522,8 +2558,10 @@ HMasterRegionInterface {
       // 5. Get it assigned to a server
 
+      synchronized (serversToServerInfo) {
         this.unassignedRegions.put(regionName, info);
         this.assignAttempts.put(regionName, Long.valueOf(0L));
+      }
 
     } finally {
       synchronized (tableInCreation) {
@@ -2601,7 +2639,10 @@ HMasterRegionInterface {
         firstMetaRegion = onlineMetaRegions.headMap(tableName).lastKey();
       }
 
-      this.metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values());
+      synchronized (onlineMetaRegions) {
+        this.metaRegions.addAll(onlineMetaRegions.tailMap(
+          firstMetaRegion).values());
+      }
     }
 
     void process() throws IOException {
@@ -2799,6 +2840,7 @@ HMasterRegionInterface {
           LOG.debug("updated columns in row: " + i.getRegionName());
         }
 
+        synchronized (serversToServerInfo) {
           if (online) { // Bring offline regions on-line
             if (!unassignedRegions.containsKey(i.getRegionName())) {
               unassignedRegions.put(i.getRegionName(), i);
@@ -2810,6 +2852,7 @@ HMasterRegionInterface {
             assignAttempts.remove(i.getRegionName());
           }
         }
+        }
 
         // Process regions currently being served
@@ -2825,7 +2868,10 @@ HMasterRegionInterface {
           // Cause regions being served to be taken off-line and disabled
 
-          HashMap<Text, HRegionInfo> localKillList = killList.get(serverName);
+          HashMap<Text, HRegionInfo> localKillList = null;
+          synchronized (serversToServerInfo) {
+            localKillList = killList.get(serverName);
+          }
           if (localKillList == null) {
             localKillList = new HashMap<Text, HRegionInfo>();
           }
@@ -2841,9 +2887,11 @@ HMasterRegionInterface {
             LOG.debug("inserted local kill list into kill list for server " +
               serverName);
           }
+          synchronized (serversToServerInfo) {
             killList.put(serverName, localKillList);
           }
         }
+        }
         servedRegions.clear();
       }
@@ -2874,9 +2922,11 @@ HMasterRegionInterface {
       for (HashSet<HRegionInfo> s: servedRegions.values()) {
         for (HRegionInfo i: s) {
+          synchronized (serversToServerInfo) {
             regionsToDelete.add(i.getRegionName());
           }
         }
+      }
 
       // Unserved regions we can delete now
@@ -3005,6 +3055,10 @@ HMasterRegionInterface {
         synchronized (serversToServerInfo) {
           info = serversToServerInfo.remove(server);
           if (info != null) {
+            HServerAddress root = rootRegionLocation.get();
+            if (root != null && root.equals(info.getServerAddress())) {
+              unassignRootRegion();
+            }
             String serverName = info.getServerAddress().toString();
             HServerLoad load = serversToLoad.remove(serverName);
             if (load != null) {
@@ -3021,9 +3075,9 @@ HMasterRegionInterface {
         // NOTE: If the server was serving the root region, we cannot reassign it
         // here because the new server will start serving the root region before
-        // the PendingServerShutdown operation has a chance to split the log file.
+        // the ProcessServerShutdown operation has a chance to split the log file.
         if (info != null) {
-          shutdownQueue.put(new PendingServerShutdown(info));
+          shutdownQueue.put(new ProcessServerShutdown(info));
         }
       }
     }

View File: HMerge.java

@@ -108,7 +108,8 @@ class HMerge implements HConstants {
       this.dir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
       this.basedir = new Path(dir, "merge_" + System.currentTimeMillis());
       fs.mkdirs(basedir);
-      this.hlog = new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf);
+      this.hlog =
+        new HLog(fs, new Path(basedir, HREGION_LOGDIR_NAME), conf, null);
     }
 
     void process() throws IOException {
@@ -150,11 +151,11 @@ class HMerge implements HConstants {
       for(int i = 0; i < regions.length - 1; i++) {
         if(currentRegion == null) {
           currentRegion =
-            new HRegion(dir, hlog, fs, conf, regions[i], null);
+            new HRegion(dir, hlog, fs, conf, regions[i], null, null);
           currentSize = currentRegion.largestHStore(midKey).getAggregate();
         }
         nextRegion =
-          new HRegion(dir, hlog, fs, conf, regions[i + 1], null);
+          new HRegion(dir, hlog, fs, conf, regions[i + 1], null, null);
 
         nextSize = nextRegion.largestHStore(midKey).getAggregate();
@@ -327,7 +328,7 @@ class HMerge implements HConstants {
       // Scan root region to find all the meta regions
       HRegion root =
-        new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null);
+        new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null, null);
 
       HInternalScannerInterface rootScanner =
         root.getScanner(META_COLS, new Text(), System.currentTimeMillis(), null);
@@ -363,7 +364,7 @@ class HMerge implements HConstants {
       HRegion newRegion) throws IOException {
 
       HRegion root =
-        new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null);
+        new HRegion(dir, hlog, fs, conf, HRegionInfo.rootRegionInfo, null, null);
 
       Text[] regionsToDelete = {
         oldRegion1,

View File: HRegion.java

@@ -90,7 +90,6 @@ public class HRegion implements HConstants {
   static final Random rand = new Random();
   static final Log LOG = LogFactory.getLog(HRegion.class);
   final AtomicBoolean closed = new AtomicBoolean(false);
-  private volatile long noFlushCount = 0;
 
   /**
    * Merge two HRegions. They must be available on the current
@@ -159,7 +158,7 @@ public class HRegion implements HConstants {
     // Done
     // Construction moves the merge files into place under region.
     HRegion dstRegion = new HRegion(rootDir, log, fs, conf, newRegionInfo,
-      newRegionDir);
+      newRegionDir, null);
 
     // Get rid of merges directory
@@ -221,9 +220,10 @@ public class HRegion implements HConstants {
   volatile WriteState writestate = new WriteState();
 
   final int memcacheFlushSize;
+  private volatile long lastFlushTime;
+  final CacheFlushListener flushListener;
   final int blockingMemcacheSize;
   protected final long threadWakeFrequency;
-  protected final int optionalFlushCount;
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Integer updateLock = new Integer(0);
   private final long desiredMaxFileSize;
@@ -251,11 +251,13 @@ public class HRegion implements HConstants {
    * @param regionInfo - HRegionInfo that describes the region
    * @param initialFiles If there are initial files (implying that the HRegion
    * is new), then read them from the supplied path.
+   * @param listener an object that implements CacheFlushListener or null
    *
    * @throws IOException
    */
   public HRegion(Path rootDir, HLog log, FileSystem fs, HBaseConfiguration conf,
-      HRegionInfo regionInfo, Path initialFiles) throws IOException {
+      HRegionInfo regionInfo, Path initialFiles, CacheFlushListener listener)
+    throws IOException {
     this.rootDir = rootDir;
     this.log = log;
@@ -265,8 +267,6 @@ public class HRegion implements HConstants {
     this.encodedRegionName =
       HRegionInfo.encodeRegionName(this.regionInfo.getRegionName());
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
-    this.optionalFlushCount =
-      conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);
 
     // Declare the regionName. This is a unique string for the region, used to
     // build a unique filename.
@@ -314,6 +314,7 @@ public class HRegion implements HConstants {
     // By default, we flush the cache when 16M.
     this.memcacheFlushSize = conf.getInt("hbase.hregion.memcache.flush.size",
       1024*1024*16);
+    this.flushListener = listener;
     this.blockingMemcacheSize = this.memcacheFlushSize *
       conf.getInt("hbase.hregion.memcache.block.multiplier", 2);
@@ -323,6 +324,7 @@ public class HRegion implements HConstants {
 
     // HRegion is ready to go!
     this.writestate.compacting = false;
+    this.lastFlushTime = System.currentTimeMillis();
     LOG.info("region " + this.regionInfo.getRegionName() + " available");
   }
@@ -485,6 +487,11 @@ public class HRegion implements HConstants {
     return this.fs;
   }
 
+  /** @return the last time the region was flushed */
+  public long getLastFlushTime() {
+    return this.lastFlushTime;
+  }
+
   //////////////////////////////////////////////////////////////////////////////
   // HRegion maintenance.
   //
@@ -598,8 +605,10 @@ public class HRegion implements HConstants {
     // Done!
     // Opening the region copies the splits files from the splits directory
     // under each region.
-    HRegion regionA = new HRegion(rootDir, log, fs, conf, regionAInfo, dirA);
-    HRegion regionB = new HRegion(rootDir, log, fs, conf, regionBInfo, dirB);
+    HRegion regionA =
+      new HRegion(rootDir, log, fs, conf, regionAInfo, dirA, null);
+    HRegion regionB =
+      new HRegion(rootDir, log, fs, conf, regionBInfo, dirB, null);
 
     // Cleanup
     boolean deleted = fs.delete(splits);    // Get rid of splits directory
@@ -751,54 +760,30 @@ public class HRegion implements HConstants {
   }
 
   /**
-   * Flush the cache if necessary. This is called periodically to minimize the
-   * amount of log processing needed upon startup.
+   * Flush the cache.
    *
-   * <p>The returned Vector is a list of all the files used by the component
-   * HStores. It is a list of HStoreFile objects. If the returned value is
-   * NULL, then the flush could not be executed, because the HRegion is busy
-   * doing something else storage-intensive. The caller should check back
-   * later.
+   * When this method is called the cache will be flushed unless:
+   * <ol>
+   *   <li>the cache is empty</li>
+   *   <li>the region is closed.</li>
+   *   <li>a flush is already in progress</li>
+   *   <li>writes are disabled</li>
+   * </ol>
    *
    * <p>This method may block for some time, so it should not be called from a
    * time-sensitive thread.
   *
-   * @param disableFutureWrites indicates that the caller intends to
-   * close() the HRegion shortly, so the HRegion should not take on any new and
-   * potentially long-lasting disk operations. This flush() should be the final
-   * pre-close() disk operation.
+   * @return true if cache was flushed
+   *
    * @throws IOException
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  void flushcache() throws IOException {
+  boolean flushcache() throws IOException {
     lock.readLock().lock();                      // Prevent splits and closes
     try {
       if (this.closed.get()) {
-        return;
-      }
-      boolean needFlush = false;
-      long memcacheSize = this.memcacheSize.get();
-      if(memcacheSize > this.memcacheFlushSize) {
-        needFlush = true;
-      } else if (memcacheSize > 0) {
-        if (this.noFlushCount >= this.optionalFlushCount) {
-          LOG.info("Optional flush called " + this.noFlushCount +
-            " times when data present without flushing.  Forcing one.");
-          needFlush = true;
-        } else {
-          // Only increment if something in the cache.
-          // Gets zero'd when a flushcache is called.
-          this.noFlushCount++;
-        }
-      }
-      if (!needFlush) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Cache flush not needed for region " +
-            regionInfo.getRegionName() + ". Cache size=" + memcacheSize +
-            ", cache flush threshold=" + this.memcacheFlushSize);
-        }
-        return;
+        return false;
       }
       synchronized (writestate) {
         if ((!writestate.flushing) && writestate.writesEnabled) {
@@ -811,16 +796,15 @@ public class HRegion implements HConstants {
             writestate.flushing + ", writesEnabled=" +
             writestate.writesEnabled);
           }
-          return;
+          return false;
         }
       }
-      this.noFlushCount = 0;
       long startTime = -1;
       synchronized (updateLock) {// Stop updates while we snapshot the memcaches
         startTime = snapshotMemcaches();
       }
       try {
-        internalFlushcache(startTime);
+        return internalFlushcache(startTime);
       } finally {
         synchronized (writestate) {
           writestate.flushing = false;
@@ -835,7 +819,7 @@ public class HRegion implements HConstants {
   /*
    * It is assumed that updates are blocked for the duration of this method
    */
-  long snapshotMemcaches() {
+  private long snapshotMemcaches() {
     if (this.memcacheSize.get() == 0) {
       return -1;
     }
@@ -883,17 +867,24 @@ public class HRegion implements HConstants {
    * routes.
    *
    * <p> This method may block for some time.
+   *
+   * @param startTime the time the cache was snapshotted or -1 if a flush is
+   * not needed
+   *
+   * @return true if the cache was flushed
+   *
    * @throws IOException
    * @throws DroppedSnapshotException Thrown when replay of hlog is required
    * because a Snapshot was not properly persisted.
    */
-  void internalFlushcache(long startTime) throws IOException {
+  private boolean internalFlushcache(long startTime) throws IOException {
     if (startTime == -1) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Not flushing cache: snapshotMemcaches() determined that " +
-          "there was nothing to do");
+        LOG.debug("Not flushing cache for region " +
+          regionInfo.getRegionName() +
+          ": snapshotMemcaches() determined that there was nothing to do");
       }
-      return;
+      return false;
     }
 
     // We pass the log to the HMemcache, so we can lock down both
@@ -914,7 +905,6 @@ public class HRegion implements HConstants {
     // Otherwise, the snapshot content while backed up in the hlog, it will not
     // be part of the current running servers state.
-    long logCacheFlushId = sequenceId;
     try {
       // A. Flush memcache to all the HStores.
       // Keep running vector of all store files that includes both old and the
@@ -938,7 +928,7 @@ public class HRegion implements HConstants {
     // and that all updates to the log for this regionName that have lower
     // log-sequence-ids can be safely ignored.
     this.log.completeCacheFlush(this.regionInfo.getRegionName(),
-        regionInfo.getTableDesc().getName(), logCacheFlushId);
+        regionInfo.getTableDesc().getName(), sequenceId);
 
     // D. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
@@ -948,8 +938,10 @@ public class HRegion implements HConstants {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Finished memcache flush for region " +
         this.regionInfo.getRegionName() + " in " +
-        (System.currentTimeMillis() - startTime) + "ms");
+        (System.currentTimeMillis() - startTime) + "ms, sequenceid=" +
+        sequenceId);
     }
+    return true;
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1309,13 +1301,18 @@ public class HRegion implements HConstants {
       this.log.append(regionInfo.getRegionName(),
         regionInfo.getTableDesc().getName(), updatesByColumn);
 
+      long memcacheSize = 0;
       for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
         HStoreKey key = e.getKey();
         byte[] val = e.getValue();
-        this.memcacheSize.addAndGet(key.getSize() +
+        memcacheSize = this.memcacheSize.addAndGet(key.getSize() +
           (val == null ? 0 : val.length));
         stores.get(HStoreKey.extractFamily(key.getColumn())).add(key, val);
       }
+      if (this.flushListener != null && memcacheSize > this.memcacheFlushSize) {
+        // Request a cache flush
+        this.flushListener.flushRequested(this);
+      }
     }
   }
@@ -1582,8 +1579,8 @@ public class HRegion implements HConstants {
     FileSystem fs = FileSystem.get(conf);
     fs.mkdirs(regionDir);
     return new HRegion(rootDir,
-      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
-      fs, conf, info, initialFiles);
+      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
+      fs, conf, info, initialFiles, null);
   }
 
   /**
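The update-path change above is the heart of phase 2 for HRegion: instead of a chore thread polling every region on a timer, the region itself notifies a CacheFlushListener the moment its memcache crosses hbase.hregion.memcache.flush.size. A minimal sketch of that trigger, using toy Region/Listener stand-ins (only the flushRequested() callback mirrors the patch):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: threshold-triggered flush notification.
public class FlushTriggerSketch {
  interface Listener { void flushRequested(FlushTriggerSketch region); }

  private final AtomicLong memcacheSize = new AtomicLong(0);
  private final long flushThreshold = 16 * 1024 * 1024;  // 16M default
  private final Listener listener;

  FlushTriggerSketch(Listener listener) { this.listener = listener; }

  void commit(byte[] value) {
    long size = memcacheSize.addAndGet(value.length);
    // Instead of a chore thread polling every region, the region itself
    // notifies the flusher the moment it crosses the threshold.
    if (listener != null && size > flushThreshold) {
      listener.flushRequested(this);
    }
  }

  public static void main(String[] args) {
    FlushTriggerSketch region = new FlushTriggerSketch(new Listener() {
      public void flushRequested(FlushTriggerSketch r) {
        System.out.println("flush requested");
      }
    });
    region.commit(new byte[17 * 1024 * 1024]);  // crosses the 16M threshold
  }
}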

@@ -78,6 +78,17 @@ public class HRegionInfo implements WritableComparable {
   private boolean split;
   private Text startKey;
   private HTableDescriptor tableDesc;
+  private int hashCode;
+
+  private void setHashCode() {
+    int result = this.regionName.hashCode();
+    result ^= Long.valueOf(this.regionId).hashCode();
+    result ^= this.startKey.hashCode();
+    result ^= this.endKey.hashCode();
+    result ^= Boolean.valueOf(this.offLine).hashCode();
+    result ^= this.tableDesc.hashCode();
+    this.hashCode = result;
+  }
 
   /** Used to construct the HRegionInfo for the root and first meta regions */
   private HRegionInfo(long regionId, HTableDescriptor tableDesc) {
@@ -89,6 +100,7 @@ public class HRegionInfo implements WritableComparable {
         DELIMITER + regionId);
     this.split = false;
     this.startKey = new Text();
+    setHashCode();
   }
 
   /** Default constructor - creates empty object */
@@ -100,6 +112,7 @@ public class HRegionInfo implements WritableComparable {
     this.split = false;
     this.startKey = new Text();
     this.tableDesc = new HTableDescriptor();
+    this.hashCode = 0;
   }
 
   /**
@@ -152,6 +165,7 @@ public class HRegionInfo implements WritableComparable {
     }
 
     this.tableDesc = tableDesc;
+    setHashCode();
   }
 
   /** @return the endKey */
@@ -232,13 +246,7 @@ public class HRegionInfo implements WritableComparable {
    */
   @Override
   public int hashCode() {
-    int result = this.regionName.hashCode();
-    result ^= Long.valueOf(this.regionId).hashCode();
-    result ^= this.startKey.hashCode();
-    result ^= this.endKey.hashCode();
-    result ^= Boolean.valueOf(this.offLine).hashCode();
-    result ^= this.tableDesc.hashCode();
-    return result;
+    return this.hashCode;
   }
 
   //
@@ -256,6 +264,7 @@ public class HRegionInfo implements WritableComparable {
     out.writeBoolean(split);
     startKey.write(out);
     tableDesc.write(out);
+    out.writeInt(hashCode);
   }
 
   /**
@@ -269,6 +278,7 @@ public class HRegionInfo implements WritableComparable {
     this.split = in.readBoolean();
     this.startKey.readFields(in);
     this.tableDesc.readFields(in);
+    this.hashCode = in.readInt();
   }
 
   //
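HRegionInfo is used heavily as a hash key (the region server's new QueueEntry below delegates both equals() and hashCode() to it), so the patch precomputes the XOR-combined hash once and carries it through serialization. A minimal sketch of the caching idea, assuming the hashed fields never change after construction (field set reduced for brevity):

// Illustrative only: caching an XOR-combined hash for a value object whose
// fields are fixed after construction, as HRegionInfo now does.
public class CachedHashSketch {
  private final String regionName;
  private final long regionId;
  private final int hashCode;

  public CachedHashSketch(String regionName, long regionId) {
    this.regionName = regionName;
    this.regionId = regionId;
    // Compute once; hashCode() becomes a field read on every map lookup.
    int result = regionName.hashCode();
    result ^= Long.valueOf(regionId).hashCode();
    this.hashCode = result;
  }

  @Override
  public int hashCode() {
    return hashCode;
  }

  public static void main(String[] args) {
    System.out.println(new CachedHashSketch("table,,1", 1L).hashCode());
  }
}

Writing the cached value out in write() and reading it back in readFields(), as the diff does, means a deserialized copy hashes identically without recomputing.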

@@ -26,15 +26,19 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
-import java.util.List;
-import java.util.ListIterator;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.DelayQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -125,27 +129,79 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   /** region server process name */
   public static final String REGIONSERVER = "regionserver";
 
+  /** Queue entry passed to flusher, compactor and splitter threads */
+  class QueueEntry implements Delayed {
+    private final HRegion region;
+    private long expirationTime;
+
+    QueueEntry(HRegion region, long expirationTime) {
+      this.region = region;
+      this.expirationTime = expirationTime;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean equals(Object o) {
+      QueueEntry other = (QueueEntry) o;
+      return this.hashCode() == other.hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int hashCode() {
+      return this.region.getRegionInfo().hashCode();
+    }
+
+    /** {@inheritDoc} */
+    public long getDelay(TimeUnit unit) {
+      return unit.convert(this.expirationTime - System.currentTimeMillis(),
+        TimeUnit.MILLISECONDS);
+    }
+
+    /** {@inheritDoc} */
+    public int compareTo(Delayed o) {
+      long delta = this.getDelay(TimeUnit.MILLISECONDS) -
+        o.getDelay(TimeUnit.MILLISECONDS);
+      int value = 0;
+      if (delta > 0) {
+        value = 1;
+      } else if (delta < 0) {
+        value = -1;
+      }
+      return value;
+    }
+
+    /** @return the region */
+    public HRegion getRegion() {
+      return region;
+    }
+
+    /** @param expirationTime the expirationTime to set */
+    public void setExpirationTime(long expirationTime) {
+      this.expirationTime = expirationTime;
+    }
+  }
+
   // Check to see if regions should be split
-  private final Thread splitOrCompactCheckerThread;
+  final Splitter splitter;
   // Needed at shutdown. On way out, if can get this lock then we are not in
   // middle of a split or compaction: i.e. splits/compactions cannot be
   // interrupted.
-  protected final Integer splitOrCompactLock = new Integer(0);
+  final Integer splitterLock = new Integer(0);
 
-  /*
-   * Runs periodically to determine if regions need to be compacted or split
-   */
-  class SplitOrCompactChecker extends Chore
-  implements RegionUnavailableListener {
+  /** Split regions on request */
+  class Splitter extends Thread implements RegionUnavailableListener {
+    private final BlockingQueue<QueueEntry> splitQueue =
+      new LinkedBlockingQueue<QueueEntry>();
+
     private HTable root = null;
     private HTable meta = null;
 
-    /**
-     * @param stop
-     */
-    public SplitOrCompactChecker(final AtomicBoolean stop) {
-      super(conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
-        30 * 1000), stop);
+    /** constructor */
+    public Splitter() {
+      super();
     }
 
     /** {@inheritDoc} */
@@ -178,36 +234,51 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
 
     /**
-     * Scan for splits or compactions to run.  Run any we find.
+     * Perform region splits if necessary
      */
     @Override
-    protected void chore() {
-      // Don't interrupt us while we're working
-      synchronized (splitOrCompactLock) {
-        checkForSplitsOrCompactions();
-      }
-    }
-
-    private void checkForSplitsOrCompactions() {
-      // Grab a list of regions to check
-      List<HRegion> nonClosedRegionsToCheck = getRegionsToCheck();
-      for(HRegion cur: nonClosedRegionsToCheck) {
-        try {
-          if (cur.compactIfNeeded()) {
-            // After compaction, it probably needs splitting.  May also need
-            // splitting just because one of the memcache flushes was big.
-            split(cur);
-          }
-        } catch(IOException e) {
-          //TODO: What happens if this fails? Are we toast?
-          LOG.error("Split or compaction failed", e);
-          if (!checkFileSystem()) {
-            break;
-          }
-        }
-      }
-    }
+    public void run() {
+      while (!stopRequested.get()) {
+        QueueEntry e = null;
+        try {
+          e = splitQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+          continue;
+        }
+        if (e == null) {
+          continue;
+        }
+        synchronized (splitterLock) { // Don't interrupt us while we're working
+          try {
+            split(e.getRegion());
+          } catch (IOException ex) {
+            LOG.error("Split failed for region " +
+              e.getRegion().getRegionName(),
+              RemoteExceptionHandler.checkIOException(ex));
+            if (!checkFileSystem()) {
+              break;
+            }
+          } catch (Exception ex) {
+            LOG.error("Split failed on region " +
              e.getRegion().getRegionName(), ex);
+            if (!checkFileSystem()) {
+              break;
+            }
+          }
+        }
+      }
+      LOG.info(getName() + " exiting");
+    }
+
+    /**
+     * @param e entry indicating which region needs to be split
+     */
+    public void splitRequested(QueueEntry e) {
+      splitQueue.add(e);
+    }
 
     private void split(final HRegion region) throws IOException {
       final HRegionInfo oldRegionInfo = region.getRegionInfo();
@@ -271,54 +342,181 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     }
   }
 
+  // Compactions
+  final Compactor compactor;
+  // Needed during shutdown so we send an interrupt after completion of a
+  // compaction, not in the midst.
+  final Integer compactionLock = new Integer(0);
+
+  /** Compact region on request */
+  class Compactor extends Thread {
+    private final BlockingQueue<QueueEntry> compactionQueue =
+      new LinkedBlockingQueue<QueueEntry>();
+
+    /** constructor */
+    public Compactor() {
+      super();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void run() {
+      while (!stopRequested.get()) {
+        QueueEntry e = null;
+        try {
+          e = compactionQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+          continue;
+        }
+        if (e == null) {
+          continue;
+        }
+        try {
+          if (e.getRegion().compactIfNeeded()) {
+            splitter.splitRequested(e);
+          }
+        } catch (IOException ex) {
+          LOG.error("Compaction failed for region " +
+            e.getRegion().getRegionName(),
+            RemoteExceptionHandler.checkIOException(ex));
+          if (!checkFileSystem()) {
+            break;
+          }
+        } catch (Exception ex) {
+          LOG.error("Compaction failed for region " +
+            e.getRegion().getRegionName(), ex);
+          if (!checkFileSystem()) {
+            break;
+          }
+        }
+      }
+      LOG.info(getName() + " exiting");
+    }
+
+    /**
+     * @param e QueueEntry for region to be compacted
+     */
+    public void compactionRequested(QueueEntry e) {
+      compactionQueue.add(e);
+    }
+  }
+
   // Cache flushing
-  private final Thread cacheFlusherThread;
+  final Flusher cacheFlusher;
   // Needed during shutdown so we send an interrupt after completion of a
   // flush, not in the midst.
-  protected final Integer cacheFlusherLock = new Integer(0);
+  final Integer cacheFlusherLock = new Integer(0);
 
-  /* Runs periodically to flush memcache.
-   */
-  class Flusher extends Chore {
-    /**
-     * @param period
-     * @param stop
-     */
-    public Flusher(final int period, final AtomicBoolean stop) {
-      super(period, stop);
-    }
-
-    @Override
-    protected void chore() {
-      synchronized(cacheFlusherLock) {
-        checkForFlushesToRun();
-      }
-    }
-
-    private void checkForFlushesToRun() {
-      // Grab a list of items to flush
-      List<HRegion> nonClosedRegionsToFlush = getRegionsToCheck();
-      // Flush them, if necessary
-      for(HRegion cur: nonClosedRegionsToFlush) {
-        try {
-          cur.flushcache();
-        } catch (DroppedSnapshotException e) {
-          // Cache flush can fail in a few places.  If it fails in a critical
-          // section, we get a DroppedSnapshotException and a replay of hlog
-          // is required. Currently the only way to do this is a restart of
-          // the server.
-          LOG.fatal("Replay of hlog required. Forcing server restart", e);
-          if (!checkFileSystem()) {
-            break;
-          }
-          HRegionServer.this.stop();
-        } catch (IOException iex) {
-          LOG.error("Cache flush failed",
-            RemoteExceptionHandler.checkIOException(iex));
-          if (!checkFileSystem()) {
-            break;
-          }
-        }
-      }
-    }
-  }
+  /** Flush cache upon request */
+  class Flusher extends Thread implements CacheFlushListener {
+    private final DelayQueue<QueueEntry> flushQueue =
+      new DelayQueue<QueueEntry>();
+
+    private final long optionalFlushPeriod;
+
+    /** constructor */
+    public Flusher() {
+      super();
+      this.optionalFlushPeriod = conf.getLong(
+        "hbase.regionserver.optionalcacheflushinterval", 60L * 1000L);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void run() {
+      while (!stopRequested.get()) {
+        QueueEntry e = null;
+        try {
+          e = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ex) {
+          continue;
+        } catch (ConcurrentModificationException ex) {
+          continue;
+        }
+        synchronized(cacheFlusherLock) { // Don't interrupt while we're working
+          if (e != null) {
+            try {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("flushing region " + e.getRegion().getRegionName());
+              }
+              if (e.getRegion().flushcache()) {
+                compactor.compactionRequested(e);
+              }
+            } catch (DroppedSnapshotException ex) {
+              // Cache flush can fail in a few places.  If it fails in a critical
+              // section, we get a DroppedSnapshotException and a replay of hlog
+              // is required. Currently the only way to do this is a restart of
+              // the server.
+              LOG.fatal("Replay of hlog required. Forcing server restart", ex);
+              if (!checkFileSystem()) {
+                break;
+              }
+              HRegionServer.this.stop();
+            } catch (IOException ex) {
+              LOG.error("Cache flush failed for region " +
+                e.getRegion().getRegionName(),
+                RemoteExceptionHandler.checkIOException(ex));
+              if (!checkFileSystem()) {
+                break;
+              }
+            } catch (Exception ex) {
+              LOG.error("Cache flush failed for region " +
                e.getRegion().getRegionName(), ex);
+              if (!checkFileSystem()) {
+                break;
+              }
+            }
+            e.setExpirationTime(System.currentTimeMillis() +
+              optionalFlushPeriod);
+            flushQueue.add(e);
+          }
+          // Now insure that all the active regions are in the queue
+          Set<HRegion> regions = getRegionsToCheck();
+          for (HRegion r: regions) {
+            e = new QueueEntry(r, r.getLastFlushTime() + optionalFlushPeriod);
+            synchronized (flushQueue) {
+              if (!flushQueue.contains(e)) {
+                flushQueue.add(e);
+              }
+            }
+          }
+          // Now make sure that the queue only contains active regions
+          synchronized (flushQueue) {
+            for (Iterator<QueueEntry> i = flushQueue.iterator(); i.hasNext(); ) {
+              e = i.next();
+              if (!regions.contains(e.getRegion())) {
+                i.remove();
+              }
+            }
+          }
+        }
+      }
+      flushQueue.clear();
+      LOG.info(getName() + " exiting");
+    }
+
+    /** {@inheritDoc} */
+    public void flushRequested(HRegion region) {
+      QueueEntry e = new QueueEntry(region, System.currentTimeMillis());
+      synchronized (flushQueue) {
+        if (flushQueue.contains(e)) {
+          flushQueue.remove(e);
+        }
+        flushQueue.add(e);
+      }
+    }
+  }
@@ -326,44 +524,57 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   // HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   protected HLog log;
-  private final Thread logRollerThread;
-  protected final Integer logRollerLock = new Integer(0);
+  final LogRoller logRoller;
+  final Integer logRollerLock = new Integer(0);
 
   /** Runs periodically to determine if the HLog should be rolled */
-  class LogRoller extends Chore {
-    private int MAXLOGENTRIES =
-      conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
+  class LogRoller extends Thread implements LogRollListener {
+    private volatile boolean rollLog;
 
-    /**
-     * @param period
-     * @param stop
-     */
-    public LogRoller(final int period, final AtomicBoolean stop) {
-      super(period, stop);
+    /** constructor */
+    public LogRoller() {
+      super();
+      this.rollLog = false;
     }
 
     /** {@inheritDoc} */
     @Override
-    protected void chore() {
-      synchronized(logRollerLock) {
-        checkForLogRoll();
+    public synchronized void run() {
+      while (!stopRequested.get()) {
+        try {
+          this.wait(threadWakeFrequency);
+        } catch (InterruptedException e) {
+          continue;
+        }
+        if (!rollLog) {
+          continue;
+        }
+        synchronized (logRollerLock) {
+          try {
+            LOG.info("Rolling hlog. Number of entries: " + log.getNumEntries());
+            log.rollWriter();
+          } catch (IOException ex) {
+            LOG.error("Log rolling failed",
+              RemoteExceptionHandler.checkIOException(ex));
+            checkFileSystem();
+          } catch (Exception ex) {
+            LOG.error("Log rolling failed", ex);
+            checkFileSystem();
+          } finally {
+            rollLog = false;
+          }
+        }
       }
     }
 
-    private void checkForLogRoll() {
-      // If the number of log entries is high enough, roll the log.  This
-      // is a very fast operation, but should not be done too frequently.
-      int nEntries = log.getNumEntries();
-      if(nEntries > this.MAXLOGENTRIES) {
-        try {
-          LOG.info("Rolling hlog. Number of entries: " + nEntries);
-          log.rollWriter();
-        } catch (IOException iex) {
-          LOG.error("Log rolling failed",
-            RemoteExceptionHandler.checkIOException(iex));
-          checkFileSystem();
-        }
-      }
+    /** {@inheritDoc} */
+    public synchronized void logRollRequested() {
+      rollLog = true;
+      this.notifyAll();
     }
   }
@@ -396,20 +607,22 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     this.serverLeaseTimeout =
       conf.getInt("hbase.master.lease.period", 30 * 1000);
 
-    // Cache flushing chore thread.
-    this.cacheFlusherThread =
-      new Flusher(this.threadWakeFrequency, stopRequested);
+    // Cache flushing thread.
+    this.cacheFlusher = new Flusher();
 
-    // Check regions to see if they need to be split or compacted chore thread
-    this.splitOrCompactCheckerThread =
-      new SplitOrCompactChecker(this.stopRequested);
+    // Compaction thread
+    this.compactor = new Compactor();
+
+    // Region split thread
+    this.splitter = new Splitter();
+
+    // Log rolling thread
+    this.logRoller = new LogRoller();
 
     // Task thread to process requests from Master
     this.worker = new Worker();
     this.workerThread = new Thread(worker);
     this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
-    this.logRollerThread =
-      new LogRoller(this.threadWakeFrequency, stopRequested);
 
     // Server to handle client requests
     this.server = RPC.getServer(this, address.getBindAddress(),
       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
@@ -557,14 +770,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       // Send interrupts to wake up threads if sleeping so they notice shutdown.
       // TODO: Should we check they are alive?  If OOME could have exited already
-      synchronized(logRollerLock) {
-        this.logRollerThread.interrupt();
-      }
       synchronized(cacheFlusherLock) {
-        this.cacheFlusherThread.interrupt();
+        this.cacheFlusher.interrupt();
       }
-      synchronized(splitOrCompactLock) {
-        this.splitOrCompactCheckerThread.interrupt();
+      synchronized (compactionLock) {
+        this.compactor.interrupt();
+      }
+      synchronized (splitterLock) {
+        this.splitter.interrupt();
+      }
+      synchronized (logRollerLock) {
+        this.logRoller.interrupt();
       }
 
       if (abortRequested) {
@@ -657,7 +873,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         "running at " + this.serverInfo.getServerAddress().toString() +
         " because logdir " + logdir.toString() + " exists");
     }
-    return new HLog(fs, logdir, conf);
+    return new HLog(fs, logdir, conf, logRoller);
   }
 
   /*
@@ -680,16 +896,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
         LOG.fatal("Set stop flag in " + t.getName(), e);
       }
     };
-    Threads.setDaemonThreadRunning(this.cacheFlusherThread, n + ".cacheFlusher",
+    Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller",
       handler);
-    Threads.setDaemonThreadRunning(this.splitOrCompactCheckerThread,
-      n + ".splitOrCompactChecker", handler);
-    Threads.setDaemonThreadRunning(this.logRollerThread, n + ".logRoller",
+    Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
       handler);
-    // Worker is not the same as the above threads in that it does not
-    // inherit from Chore.  Set an UncaughtExceptionHandler on it in case its
-    // the one to see an OOME, etc., first.  The handler will set the stop
-    // flag.
+    Threads.setDaemonThreadRunning(this.compactor, n + ".compactor",
+      handler);
+    Threads.setDaemonThreadRunning(this.splitter, n + ".splitter", handler);
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
     // Leases is not a Thread.  Internally it runs a daemon thread.  If it gets
     // an unhandled exception, it will just exit.
@@ -752,9 +965,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    */
   void join() {
     join(this.workerThread);
-    join(this.logRollerThread);
-    join(this.cacheFlusherThread);
-    join(this.splitOrCompactCheckerThread);
+    join(this.logRoller);
+    join(this.cacheFlusher);
+    join(this.compactor);
+    join(this.splitter);
   }
 
   private void join(final Thread t) {
@@ -925,7 +1139,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       HRegion region = onlineRegions.get(regionInfo.getRegionName());
       if(region == null) {
         region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
-          this.log, FileSystem.get(conf), conf, regionInfo, null);
+          this.log, FileSystem.get(conf), conf, regionInfo, null,
+          this.cacheFlusher);
         this.lock.writeLock().lock();
         try {
           this.log.setSequenceNumber(region.getMinSequenceId());
@@ -1227,6 +1442,11 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     return this.requestCount;
   }
 
+  /** @return reference to CacheFlushListener */
+  public CacheFlushListener getCacheFlushListener() {
+    return this.cacheFlusher;
+  }
+
   /**
    * Protected utility method for safely obtaining an HRegion handle.
    * @param regionName Name of online {@link HRegion} to return
@@ -1318,8 +1538,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
    * @return Returns list of non-closed regions hosted on this server.  If no
    * regions to check, returns an empty list.
    */
-  protected List<HRegion> getRegionsToCheck() {
-    ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
+  protected Set<HRegion> getRegionsToCheck() {
+    HashSet<HRegion> regionsToCheck = new HashSet<HRegion>();
     //TODO: is this locking necessary?
     lock.readLock().lock();
     try {
@@ -1328,8 +1548,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       lock.readLock().unlock();
     }
     // Purge closed regions.
-    for (final ListIterator<HRegion> i = regionsToCheck.listIterator();
-        i.hasNext();) {
+    for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) {
       HRegion r = i.next();
       if (r.isClosed()) {
         i.remove();
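The new Flusher above rides entirely on java.util.concurrent.DelayQueue: a demand flush is enqueued with an already-expired delay, while the periodic "optional" flush is just the same entry re-queued to expire at lastFlushTime plus the optional flush period. A minimal, self-contained sketch of that queue behavior (toy Entry class, not the patch's QueueEntry):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Illustrative only: entries become visible to poll()/take() only once
// their delay expires, so demand flushes and time-based flushes can share
// one queue and one consumer thread.
public class DelayQueueSketch {
  static class Entry implements Delayed {
    final String region;
    final long expirationTime;

    Entry(String region, long expirationTime) {
      this.region = region;
      this.expirationTime = expirationTime;
    }

    public long getDelay(TimeUnit unit) {
      return unit.convert(expirationTime - System.currentTimeMillis(),
        TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed o) {
      long delta = getDelay(TimeUnit.MILLISECONDS) -
        o.getDelay(TimeUnit.MILLISECONDS);
      return delta > 0 ? 1 : delta < 0 ? -1 : 0;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<Entry> queue = new DelayQueue<Entry>();
    long now = System.currentTimeMillis();
    queue.add(new Entry("demand-flush", now));           // ready immediately
    queue.add(new Entry("optional-flush", now + 500));   // ready in 500ms
    System.out.println(queue.take().region);  // "demand-flush"
    System.out.println(queue.take().region);  // "optional-flush", after ~500ms
  }
}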

@@ -0,0 +1,29 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+/**
+ * Mechanism by which the HLog requests a log roll
+ */
+public interface LogRollListener {
+  /** Request that the log be rolled */
+  public void logRollRequested();
+}
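The new interface inverts the old arrangement: HLog counts its own entries and asks to be rolled, and the LogRoller thread sleeps until notified instead of polling on a chore period. A minimal sketch of the wait/notify hand-off (the stand-in "roll" is just a println; the 1000ms wait stands in for threadWakeFrequency):

// Illustrative only: a roller thread that naps until logRollRequested()
// wakes it, mirroring the shape of the patch's LogRoller.
public class LogRollSketch {
  static class Roller extends Thread {
    private volatile boolean rollLog = false;
    private volatile boolean stopped = false;

    @Override
    public synchronized void run() {
      while (!stopped) {
        try {
          wait(1000);   // wake on notifyAll(), or after the wake frequency
        } catch (InterruptedException e) {
          continue;
        }
        if (rollLog) {
          System.out.println("rolling log");
          rollLog = false;   // reset so the next request triggers a new roll
        }
      }
    }

    public synchronized void logRollRequested() {
      rollLog = true;
      notifyAll();   // cut the roller's nap short
    }

    public synchronized void requestStop() {
      stopped = true;
      notifyAll();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Roller roller = new Roller();
    roller.start();
    roller.logRollRequested();   // HLog would call this as entries pile up
    Thread.sleep(1500);          // leave time for at most one full wait()
    roller.requestStop();
  }
}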

@@ -31,6 +31,10 @@ public class Sleeper {
   private final int period;
   private AtomicBoolean stop;
 
+  /**
+   * @param sleep
+   * @param stop
+   */
   public Sleeper(final int sleep, final AtomicBoolean stop) {
     this.period = sleep;
     this.stop = stop;
@@ -40,7 +44,7 @@ public class Sleeper {
    * Sleep for period.
    */
   public void sleep() {
-    sleep(System.currentTimeMillis());
+    sleep(period);
   }
 
   /**
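The one-liner above changes sleep() to pass the configured period rather than the current wall-clock time into the overload. A minimal sketch of a stop-aware sleeper of this general shape (assumed behavior; the real class has an additional sleep(long) overload not shown in this hunk):

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a sleeper shared by server threads; a stop flag or
// interrupt cuts the nap short so shutdown is prompt.
public class SleeperSketch {
  private final int period;
  private final AtomicBoolean stop;

  public SleeperSketch(final int period, final AtomicBoolean stop) {
    this.period = period;
    this.stop = stop;
  }

  public void sleep() {
    if (stop.get()) {
      return;
    }
    try {
      Thread.sleep(period);
    } catch (InterruptedException e) {
      // Interrupted, likely because stop was requested; fall through and
      // let the caller re-check the stop flag.
    }
  }

  public static void main(String[] args) {
    SleeperSketch s = new SleeperSketch(100, new AtomicBoolean(false));
    s.sleep();
    System.out.println("woke up");
  }
}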

@@ -103,8 +103,17 @@
     the master will notice a dead region server sooner. The default is 15 seconds.
     </description>
   </property>
+  <property>
+    <name>hbase.regionserver.optionalcacheflushinterval</name>
+    <value>10000</value>
+    <description>
+    Amount of time to wait since the last time a region was flushed before
+    invoking an optional cache flush. Default 60,000.
+    </description>
+  </property>
   <property>
     <name>hbase.rootdir</name>
     <value>/hbase</value>
-    <description>location of HBase instance in dfs</description></property>
+    <description>location of HBase instance in dfs</description>
+  </property>
 </configuration>
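The value/description mismatch here is worth noting: the surrounding properties suggest this is a test-side configuration that lowers the interval to 10s, while the description still documents the shipped 60s default. A minimal sketch of how such a default resolves, with plain Properties standing in for the Hadoop Configuration the server really uses:

import java.util.Properties;

// Illustrative only: the region server reads
// conf.getLong("hbase.regionserver.optionalcacheflushinterval", 60L * 1000L),
// so an XML value overrides the 60s fallback.
public class ConfigDefaultSketch {
  static long getLong(Properties p, String key, long defaultValue) {
    String v = p.getProperty(key);
    return v == null ? defaultValue : Long.parseLong(v);
  }

  public static void main(String[] args) {
    Properties conf = new Properties();
    System.out.println(getLong(conf,
      "hbase.regionserver.optionalcacheflushinterval", 60L * 1000L)); // 60000
    conf.setProperty("hbase.regionserver.optionalcacheflushinterval", "10000");
    System.out.println(getLong(conf,
      "hbase.regionserver.optionalcacheflushinterval", 60L * 1000L)); // 10000
  }
}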

@@ -123,8 +123,8 @@ public abstract class HBaseTestCase extends TestCase {
     FileSystem fs = dir.getFileSystem(c);
     fs.mkdirs(regionDir);
     return new HRegion(dir,
-      new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf),
-      fs, conf, info, null);
+      new HLog(fs, new Path(regionDir, HConstants.HREGION_LOGDIR_NAME), conf,
+        null), fs, conf, info, null, null);
   }
 
   protected HTableDescriptor createTableDescriptor(final String name) {
@@ -365,7 +365,7 @@ public abstract class HBaseTestCase extends TestCase {
       return region.getFull(row);
     }
     public void flushcache() throws IOException {
-      this.region.internalFlushcache(this.region.snapshotMemcaches());
+      this.region.flushcache();
     }
   }

@@ -257,7 +257,7 @@ public class MiniHBaseCluster implements HConstants {
     for (LocalHBaseCluster.RegionServerThread t:
         this.hbaseCluster.getRegionServers()) {
       for(HRegion r: t.getRegionServer().onlineRegions.values() ) {
-        r.internalFlushcache(r.snapshotMemcaches());
+        r.flushcache();
       }
     }
   }

@@ -103,8 +103,10 @@ public class MultiRegionTable extends HBaseTestCase {
       }
     }
 
-    // Flush will provoke a split next time the split-checker thread runs.
-    r.internalFlushcache(r.snapshotMemcaches());
+    // Flush the cache
+    cluster.getRegionThreads().get(0).getRegionServer().getCacheFlushListener().
+      flushRequested(r);
 
     // Now, wait until split makes it into the meta table.
     int oldCount = count;

@@ -54,10 +54,11 @@ public class TestCompaction extends HBaseTestCase {
   @Override
   public void setUp() throws Exception {
     super.setUp();
-    this.hlog = new HLog(this.localFs, this.testDir, this.conf);
+    this.hlog = new HLog(this.localFs, this.testDir, this.conf, null);
     HTableDescriptor htd = createTableDescriptor(getName());
     HRegionInfo hri = new HRegionInfo(htd, null, null);
-    this.r = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+    this.r =
+      new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null);
   }
 
   /** {@inheritDoc} */

@@ -93,9 +93,9 @@ public class TestGet extends HBaseTestCase {
         HRegionInfo.encodeRegionName(info.getRegionName()));
       fs.mkdirs(regionDir);
 
-      HLog log = new HLog(fs, new Path(regionDir, "log"), conf);
+      HLog log = new HLog(fs, new Path(regionDir, "log"), conf, null);
 
-      HRegion region = new HRegion(dir, log, fs, conf, info, null);
+      HRegion region = new HRegion(dir, log, fs, conf, info, null, null);
       HRegionIncommon r = new HRegionIncommon(region);
 
       // Write information to the table
@@ -135,7 +135,7 @@ public class TestGet extends HBaseTestCase {
       region.close();
       log.rollWriter();
-      region = new HRegion(dir, log, fs, conf, info, null);
+      region = new HRegion(dir, log, fs, conf, info, null, null);
       r = new HRegionIncommon(region);
 
       // Read it back
@@ -164,7 +164,7 @@ public class TestGet extends HBaseTestCase {
       region.close();
       log.rollWriter();
-      region = new HRegion(dir, log, fs, conf, info, null);
+      region = new HRegion(dir, log, fs, conf, info, null, null);
       r = new HRegionIncommon(region);
 
       // Read it back

@@ -45,6 +45,10 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
     this.table = null;
 
     Logger.getRootLogger().setLevel(Level.INFO);
+
+    // Make the thread wake frequency a little slower so other threads
+    // can run
+    conf.setInt("hbase.server.thread.wakefrequency", 2000);
 
     // Make lease timeout longer, lease checks less frequent
     conf.setInt("hbase.master.lease.period", 10 * 1000);
     conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
@@ -112,7 +116,7 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
       Text rowlabel = new Text("row_" + k);
 
       byte bodydata[] = table.get(rowlabel, CONTENTS_BASIC);
-      assertNotNull(bodydata);
+      assertNotNull("no data for row " + rowlabel, bodydata);
       String bodystr = new String(bodydata, HConstants.UTF8_ENCODING).trim();
       String teststr = CONTENTSTR + k;
       assertEquals("Incorrect value for key: (" + rowlabel + "," + CONTENTS_BASIC

@@ -45,7 +45,7 @@ public class TestHLog extends HBaseTestCase implements HConstants {
     final Text tableName = new Text("tablename");
     final Text row = new Text("row");
     Reader reader = null;
-    HLog log = new HLog(fs, dir, this.conf);
+    HLog log = new HLog(fs, dir, this.conf, null);
     try {
       // Write columns named 1, 2, 3, etc. and then values of single byte
       // 1, 2, 3...

@@ -98,12 +98,12 @@ public class TestHRegion extends HBaseTestCase implements RegionUnavailableListener {
     fs.mkdirs(parentdir);
     newlogdir = new Path(parentdir, "log");
 
-    log = new HLog(fs, newlogdir, conf);
+    log = new HLog(fs, newlogdir, conf, null);
     desc = new HTableDescriptor("test");
     desc.addFamily(new HColumnDescriptor("contents:"));
     desc.addFamily(new HColumnDescriptor("anchor:"));
     r = new HRegion(parentdir, log, fs, conf,
-      new HRegionInfo(desc, null, null), null);
+      new HRegionInfo(desc, null, null), null, null);
     region = new HRegionIncommon(r);
   }

@@ -19,10 +19,12 @@
  */
 package org.apache.hadoop.hbase;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.dfs.MiniDFSCluster;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -32,7 +34,8 @@ public class TestLogRolling extends HBaseTestCase {
   private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
   private MiniDFSCluster dfs;
   private MiniHBaseCluster cluster;
-  private Path logdir;
+  private HRegionServer server;
+  private HLog log;
   private String tableName;
   private byte[] value;
 
@@ -45,10 +48,14 @@ public class TestLogRolling extends HBaseTestCase {
     try {
       this.dfs = null;
       this.cluster = null;
-      this.logdir = null;
+      this.server = null;
+      this.log = null;
      this.tableName = null;
      this.value = null;
 
+      // Force a region split after every 768KB
+      conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
+
      // We roll the log after every 256 writes
      conf.setInt("hbase.regionserver.maxlogentries", 256);
@@ -118,8 +125,8 @@ public class TestLogRolling extends HBaseTestCase {
      // continue
    }
 
-    this.logdir =
-      cluster.getRegionThreads().get(0).getRegionServer().getLog().dir;
+    this.server = cluster.getRegionThreads().get(0).getRegionServer();
+    this.log = server.getLog();
 
    // When the META table can be opened, the region servers are running
    @SuppressWarnings("unused")
@@ -150,21 +157,6 @@ public class TestLogRolling extends HBaseTestCase {
    }
  }
 
-  private int countLogFiles(final boolean print) throws Exception {
-    Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {this.logdir});
-    if (print) {
-      for (int i = 0; i < logfiles.length; i++) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("logfile: " + logfiles[i].toString());
-        }
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("number of log files: " + logfiles.length);
-    }
-    return logfiles.length;
-  }
-
  /**
   * Tests that logs are deleted
   *
@@ -172,21 +164,24 @@ public class TestLogRolling extends HBaseTestCase {
   */
  public void testLogRolling() throws Exception {
    tableName = getName();
-    // Force a region split after every 768KB
-    conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
    try {
      startAndWriteData();
-      int count = countLogFiles(true);
-      LOG.info("Finished writing. There are " + count + " log files. " +
-        "Sleeping to let cache flusher and log roller run");
-      while (count > 2) {
-        try {
-          Thread.sleep(1000L);
-        } catch (InterruptedException e) {
-          LOG.info("Sleep interrupted", e);
-        }
-        count = countLogFiles(true);
+      LOG.info("after writing there are " + log.getNumLogFiles() + " log files");
+
+      // flush all regions
+      List<HRegion> regions =
+        new ArrayList<HRegion>(server.getOnlineRegions().values());
+      for (HRegion r: regions) {
+        r.flushcache();
      }
+
+      // Now roll the log
+      log.rollWriter();
+
+      int count = log.getNumLogFiles();
+      LOG.info("after flushing all regions and rolling logs there are " +
+        log.getNumLogFiles() + " log files");
      assertTrue(count <= 2);
    } catch (Exception e) {
      LOG.fatal("unexpected exception", e);
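The rewritten test can assert count <= 2 deterministically because, once every region has flushed, no log file still holds unflushed edits, and rolling the writer lets HLog discard them; the sequence-id bookkeeping behind this appears in the HRegion hunks above (completeCacheFlush). A minimal sketch of that reclamation rule, with made-up sequence numbers:

import java.util.ArrayList;
import java.util.List;

// Illustrative only: a log file is deletable once every region has flushed
// past the highest sequence id the file contains.
public class LogGcSketch {
  static class LogFile {
    final long maxSequenceId;
    LogFile(long maxSequenceId) { this.maxSequenceId = maxSequenceId; }
  }

  public static void main(String[] args) {
    List<LogFile> logs = new ArrayList<LogFile>();
    logs.add(new LogFile(100));
    logs.add(new LogFile(200));
    logs.add(new LogFile(300));
    long oldestUnflushed = 250;   // lowest unflushed edit across all regions
    List<LogFile> deletable = new ArrayList<LogFile>();
    for (LogFile f : logs) {
      if (f.maxSequenceId < oldestUnflushed) {
        deletable.add(f);   // every edit in f is already in a store file
      }
    }
    System.out.println(deletable.size() + " of " + logs.size() + " deletable");
  }
}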

@@ -144,9 +144,9 @@ public class TestScanner extends HBaseTestCase {
         HRegionInfo.encodeRegionName(REGION_INFO.getRegionName()));
       fs.mkdirs(regionDir);
 
-      HLog log = new HLog(fs, new Path(regionDir, "log"), conf);
+      HLog log = new HLog(fs, new Path(regionDir, "log"), conf, null);
 
-      r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
+      r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
       region = new HRegionIncommon(r);
 
       // Write information to the meta table
@@ -169,7 +169,7 @@ public class TestScanner extends HBaseTestCase {
       r.close();
       log.rollWriter();
-      r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
+      r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
       region = new HRegionIncommon(r);
 
       // Verify we can get the data back now that it is on disk.
@@ -210,7 +210,7 @@ public class TestScanner extends HBaseTestCase {
       r.close();
       log.rollWriter();
-      r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
+      r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
       region = new HRegionIncommon(r);
 
       // Validate again
@@ -247,7 +247,7 @@ public class TestScanner extends HBaseTestCase {
       r.close();
       log.rollWriter();
-      r = new HRegion(dir, log, fs, conf, REGION_INFO, null);
+      r = new HRegion(dir, log, fs, conf, REGION_INFO, null, null);
       region = new HRegionIncommon(r);
 
       // Validate again

@@ -65,11 +65,11 @@ public class TestSplit extends MultiRegionTable {
    */
   public void testBasicSplit() throws Exception {
     HRegion region = null;
-    HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
+    HLog hlog = new HLog(this.localFs, this.testDir, this.conf, null);
     try {
       HTableDescriptor htd = createTableDescriptor(getName());
       HRegionInfo hri = new HRegionInfo(htd, null, null);
-      region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+      region = new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null);
       basicSplit(region);
     } finally {
       if (region != null) {
@@ -81,7 +81,7 @@ public class TestSplit extends MultiRegionTable {
 
   private void basicSplit(final HRegion region) throws Exception {
     addContent(region, COLFAMILY_NAME3);
-    region.internalFlushcache(region.snapshotMemcaches());
+    region.flushcache();
     Text midkey = new Text();
     assertTrue(region.needsSplit(midkey));
     HRegion [] regions = split(region);
@@ -108,12 +108,7 @@ public class TestSplit extends MultiRegionTable {
       }
       addContent(regions[i], COLFAMILY_NAME2);
       addContent(regions[i], COLFAMILY_NAME1);
-      long startTime = region.snapshotMemcaches();
-      if (startTime == -1) {
-        LOG.info("cache flush not needed");
-      } else {
-        regions[i].internalFlushcache(startTime);
-      }
+      regions[i].flushcache();
     }
 
     // Assert that even if one store file is larger than a reference, the

@@ -310,11 +310,11 @@ public class TestTimestamp extends HBaseTestCase {
   }
 
   private HRegion createRegion() throws IOException {
-    HLog hlog = new HLog(this.localFs, this.testDir, this.conf);
+    HLog hlog = new HLog(this.localFs, this.testDir, this.conf, null);
     HTableDescriptor htd = createTableDescriptor(getName());
     htd.addFamily(new HColumnDescriptor(COLUMN, VERSIONS,
       CompressionType.NONE, false, Integer.MAX_VALUE, null));
     HRegionInfo hri = new HRegionInfo(htd, null, null);
-    return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null);
+    return new HRegion(testDir, hlog, this.localFs, this.conf, hri, null, null);
   }
 }

@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.mapred;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -87,6 +88,9 @@ public class TestTableMapReduce extends MultiRegionTable {
   public TestTableMapReduce() {
     super();
 
+    // The region server doesn't have to talk to the master quite so often
+    conf.setInt("hbase.regionserver.msginterval", 2000);
+
     // Make the thread wake frequency a little slower so other threads
     // can run
     conf.setInt("hbase.server.thread.wakefrequency", 2000);
@@ -105,6 +109,9 @@ public class TestTableMapReduce extends MultiRegionTable {
     // Make lease timeout longer, lease checks less frequent
     conf.setInt("hbase.master.lease.period", 10 * 1000);
     conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+
+    // Set client pause to the original default
+    conf.setInt("hbase.client.pause", 10 * 1000);
   }
 
   /**
@@ -381,9 +388,11 @@ public class TestTableMapReduce extends MultiRegionTable {
           assertNotNull(firstValue);
           assertNotNull(secondValue);
           assertEquals(firstValue.length, secondValue.length);
-          for (int i=0; i<firstValue.length; i++) {
-            assertEquals(firstValue[i], secondValue[firstValue.length-i-1]);
+          byte[] secondReversed = new byte[secondValue.length];
+          for (int i = 0, j = secondValue.length - 1; j >= 0; j--, i++) {
+            secondReversed[i] = secondValue[j];
           }
+          assertTrue(Arrays.equals(firstValue, secondReversed));
         }
       } finally {