HBASE-2414 Enhance test suite to be able to specify distributed scenarios

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@939567 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-04-30 06:52:27 +00:00
parent 658c28fa02
commit 19b01d968d
22 changed files with 1077 additions and 405 deletions

View File

@ -543,6 +543,7 @@ Release 0.21.0 - Unreleased
HBASE-2393 ThriftServer instantiates a new HTable per request
(Bogdan DRAGU via Stack)
HBASE-2496 Less ArrayList churn on the scan path
HBASE-2414 Enhance test suite to be able to specify distributed scenarios
NEW FEATURES
HBASE-1961 HBase EC2 scripts

View File

@ -20,8 +20,6 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -29,10 +27,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* This class creates a single process HBase cluster. One thread is created for
@ -57,8 +56,7 @@ import org.apache.hadoop.util.ReflectionUtils;
public class LocalHBaseCluster implements HConstants {
static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
private final HMaster master;
private final List<RegionServerThread> regionThreads =
new ArrayList<RegionServerThread>();
private final List<JVMClusterUtil.RegionServerThread> regionThreads;
private final static int DEFAULT_NO = 1;
/** local mode */
public static final String LOCAL = "local";
@ -84,47 +82,50 @@ public class LocalHBaseCluster implements HConstants {
* @param noRegionServers Count of regionservers to start.
* @throws IOException
*/
@SuppressWarnings("unchecked")
public LocalHBaseCluster(final Configuration conf,
final int noRegionServers)
throws IOException {
this(conf, noRegionServers, HMaster.class);
}
/**
* Constructor.
* @param conf Configuration to use. Post construction has the master's
* address.
* @param noRegionServers Count of regionservers to start.
* @param masterClass
* @throws IOException
*/
@SuppressWarnings("unchecked")
public LocalHBaseCluster(final Configuration conf,
final int noRegionServers, final Class masterClass)
throws IOException {
this.conf = conf;
// Create the master
this.master = new HMaster(conf);
this.master = HMaster.constructMaster(masterClass, conf);
// Start the HRegionServers. Always have region servers come up on
// port '0' so there won't be clashes over default port as unit tests
// start/stop ports at different times during the life of the test.
conf.set(REGIONSERVER_PORT, "0");
regionServerClass = (Class<? extends HRegionServer>)
conf.getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
this.regionThreads =
new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
this.regionServerClass =
(Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
HRegionServer.class);
for (int i = 0; i < noRegionServers; i++) {
addRegionServer();
addRegionServer(i);
}
}
/**
* Creates a region server.
* Call 'start' on the returned thread to make it run.
*
* @throws IOException
* @return Region server added.
*/
public RegionServerThread addRegionServer() throws IOException {
synchronized (regionThreads) {
HRegionServer server;
try {
server = regionServerClass.getConstructor(Configuration.class).
newInstance(conf);
} catch (Exception e) {
IOException ioe = new IOException();
ioe.initCause(e);
throw ioe;
}
RegionServerThread t = new RegionServerThread(server,
this.regionThreads.size());
this.regionThreads.add(t);
return t;
}
public JVMClusterUtil.RegionServerThread addRegionServer() throws IOException {
return addRegionServer(this.regionThreads.size());
}
public JVMClusterUtil.RegionServerThread addRegionServer(final int index) throws IOException {
JVMClusterUtil.RegionServerThread rst = JVMClusterUtil.createRegionServerThread(this.conf,
this.regionServerClass, index);
this.regionThreads.add(rst);
return rst;
}
/**
@ -132,38 +133,7 @@ public class LocalHBaseCluster implements HConstants {
* @return region server
*/
public HRegionServer getRegionServer(int serverNumber) {
synchronized (regionThreads) {
return regionThreads.get(serverNumber).getRegionServer();
}
}
/** runs region servers */
public static class RegionServerThread extends Thread {
private final HRegionServer regionServer;
RegionServerThread(final HRegionServer r, final int index) {
super(r, "RegionServer:" + index);
this.regionServer = r;
}
/** @return the region server */
public HRegionServer getRegionServer() {
return this.regionServer;
}
/**
* Block until the region server has come online, indicating it is ready
* to be used.
*/
public void waitForServerOnline() {
while (!regionServer.isOnline()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue waiting
}
}
}
return regionThreads.get(serverNumber).getRegionServer();
}
/**
@ -176,7 +146,7 @@ public class LocalHBaseCluster implements HConstants {
/**
* @return Read-only list of region server threads.
*/
public List<RegionServerThread> getRegionServers() {
public List<JVMClusterUtil.RegionServerThread> getRegionServers() {
return Collections.unmodifiableList(this.regionThreads);
}
@ -187,10 +157,8 @@ public class LocalHBaseCluster implements HConstants {
* @return Name of region server that just went down.
*/
public String waitOnRegionServer(int serverNumber) {
RegionServerThread regionServerThread;
synchronized (regionThreads) {
regionServerThread = this.regionThreads.remove(serverNumber);
}
JVMClusterUtil.RegionServerThread regionServerThread =
this.regionThreads.remove(serverNumber);
while (regionServerThread.isAlive()) {
try {
LOG.info("Waiting on " +
@ -211,14 +179,12 @@ public class LocalHBaseCluster implements HConstants {
*/
public void join() {
if (this.regionThreads != null) {
synchronized(this.regionThreads) {
for(Thread t: this.regionThreads) {
if (t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
// continue
}
}
}
@ -234,79 +200,16 @@ public class LocalHBaseCluster implements HConstants {
/**
* Start the cluster.
* @return Address to use contacting master.
*/
public String startup() {
this.master.start();
synchronized (regionThreads) {
for (RegionServerThread t: this.regionThreads) {
t.start();
}
}
return this.master.getMasterAddress().toString();
public void startup() {
JVMClusterUtil.startup(this.master, this.regionThreads);
}
/**
* Shut down the mini HBase cluster
* @throws IOException
*/
public void shutdown() throws IOException {
LOG.debug("Shutting down HBase Cluster");
// Be careful about how we shutdown hdfs. Its done elsewhere.
synchronized (this.regionThreads) {
for (RegionServerThread t: this.regionThreads) {
t.getRegionServer().setShutdownHDFS(false);
}
}
if(this.master != null) {
this.master.shutdown();
}
// regionServerThreads can never be null because they are initialized when
// the class is constructed.
synchronized(this.regionThreads) {
for(Thread t: this.regionThreads) {
if (t.isAlive()) {
try {
t.join();
} catch (InterruptedException e) {
// continue
}
}
}
}
if (this.master != null) {
while (this.master.isAlive()) {
try {
// The below has been replaced to debug sometime hangs on end of
// tests.
// this.master.join():
threadDumpingJoin(this.master);
} catch(InterruptedException e) {
// continue
}
}
}
LOG.info("Shutdown " + this.regionThreads.size() + " region server(s)");
}
/**
* @param t
* @throws InterruptedException
*/
public void threadDumpingJoin(final Thread t) throws InterruptedException {
if (t == null) {
return;
}
long startTime = System.currentTimeMillis();
while (t.isAlive()) {
Thread.sleep(1000);
if (System.currentTimeMillis() - startTime > 60000) {
startTime = System.currentTimeMillis();
ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
"Automatic Stack Trace every 60 seconds waiting on " +
t.getName());
}
}
public void shutdown() {
JVMClusterUtil.shutdown(this.master, this.regionThreads);
}
/**
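The net effect of the LocalHBaseCluster changes above is that a test can now run a custom HMaster subclass in-process. Below is a minimal, hedged sketch of that usage; HBaseConfiguration.create() is assumed for configuration setup, and the TestMaster class name is illustrative, not part of this change.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.master.HMaster;

public class CustomMasterClusterSketch {
  /**
   * Hypothetical master subclass; any HMaster with a (Configuration)
   * constructor works because the cluster now builds it reflectively via
   * HMaster.constructMaster().
   */
  public static class TestMaster extends HMaster {
    public TestMaster(final Configuration conf) throws IOException {
      super(conf);
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // One regionserver plus our custom master, all in this JVM.
    LocalHBaseCluster cluster = new LocalHBaseCluster(conf, 1, TestMaster.class);
    cluster.startup();
    // ... drive whatever scenario the test needs ...
    cluster.shutdown();
  }
}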

View File

@ -89,10 +89,6 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -141,13 +137,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
// The Path to the old logs dir
private final Path oldLogDir;
// Queues for RegionServerOperation events. Includes server open, shutdown,
// and region open and close.
private final DelayQueue<RegionServerOperation> delayedToDoQueue =
new DelayQueue<RegionServerOperation>();
private final BlockingQueue<RegionServerOperation> toDoQueue =
new PriorityBlockingQueue<RegionServerOperation>();
private final HBaseServer rpcServer;
private final HServerAddress address;
@ -157,6 +146,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
private long lastFragmentationQuery = -1L;
private Map<String, Integer> fragmentation = null;
private final RegionServerOperationQueue regionServerOperationQueue;
/**
* Constructor
@ -202,6 +192,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
this.zkMasterAddressWatcher =
new ZKMasterAddressWatcher(this.zooKeeperWrapper, this.shutdownRequested);
this.zkMasterAddressWatcher.writeAddressToZooKeeper(this.address, true);
this.regionServerOperationQueue =
new RegionServerOperationQueue(this.conf, this.closed);
serverManager = new ServerManager(this);
regionManager = new RegionManager(this);
@ -407,6 +399,10 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
return this.serverManager.getAverageLoad();
}
RegionServerOperationQueue getRegionServerOperationQueue () {
return this.regionServerOperationQueue;
}
/**
* Get the directory where old logs go
* @return the dir
@ -433,7 +429,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
startServiceThreads();
/* Main processing loop */
try {
while (!this.closed.get()) {
FINISHED: while (!this.closed.get()) {
// check if we should be shutting down
if (this.shutdownRequested.get()) {
// The region servers won't all exit until we stop scanning the
@ -444,9 +440,15 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
break;
}
}
// work on the TodoQueue. If that fails, we should shut down.
if (!processToDoQueue()) {
break;
if (this.regionManager.getRootRegionLocation() != null) {
switch(this.regionServerOperationQueue.process()) {
case FAILED:
break FINISHED;
case REQUEUED_BUT_PROBLEM:
if (!checkFileSystem()) break FINISHED;
default: // PROCESSED, NOOP, REQUEUED:
break;
}
}
}
} catch (Throwable t) {
@ -474,93 +476,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
LOG.info("HMaster main thread exiting");
}
/*
* Try to get an operation off of the todo queue and perform it.
* We actually have two tiers of todo; those that we couldn't do immediately
* which we put aside and then current current todos. We look at put-asides
* first.
* @return True if we have nothing to do or we're to close.
*/
private boolean processToDoQueue() {
RegionServerOperation op = null;
// block until the root region is online
if (this.regionManager.getRootRegionLocation() != null) {
// We can't process server shutdowns unless the root region is online
op = this.delayedToDoQueue.poll();
}
// if there aren't any todo items in the queue, sleep for a bit.
if (op == null) {
try {
op = this.toDoQueue.poll(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// continue
}
}
// at this point, if there's still no todo operation, or we're supposed to
// be closed, return.
if (op == null || this.closed.get()) {
return true;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Processing todo: " + op.toString());
}
try {
// perform the operation.
if (!op.process()) {
// Operation would have blocked because not all meta regions are
// online. This could cause a deadlock, because this thread is waiting
// for the missing meta region(s) to come back online, but since it
// is waiting, it cannot process the meta region online operation it
// is waiting for. So put this operation back on the queue for now.
if (this.toDoQueue.size() == 0) {
// The queue is currently empty so wait for a while to see if what
// we need comes in first
this.sleeper.sleep();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Put " + op.toString() + " back on queue");
}
queue(op);
}
} catch (Exception ex) {
// There was an exception performing the operation.
if (ex instanceof RemoteException) {
try {
ex = RemoteExceptionHandler.decodeRemoteException(
(RemoteException)ex);
} catch (IOException e) {
ex = e;
LOG.warn("main processing loop: " + op.toString(), e);
}
}
// make sure the filesystem is still ok. otherwise, we're toast.
if (!checkFileSystem()) {
return false;
}
LOG.warn("Adding to delayed queue: " + op.toString(), ex);
requeue(op);
}
return true;
}
/**
* @param op operation to requeue; added to the delayedToDoQueue.
*/
void requeue(final RegionServerOperation op) {
this.delayedToDoQueue.put(op);
}
/**
* @param op Operation to queue. Added to the TODO queue.
*/
void queue(final RegionServerOperation op) {
try {
this.toDoQueue.put(op);
} catch (InterruptedException e) {
LOG.error("Failed queue: " + op.toString(), e);
}
}
/*
* Joins cluster. Checks to see if this instance of HBase is fresh or the
* master was started following a failover. In the second case, it inspects
@ -706,12 +621,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
void startShutdown() {
this.closed.set(true);
this.regionManager.stopScanners();
synchronized(toDoQueue) {
this.toDoQueue.clear();
this.delayedToDoQueue.clear();
// Wake main thread; TODO: Is this necessary?
this.toDoQueue.notifyAll();
}
this.regionServerOperationQueue.shutdown();
this.serverManager.notifyServers();
}
@ -752,7 +662,18 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[],
HRegionInfo[] mostLoadedRegions)
throws IOException {
return serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions);
return adornRegionServerAnswer(serverInfo,
this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions));
}
/**
* Override if you'd add messages to return to regionserver <code>hsi</code>
* @param messages Messages to add to
* @return Messages to return to
*/
protected HMsg [] adornRegionServerAnswer(final HServerInfo hsi,
final HMsg [] msgs) {
return msgs;
}
public boolean isMasterRunning() {
@ -1192,7 +1113,26 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
System.exit(0);
}
protected static void doMain(String [] args, Class<? extends HMaster> clazz) {
/**
* Utility for constructing an instance of the passed HMaster class.
* @param masterClass
* @param conf
* @return HMaster instance.
*/
public static HMaster constructMaster(Class<? extends HMaster> masterClass,
final Configuration conf) {
try {
Constructor<? extends HMaster> c =
masterClass.getConstructor(Configuration.class);
return c.newInstance(conf);
} catch (Exception e) {
throw new RuntimeException("Failed construction of " +
"Master: " + masterClass.toString(), e);
}
}
protected static void doMain(String [] args,
Class<? extends HMaster> masterClass) {
if (args.length < 1) {
printUsageAndExit();
}
@ -1239,9 +1179,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
conf.set("hbase.zookeeper.property.clientPort", Integer.toString(clientPort));
(new LocalHBaseCluster(conf)).startup();
} else {
Constructor<? extends HMaster> c =
clazz.getConstructor(Configuration.class);
HMaster master = c.newInstance(conf);
HMaster master = constructMaster(masterClass, conf);
if (master.shutdownRequested.get()) {
LOG.info("Won't bring the Master up as a shutdown is requested");
return;
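Besides moving queue handling into RegionServerOperationQueue, the master now exposes two test hooks: constructMaster() for reflective instantiation and the overridable adornRegionServerAnswer() for appending messages to a regionserver's report response. Below is a hedged sketch of a subclass using the hook; MiniHBaseCluster later in this commit does the per-server version of the same idea, and the class and field names here are illustrative.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.master.HMaster;

public class MessageInjectingMaster extends HMaster {
  // Message a test has asked us to deliver on the next report; may be null.
  private volatile HMsg extra;

  public MessageInjectingMaster(final Configuration conf) throws IOException {
    super(conf);
  }

  public void setExtraMessage(final HMsg m) {
    this.extra = m;
  }

  @Override
  protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
      final HMsg[] msgs) {
    HMsg m = this.extra;
    if (m == null) return msgs;
    HMsg[] adorned = new HMsg[msgs.length + 1];
    System.arraycopy(msgs, 0, adorned, 0, msgs.length);
    adorned[msgs.length] = m;
    this.extra = null; // deliver at most once
    return adorned;
  }
}

HMaster.constructMaster(MessageInjectingMaster.class, conf), or new LocalHBaseCluster(conf, 1, MessageInjectingMaster.class), is then enough to instantiate such a subclass.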

View File

@ -58,6 +58,13 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
@Override
protected boolean process() throws IOException {
if (!metaRegionAvailable()) {
// We can't proceed unless the meta region we are going to update
// is online. metaRegionAvailable() has put this operation on the
// delayedToDoQueue, so return true so the operation is not put
// back on the toDoQueue
return true;
}
Boolean result = null;
if (offlineRegion || reassignRegion) {
result =

View File

@ -58,6 +58,7 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
@Override
protected boolean process() throws IOException {
// TODO: The below check is way too convoluted!!!
if (!metaRegionAvailable()) {
// We can't proceed unless the meta region we are going to update
// is online. metaRegionAvailable() has put this operation on the

View File

@ -109,6 +109,13 @@ class ProcessServerShutdown extends RegionServerOperation {
}
}
/**
* @return Name of server we are processing.
*/
public HServerAddress getDeadServerAddress() {
return this.deadServerAddress;
}
@Override
public String toString() {
return "ProcessServerShutdown of " + this.deadServer;

View File

@ -33,7 +33,7 @@ abstract class RegionServerOperation implements Delayed, HConstants {
private long expire;
protected final HMaster master;
final int delay;
private int delay;
protected RegionServerOperation(HMaster master) {
this.master = master;
@ -41,13 +41,28 @@ abstract class RegionServerOperation implements Delayed, HConstants {
getInt("hbase.server.thread.wakefrequency", 10 * 1000);
// Set the future time at which we expect to be released from the
// DelayQueue we're inserted in on lease expiration.
this.expire = whenToExpire();
resetExpiration();
}
/**
* Call before putting this back on the delay queue.
* @return When we will expire next.
*/
long resetExpiration() {
// Set the future time at which we expect to be released from the
// DelayQueue we're inserted in on lease expiration.
this.expire = System.currentTimeMillis() + this.delay;
return this.expire;
}
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(),
TimeUnit.MILLISECONDS);
}
void setDelay(final int d) {
this.delay = d;
}
public int compareTo(Delayed o) {
return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
@ -55,8 +70,7 @@ abstract class RegionServerOperation implements Delayed, HConstants {
}
protected void requeue() {
this.expire = whenToExpire();
this.master.requeue(this);
this.master.getRegionServerOperationQueue().putOnDelayQueue(this);
}
private long whenToExpire() {
@ -103,5 +117,6 @@ abstract class RegionServerOperation implements Delayed, HConstants {
protected int getPriority() {
return Integer.MAX_VALUE;
}
protected abstract boolean process() throws IOException;
}
}
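The operation class now carries a mutable delay and a resetExpiration() that must be called before an operation goes back on the DelayQueue (putOnDelayQueue() in the queue class below does exactly that). The following is a standalone, JDK-only sketch of that Delayed contract with the same expire/getDelay/compareTo shape; the names are illustrative and nothing here is HBase code.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class RetryableOp implements Delayed {
  private final String name;
  private int delayMillis;
  private long expire; // absolute time at which this element becomes available

  RetryableOp(String name, int delayMillis) {
    this.name = name;
    this.delayMillis = delayMillis;
    resetExpiration();
  }

  void resetExpiration() { // call before every put back on the queue
    this.expire = System.currentTimeMillis() + this.delayMillis;
  }

  void setDelay(int millis) { // mirrors the new setDelay() on the operation
    this.delayMillis = millis;
  }

  public long getDelay(TimeUnit unit) {
    return unit.convert(expire - System.currentTimeMillis(),
      TimeUnit.MILLISECONDS);
  }

  public int compareTo(Delayed o) {
    return Long.valueOf(getDelay(TimeUnit.MILLISECONDS))
      .compareTo(o.getDelay(TimeUnit.MILLISECONDS));
  }

  public String toString() { return name; }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<RetryableOp> q = new DelayQueue<RetryableOp>();
    q.put(new RetryableOp("op-1", 500));
    System.out.println("took " + q.take()); // blocks ~500ms, then prints op-1
  }
}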

View File

@ -0,0 +1,43 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
/**
* Listener for regionserver events in master.
* @see HMaster#registerRegionServerOperationListener(RegionServerOperationListener)
* @see HMaster#unregisterRegionServerOperationListener(RegionServerOperationListener)
*/
public interface RegionServerOperationListener {
/**
* Called before processing <code>op</code>
* @param op
* @return True if we are to proceed w/ processing.
* @exception IOException
*/
public boolean process(final RegionServerOperation op) throws IOException;
/**
* Called after <code>op</code> has been processed.
* @param op The operation that just completed.
*/
public void processed(final RegionServerOperation op);
}
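Because RegionServerOperation and its subclasses are package-private, a listener implementation has to live in org.apache.hadoop.hbase.master, which is also where the new test class at the end of this commit sits. Below is a hedged sketch of a listener a test might register; the class name is illustrative. Returning false from process() causes the queue to put the operation back on its delay queue.

package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.hadoop.hbase.HServerAddress;

public class ShutdownCountingListener implements RegionServerOperationListener {
  /** Dead servers whose shutdown processing we have observed complete. */
  public final List<HServerAddress> deadServers =
    new CopyOnWriteArrayList<HServerAddress>();

  public boolean process(final RegionServerOperation op) throws IOException {
    return true; // never veto; false would send op back to the delay queue
  }

  public void processed(final RegionServerOperation op) {
    if (op instanceof ProcessServerShutdown) {
      deadServers.add(((ProcessServerShutdown) op).getDeadServerAddress());
    }
  }
}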

View File

@ -0,0 +1,211 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.ipc.RemoteException;
/**
* Keeps up the queue of {@link RegionServerOperation}s.
* Has both a live queue and a temporary put-aside queue; if processing of the
* live todo queue fails for some reason, we'll add the item back on the delay
* queue for retry later. Call {@link #shutdown()} to effect a cleanup of
* queues when done. Listen to this queue by registering
* {@link RegionServerOperationListener}s.
* @see #registerRegionServerOperationListener(RegionServerOperationListener)
* @see #unregisterRegionServerOperationListener(RegionServerOperationListener)
*/
public class RegionServerOperationQueue {
// TODO: Build up the junit test of this class.
private final Log LOG = LogFactory.getLog(this.getClass());
/**
* Enums returned by {@link RegionServerOperationQueue#process()};
*/
public static enum ProcessingResultCode {
/**
* Operation was processed successfully.
*/
PROCESSED,
/**
* Nothing to do.
*/
NOOP,
/**
* Operation was put-aside for now. Will be retried later.
*/
REQUEUED,
/**
* Failed processing of the operation.
*/
FAILED,
/**
* Operation was requeued but we failed its processing for some reason
* (Bad filesystem?).
*/
REQUEUED_BUT_PROBLEM
};
/*
* Do not put items directly on this queue. Use {@link #putOnDelayQueue(RegionServerOperation)}.
* It makes sure the expiration on the RegionServerOperation added is updated.
*/
private final DelayQueue<RegionServerOperation> delayedToDoQueue =
new DelayQueue<RegionServerOperation>();
private final BlockingQueue<RegionServerOperation> toDoQueue =
new PriorityBlockingQueue<RegionServerOperation>();
private final Set<RegionServerOperationListener> listeners =
new CopyOnWriteArraySet<RegionServerOperationListener>();
private final int threadWakeFrequency;
private final AtomicBoolean closed;
private final Sleeper sleeper;
RegionServerOperationQueue(final Configuration c, final AtomicBoolean closed) {
this.threadWakeFrequency = c.getInt(HMaster.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.closed = closed;
this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
}
public void put(final RegionServerOperation op) {
try {
this.toDoQueue.put(op);
} catch (InterruptedException e) {
LOG.warn("Insertion into todo queue interrupted; putting on delay queue", e);
putOnDelayQueue(op);
}
}
/**
* Try to get an operation off of the queue and process it.
* @return {@link ProcessingResultCode#PROCESSED},
* {@link ProcessingResultCode#REQUEUED},
* {@link ProcessingResultCode#REQUEUED_BUT_PROBLEM}
*/
public synchronized ProcessingResultCode process() {
RegionServerOperation op = delayedToDoQueue.poll();
// if there aren't any todo items in the queue, sleep for a bit.
if (op == null) {
try {
op = toDoQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug("Interrupted", e);
}
}
// At this point, if there's still no todo operation, or we're supposed to
// be closed, return.
if (op == null || closed.get()) {
return ProcessingResultCode.NOOP;
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing todo: " + op.toString());
}
if (!process(op)) {
// Add it back on the queue.
putOnDelayQueue(op);
} else if (op.process()) {
processed(op);
} else {
// Operation would have blocked because not all meta regions are
// online. This could cause a deadlock, because this thread is waiting
// for the missing meta region(s) to come back online, but since it
// is waiting, it cannot process the meta region online operation it
// is waiting for. So put this operation back on the queue for now.
if (toDoQueue.size() == 0) {
// The queue is currently empty so wait for a while to see if what
// we need comes in first
this.sleeper.sleep();
}
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Put " + op.toString() + " back on queue");
}
toDoQueue.put(op);
} catch (InterruptedException e) {
throw new RuntimeException(
"Putting into toDoQueue was interrupted.", e);
}
}
} catch (Exception ex) {
// There was an exception performing the operation.
if (ex instanceof RemoteException) {
try {
ex = RemoteExceptionHandler.decodeRemoteException(
(RemoteException)ex);
} catch (IOException e) {
ex = e;
LOG.warn("main processing loop: " + op.toString(), e);
}
}
LOG.warn("Failed processing: " + op.toString() +
"; putting onto delayed todo queue", ex);
putOnDelayQueue(op);
return ProcessingResultCode.REQUEUED_BUT_PROBLEM;
}
return ProcessingResultCode.REQUEUED;
}
void putOnDelayQueue(final RegionServerOperation op) {
op.resetExpiration();
this.delayedToDoQueue.put(op);
}
/**
* Clean up the queues.
*/
public synchronized void shutdown() {
this.toDoQueue.clear();
this.delayedToDoQueue.clear();
}
/**
* @param l Register this listener of RegionServerOperation events.
*/
public void registerRegionServerOperationListener(final RegionServerOperationListener l) {
this.listeners.add(l);
}
/**
* @param l Unregister this listener for RegionServerOperation events.
* @return True if this listener was registered.
*/
public boolean unregisterRegionServerOperationListener(final RegionServerOperationListener l) {
return this.listeners.remove(l);
}
/*
* Tell listeners that we processed a RegionServerOperation.
* @param op Operation to tell the world about.
*/
private void processed(final RegionServerOperation op) {
if (this.listeners.isEmpty()) return;
for (RegionServerOperationListener listener: this.listeners) {
listener.processed(op);
}
}
/*
* Tell listeners that we processed a RegionServerOperation.
* @param op Operation to tell the world about.
*/
private boolean process(final RegionServerOperation op) throws IOException {
if (this.listeners.isEmpty()) return true;
for (RegionServerOperationListener listener: this.listeners) {
if (!listener.process(op)) return false;
}
return true;
}
}
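For reference, this is the consumption pattern the reworked HMaster main loop uses against the queue, boiled down to a hedged, in-package sketch. The driver class is illustrative, HBaseConfiguration.create() is assumed, and operations normally arrive via ServerManager calling put().

package org.apache.hadoop.hbase.master;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class QueueDriverSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    AtomicBoolean closed = new AtomicBoolean(false);
    RegionServerOperationQueue queue =
      new RegionServerOperationQueue(conf, closed);
    // Listeners see every operation before and after it runs.
    queue.registerRegionServerOperationListener(
        new RegionServerOperationListener() {
      public boolean process(final RegionServerOperation op)
      throws IOException {
        return true;
      }
      public void processed(final RegionServerOperation op) {
      }
    });
    // The master drives the queue in a loop like this until it is closed;
    // here we take a few turns and then clean up.
    for (int i = 0; i < 3 && !closed.get(); i++) {
      switch (queue.process()) {
      case FAILED:
        closed.set(true); // unrecoverable; the master exits its loop here
        break;
      case REQUEUED_BUT_PROBLEM:
        // the master re-checks the filesystem at this point
        break;
      default: // PROCESSED, NOOP, REQUEUED
        break;
      }
    }
    queue.shutdown();
  }
}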

View File

@ -196,7 +196,8 @@ public class ServerManager implements HConstants {
// The startup message was from a known server with the same name.
// Timeout the old one right away.
this.master.getRegionManager().getRootRegionLocation();
this.master.queue(new ProcessServerShutdown(this.master, storedInfo));
RegionServerOperation op = new ProcessServerShutdown(master, storedInfo);
this.master.getRegionServerOperationQueue().put(op);
}
recordNewServer(info);
}
@ -599,8 +600,9 @@ public class ServerManager implements HConstants {
// Note that the table has been assigned and is waiting for the
// meta table to be updated.
this.master.getRegionManager().setOpen(region.getRegionNameAsString());
// Queue up an update to note the region location.
this.master.queue(new ProcessRegionOpen(master, serverInfo, region));
RegionServerOperation op =
new ProcessRegionOpen(master, serverInfo, region);
this.master.getRegionServerOperationQueue().put(op);
}
}
}
@ -637,8 +639,9 @@ public class ServerManager implements HConstants {
// processed before an open resulting in the master not agreeing on
// the region's state.
this.master.getRegionManager().setClosed(region.getRegionNameAsString());
this.master.queue(new ProcessRegionClose(master, region,
offlineRegion, reassignRegion));
RegionServerOperation op =
new ProcessRegionClose(master, region, offlineRegion, reassignRegion);
this.master.getRegionServerOperationQueue().put(op);
}
}
@ -800,7 +803,7 @@ public class ServerManager implements HConstants {
}
public void process(WatchedEvent event) {
if(event.getType().equals(EventType.NodeDeleted)) {
if (event.getType().equals(EventType.NodeDeleted)) {
LOG.info(server + " znode expired");
// Remove the server from the known servers list and update load info
serverAddressToServerInfo.remove(serverAddress);
@ -821,7 +824,8 @@ public class ServerManager implements HConstants {
}
}
deadServers.add(server);
master.queue(new ProcessServerShutdown(master, info));
RegionServerOperation op = new ProcessServerShutdown(master, info);
master.getRegionServerOperationQueue().put(op);
}
synchronized (serversToServerInfo) {
serversToServerInfo.notifyAll();
@ -872,5 +876,4 @@ public class ServerManager implements HConstants {
public void setMinimumServerCount(int minimumServerCount) {
this.minimumServerCount = minimumServerCount;
}
}

View File

@ -119,7 +119,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation. We use AtomicBoolean rather than
@ -169,7 +169,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
// Server to handle client requests. Default access so can be accessed by
// unit tests.
HBaseServer server;
// Leases
private Leases leases;
@ -221,7 +221,9 @@ public class HRegionServer implements HConstants, HRegionInterface,
private final long rpcTimeout;
// Address passed in to constructor.
// Address passed in to constructor. This is not always the address we run
// with. For example, if passed port is 0, then we are to pick a port. The
// actual address we run with is in the #serverInfo data member.
private final HServerAddress address;
// The main region server thread.
@ -244,7 +246,11 @@ public class HRegionServer implements HConstants, HRegionInterface,
conf.get("hbase.regionserver.dns.nameserver","default"));
String addressStr = machineName + ":" +
conf.get(REGIONSERVER_PORT, Integer.toString(DEFAULT_REGIONSERVER_PORT));
this.address = new HServerAddress(addressStr);
// This is not necessarily the address we will run with. The address we
// use will be in #serverInfo data member. For example, we may have been
// passed a port of 0 which means we should pick some ephemeral port to bind
// to.
address = new HServerAddress(addressStr);
LOG.info("My address is " + address);
this.abortRequested = false;
@ -652,12 +658,14 @@ public class HRegionServer implements HConstants, HRegionInterface,
LOG.info("stopping server at: " +
serverInfo.getServerAddress().toString());
}
// Make sure the proxy is down.
if (this.hbaseMaster != null) {
HBaseRPC.stopProxy(this.hbaseMaster);
this.hbaseMaster = null;
}
join();
zooKeeperWrapper.close();
LOG.info(Thread.currentThread().getName() + " exiting");
@ -716,7 +724,13 @@ public class HRegionServer implements HConstants, HRegionInterface,
// Master may have sent us a new address with the other configs.
// Update our address in this case. See HBASE-719
String hra = conf.get("hbase.regionserver.address");
if (address != null) {
// TODO: The below used to be this.address != null. Was broken by what
// looks like a mistake in:
//
// HBASE-1215 migration; metautils scan of meta region was broken; wouldn't see first row
// ------------------------------------------------------------------------
// r796326 | stack | 2009-07-21 07:40:34 -0700 (Tue, 21 Jul 2009) | 38 lines
if (hra != null) {
HServerAddress hsa = new HServerAddress (hra,
this.serverInfo.getServerAddress().getPort());
LOG.info("Master passed us address to use. Was=" +
@ -1180,7 +1194,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
notifyAll(); // FindBugs NN_NAKED_NOTIFY
}
}
/**
* Cause the server to exit without closing the regions it is serving, the
* log it is using and without notifying the master.
@ -1228,9 +1242,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
masterAddress.getInetSocketAddress(),
this.conf, -1, this.rpcTimeout);
HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
masterAddress.getInetSocketAddress(), this.conf, -1, this.rpcTimeout);
} catch (IOException e) {
LOG.warn("Unable to connect to master. Retrying. Error was:", e);
sleeper.sleep();
@ -2247,7 +2260,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
throw new IOException("Unknown protocol to name node: " + protocol);
}
/**
* @return Queue to which you can add outbound messages.
*/
@ -2290,9 +2303,101 @@ public class HRegionServer implements HConstants, HRegionInterface,
return fs;
}
/**
* @return Info on port this server has bound to, etc.
*/
public HServerInfo getServerInfo() { return this.serverInfo; }
/** {@inheritDoc} */
public long incrementColumnValue(byte [] regionName, byte [] row,
byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
throws IOException {
checkOpen();
if (regionName == null) {
throw new IOException("Invalid arguments to incrementColumnValue " +
"regionName is null");
}
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
long retval = region.incrementColumnValue(row, family, qualifier, amount,
writeToWAL);
return retval;
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
/** {@inheritDoc} */
public HRegionInfo[] getRegionsAssignment() throws IOException {
HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
Iterator<HRegion> ite = onlineRegions.values().iterator();
for(int i = 0; ite.hasNext(); i++) {
regions[i] = ite.next().getRegionInfo();
}
return regions;
}
/** {@inheritDoc} */
public HServerInfo getHServerInfo() throws IOException {
return serverInfo;
}
@Override
public MultiPutResponse multiPut(MultiPut puts) throws IOException {
MultiPutResponse resp = new MultiPutResponse();
// do each region as it's own.
for( Map.Entry<byte[],List<Put>> e: puts.puts.entrySet()) {
int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
resp.addResult(e.getKey(), result);
e.getValue().clear(); // clear some RAM
}
return resp;
}
public String toString() {
return this.serverInfo.toString();
}
/**
* Interval at which threads should run
* @return the interval
*/
public int getThreadWakeFrequency() {
return threadWakeFrequency;
}
//
// Main program and support routines
//
/**
* @param hrs
* @return Thread the RegionServer is running in correctly named.
*/
public static Thread startRegionServer(final HRegionServer hrs) {
return startRegionServer(hrs,
"regionserver" + hrs.server.getListenerAddress());
}
/**
* @param hrs
* @param name
* @return Thread the RegionServer is running in correctly named.
*/
public static Thread startRegionServer(final HRegionServer hrs,
final String name) {
Thread t = new Thread(hrs);
t.setName(name);
t.start();
return t;
}
private static void printUsageAndExit() {
printUsageAndExit(null);
@ -2305,7 +2410,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
System.err.println("Usage: java org.apache.hbase.HRegionServer start|stop");
System.exit(0);
}
/**
* Do class main.
* @param args
@ -2335,10 +2440,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
}
Constructor<? extends HRegionServer> c =
regionServerClass.getConstructor(Configuration.class);
HRegionServer hrs = c.newInstance(conf);
Thread t = new Thread(hrs);
t.setName("regionserver" + hrs.server.getListenerAddress());
t.start();
startRegionServer(c.newInstance(conf));
}
} catch (Throwable t) {
LOG.error( "Can not start region server because "+
@ -2357,50 +2459,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
printUsageAndExit();
}
}
/** {@inheritDoc} */
public long incrementColumnValue(byte [] regionName, byte [] row,
byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
throws IOException {
checkOpen();
if (regionName == null) {
throw new IOException("Invalid arguments to incrementColumnValue " +
"regionName is null");
}
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
return region.incrementColumnValue(row, family, qualifier, amount,
writeToWAL);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
/** {@inheritDoc} */
public HRegionInfo[] getRegionsAssignment() throws IOException {
HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
Iterator<HRegion> ite = onlineRegions.values().iterator();
for(int i = 0; ite.hasNext(); i++) {
regions[i] = ite.next().getRegionInfo();
}
return regions;
}
/** {@inheritDoc} */
public HServerInfo getHServerInfo() throws IOException {
return serverInfo;
}
/**
* Interval at which threads should run
* @return the interval
*/
public int getThreadWakeFrequency() {
return threadWakeFrequency;
}
/**
* @param args
@ -2413,20 +2471,4 @@ public class HRegionServer implements HConstants, HRegionInterface,
HRegionServer.class);
doMain(args, regionServerClass);
}
@Override
public MultiPutResponse multiPut(MultiPut puts) throws IOException {
MultiPutResponse resp = new MultiPutResponse();
// do each region as it's own.
for( Map.Entry<byte[],List<Put>> e: puts.puts.entrySet()) {
int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
resp.addResult(e.getKey(), result);
e.getValue().clear(); // clear some RAM
}
return resp;
}
}
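The regionserver half mirrors the master changes: a public getServerInfo()/toString() for tests plus static startRegionServer() helpers so callers no longer hand-roll the thread setup. Below is a hedged sketch of using the helpers directly; HBaseConfiguration.create() is assumed, and port 0 means pick an ephemeral port, as the comments above note.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.HRegionServer;

public class StartOneRegionServerSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.REGIONSERVER_PORT, "0"); // bind to any free port
    HRegionServer hrs = new HRegionServer(conf);
    // Wraps the regionserver in a correctly named, started thread.
    Thread t = HRegionServer.startRegionServer(hrs, "regionserver/sketch");
    t.join(); // block until the regionserver thread exits
  }
}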

View File

@ -799,6 +799,10 @@ public class HLog implements HConstants, Syncable {
}
lock.lock();
try {
if (syncerShuttingDown) {
LOG.warn(getName() + " was shut down while waiting for sync");
return;
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hbase.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.PrintWriter;
import org.apache.hadoop.util.ReflectionUtils;
import java.lang.Thread.UncaughtExceptionHandler;
@ -74,6 +76,7 @@ public class Threads {
* @param t Thread to shutdown
*/
public static void shutdown(final Thread t, final long joinwait) {
if (t == null) return;
while (t.isAlive()) {
try {
t.join(joinwait);
@ -82,4 +85,38 @@ public class Threads {
}
}
}
}
/**
* Waits on the passed thread to die, dumping a threaddump every minute
* while it is still alive.
* @param t Thread to wait on.
* @throws InterruptedException
*/
public static void threadDumpingIsAlive(final Thread t)
throws InterruptedException {
if (t == null) {
return;
}
long startTime = System.currentTimeMillis();
while (t.isAlive()) {
Thread.sleep(1000);
if (System.currentTimeMillis() - startTime > 60000) {
startTime = System.currentTimeMillis();
ReflectionUtils.printThreadInfo(new PrintWriter(System.out),
"Automatic Stack Trace every 60 seconds waiting on " +
t.getName());
}
}
}
/**
* @param millis How long to sleep for in milliseconds.
*/
public static void sleep(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
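The new Threads helpers package up the thread-dumping join that LocalHBaseCluster used to carry privately, plus a small interruption-swallowing sleep. A brief hedged usage sketch, with an illustrative worker thread:

import org.apache.hadoop.hbase.util.Threads;

public class ThreadsSketch {
  public static void main(String[] args) throws InterruptedException {
    Thread worker = new Thread(new Runnable() {
      public void run() {
        Threads.sleep(2000); // the new sleep; swallows InterruptedException
      }
    }, "worker");
    worker.start();
    // Joins on the worker, printing a thread dump every 60s while it lives.
    Threads.threadDumpingIsAlive(worker);
  }
}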

View File

@ -201,8 +201,8 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
* regionservers and master threads are no long alive.
*/
public void threadDumpingJoin() {
if (this.cluster.getRegionThreads() != null) {
for(Thread t: this.cluster.getRegionThreads()) {
if (this.cluster.getRegionServerThreads() != null) {
for(Thread t: this.cluster.getRegionServerThreads()) {
threadDumpingJoin(t);
}
}

View File

@ -40,12 +40,12 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.zookeeper.ZooKeeper;
@ -60,7 +60,6 @@ import org.apache.zookeeper.ZooKeeper;
* make changes to configuration parameters.
*/
public class HBaseTestingUtility {
private final Log LOG = LogFactory.getLog(getClass());
private final Configuration conf;
@ -68,6 +67,7 @@ public class HBaseTestingUtility {
private MiniDFSCluster dfsCluster = null;
private MiniHBaseCluster hbaseCluster = null;
private MiniMRCluster mrCluster = null;
// If non-null, then already a cluster running.
private File clusterTestBuildDir = null;
private HBaseAdmin hbaseAdmin = null;
@ -78,7 +78,7 @@ public class HBaseTestingUtility {
public HBaseTestingUtility(Configuration conf) {
this.conf = conf;
}
/** System property key to get test directory value.
*/
public static final String TEST_DIRECTORY_KEY = "test.build.data";
@ -97,6 +97,36 @@ public class HBaseTestingUtility {
return new Path(System.getProperty(TEST_DIRECTORY_KEY, "build/test/data"));
}
/**
* Home our cluster in a dir under build/test. Give it a random name
* so we can have many concurrent clusters running if we need to. We need to
* amend the test.build.data System property; it's what minidfscluster bases
* its data dir on. Modifying a System property is not the way to do concurrent
* instances -- another instance could grab the temporary
* value unintentionally -- but there's nothing we can do about it at the
* moment; it's how the minidfscluster works.
* @return The calculated cluster test build directory.
*/
File setupClusterTestBuildDir() {
String oldTestBuildDir =
System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
String randomStr = UUID.randomUUID().toString();
String dirStr = oldTestBuildDir + "." + randomStr;
File dir = new File(dirStr).getAbsoluteFile();
// Have it cleaned up on exit
dir.deleteOnExit();
return dir;
}
/**
* @throws IOException If cluster already running.
*/
void isRunningCluster() throws IOException {
if (this.clusterTestBuildDir == null) return;
throw new IOException("Cluster already running at " +
this.clusterTestBuildDir);
}
/**
* @param subdirName
* @return Path to a subdirectory named <code>subdirName</code> under
@ -114,15 +144,34 @@ public class HBaseTestingUtility {
startMiniCluster(1);
}
/**
* Call this if you only want a zk cluster.
* @see #startMiniCluster() if you want zk + dfs + hbase mini cluster.
* @throws Exception
* @see #shutdownMiniZKCluster()
*/
public void startMiniZKCluster() throws Exception {
// Note that this is done before we create the MiniHBaseCluster because we
// need to edit the config to add the ZooKeeper servers.
isRunningCluster();
this.clusterTestBuildDir = setupClusterTestBuildDir();
startMiniZKCluster(this.clusterTestBuildDir);
}
private void startMiniZKCluster(final File dir) throws Exception {
this.zkCluster = new MiniZooKeeperCluster();
int clientPort = this.zkCluster.startup(this.clusterTestBuildDir);
int clientPort = this.zkCluster.startup(dir);
this.conf.set("hbase.zookeeper.property.clientPort",
Integer.toString(clientPort));
}
/**
* @throws IOException
* @see #startMiniZKCluster()
*/
public void shutdownMiniZKCluster() throws IOException {
if (this.zkCluster != null) this.zkCluster.shutdown();
}
/**
* Start up a minicluster of hbase, optionally dfs, and zookeeper.
* Modifies Configuration. Homes the cluster data directory under a random
@ -138,27 +187,13 @@ public class HBaseTestingUtility {
throws Exception {
LOG.info("Starting up minicluster");
// If we already put up a cluster, fail.
if (this.clusterTestBuildDir != null) {
throw new IOException("Cluster already running at " +
this.clusterTestBuildDir);
}
// Now, home our cluster in a dir under build/test. Give it a random name
// so can have many concurrent clusters running if we need to. Need to
// amend the test.build.data System property. Its what minidfscluster bases
// it data dir on. Moding a System property is not the way to do concurrent
// instances -- another instance could grab the temporary
// value unintentionally -- but not anything can do about it at moment; its
// how the minidfscluster works.
String oldTestBuildDir =
isRunningCluster();
String oldBuildTestDir =
System.getProperty(TEST_DIRECTORY_KEY, "build/test/data");
String randomStr = UUID.randomUUID().toString();
String clusterTestBuildDirStr = oldTestBuildDir + "." + randomStr;
this.clusterTestBuildDir =
new File(clusterTestBuildDirStr).getAbsoluteFile();
// Have it cleaned up on exit
this.clusterTestBuildDir.deleteOnExit();
this.clusterTestBuildDir = setupClusterTestBuildDir();
// Set our random dir while minidfscluster is being constructed.
System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDirStr);
System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
// Bring up mini dfs cluster. This spews a bunch of warnings about missing
// scheme. TODO: fix.
// Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
@ -167,7 +202,8 @@ public class HBaseTestingUtility {
// Restore System property. minidfscluster accesses content of
// the TEST_DIRECTORY_KEY to make bad blocks, a feature we are not using,
// but otherwise, just in constructor.
System.setProperty(TEST_DIRECTORY_KEY, oldTestBuildDir);
System.setProperty(TEST_DIRECTORY_KEY, oldBuildTestDir);
// Mangle conf so fs parameter points to minidfs we just started up
FileSystem fs = this.dfsCluster.getFileSystem();
this.conf.set("fs.defaultFS", fs.getUri().toString());
@ -175,7 +211,7 @@ public class HBaseTestingUtility {
// It could be created before the cluster
if(this.zkCluster == null) {
startMiniZKCluster();
startMiniZKCluster(this.clusterTestBuildDir);
}
// Now do the mini hbase cluster. Set the hbase.rootdir in config.
@ -191,9 +227,18 @@ public class HBaseTestingUtility {
LOG.info("Minicluster is up");
}
/**
* @return Current mini hbase cluster. Only has something in it after a call
* to {@link #startMiniCluster()}.
* @see #startMiniCluster()
*/
public MiniHBaseCluster getMiniHBaseCluster() {
return this.hbaseCluster;
}
/**
* @throws IOException
* @see {@link #startMiniCluster(boolean, int)}
* @see {@link #startMiniCluster(int)}
*/
public void shutdownMiniCluster() throws IOException {
LOG.info("Shutting down minicluster");
@ -202,7 +247,7 @@ public class HBaseTestingUtility {
// Wait till hbase is down before going on to shutdown zk.
this.hbaseCluster.join();
}
if (this.zkCluster != null) this.zkCluster.shutdown();
shutdownMiniZKCluster();
if (this.dfsCluster != null) {
// The below throws an exception per dn, AsynchronousCloseException.
this.dfsCluster.shutdown();
@ -369,9 +414,24 @@ public class HBaseTestingUtility {
*
* @param table The table to use for the data.
* @param columnFamily The family to insert the data into.
* @return count of regions created.
* @throws IOException When creating the regions fails.
*/
public void createMultiRegions(HTable table, byte[] columnFamily)
public int createMultiRegions(HTable table, byte[] columnFamily)
throws IOException {
return createMultiRegions(getConfiguration(), table, columnFamily);
}
/**
* Creates many regions named "aaa" to "zzz".
* @param c Configuration to use.
* @param table The table to use for the data.
* @param columnFamily The family to insert the data into.
* @return count of regions created.
* @throws IOException When creating the regions fails.
*/
public int createMultiRegions(final Configuration c, final HTable table,
final byte[] columnFamily)
throws IOException {
byte[][] KEYS = {
HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
@ -385,7 +445,6 @@ public class HBaseTestingUtility {
Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
};
Configuration c = getConfiguration();
HTable meta = new HTable(c, HConstants.META_TABLE_NAME);
HTableDescriptor htd = table.getTableDescriptor();
if(!htd.hasFamily(columnFamily)) {
@ -398,6 +457,7 @@ public class HBaseTestingUtility {
// including the new start region from empty to "bbb". lg
List<byte[]> rows = getMetaTableRows();
// add custom ones
int count = 0;
for (int i = 0; i < KEYS.length; i++) {
int j = (i + 1) % KEYS.length;
HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(),
@ -407,6 +467,7 @@ public class HBaseTestingUtility {
Writables.getBytes(hri));
meta.put(put);
LOG.info("createMultiRegions: inserted " + hri.toString());
count++;
}
// see comment above, remove "old" (or previous) single region
for (byte[] row : rows) {
@ -417,6 +478,7 @@ public class HBaseTestingUtility {
// flush cache of regions
HConnection conn = table.getConnection();
conn.clearRegionCache();
return count;
}
/**
@ -621,4 +683,4 @@ public class HBaseTestingUtility {
public MiniDFSCluster getDFSCluster() {
return dfsCluster;
}
}
}
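Put together, the utility now supports the shape of test this issue is after: stand up a multi-regionserver cluster, carve a table into many regions, and tear it all down. Below is a hedged JUnit 4 sketch along those lines; the table and family names are illustrative.

import static org.junit.Assert.assertTrue;

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class MultiRegionSketch {
  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
  private static final String TABLE = "multi_region_sketch";
  private static final byte [] FAMILY = Bytes.toBytes("f");

  @BeforeClass public static void setUp() throws Exception {
    TEST_UTIL.startMiniCluster(2);
  }

  @AfterClass public static void tearDown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test public void createsManyRegions() throws Exception {
    TEST_UTIL.createTable(Bytes.toBytes(TABLE), new byte [][] {FAMILY});
    HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLE);
    int regions = TEST_UTIL.createMultiRegions(t, FAMILY);
    assertTrue(regions > 1); // "bbb" through "yyy" splits plus the first region
  }
}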

View File

@ -21,7 +21,10 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* This class creates a single process HBase cluster. One thread is run for
@ -46,7 +50,7 @@ public class MiniHBaseCluster implements HConstants {
/**
* Start a MiniHBaseCluster.
* @param conf HBaseConfiguration to be used for cluster
* @param conf Configuration to be used for cluster
* @param numRegionServers initial number of region servers to start.
* @throws IOException
*/
@ -56,12 +60,62 @@ public class MiniHBaseCluster implements HConstants {
init(numRegionServers);
}
/**
* Override Master so can add inject behaviors testing.
*/
public static class MiniHBaseClusterMaster extends HMaster {
private final Map<HServerInfo, List<HMsg>> messages =
new ConcurrentHashMap<HServerInfo, List<HMsg>>();
public MiniHBaseClusterMaster(final Configuration conf)
throws IOException {
super(conf);
}
/**
* Add a message to send to a regionserver next time it checks in.
* @param hsi RegionServer's HServerInfo.
* @param msg Message to add.
*/
void addMessage(final HServerInfo hsi, HMsg msg) {
synchronized(this.messages) {
List<HMsg> hmsgs = this.messages.get(hsi);
if (hmsgs == null) {
hmsgs = new ArrayList<HMsg>();
this.messages.put(hsi, hmsgs);
}
hmsgs.add(msg);
}
}
@Override
protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi,
final HMsg[] msgs) {
HMsg [] answerMsgs = msgs;
synchronized (this.messages) {
List<HMsg> hmsgs = this.messages.get(hsi);
if (hmsgs != null && !hmsgs.isEmpty()) {
int size = answerMsgs.length;
HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()];
System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length);
for (int i = 0; i < hmsgs.size(); i++) {
newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i);
}
answerMsgs = newAnswerMsgs;
hmsgs.clear();
}
}
return super.adornRegionServerAnswer(hsi, answerMsgs);
}
}
private void init(final int nRegionNodes) throws IOException {
try {
// start up a LocalHBaseCluster
while (true) {
try {
hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes);
hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes,
MiniHBaseCluster.MiniHBaseClusterMaster.class);
hbaseCluster.startup();
} catch (BindException e) {
//this port is already in use. try to use another (for multiple testing)
@ -86,8 +140,7 @@ public class MiniHBaseCluster implements HConstants {
* @return Name of regionserver started.
*/
public String startRegionServer() throws IOException {
LocalHBaseCluster.RegionServerThread t =
this.hbaseCluster.addRegionServer();
JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer();
t.start();
t.waitForServerOnline();
return t.getName();
@ -109,18 +162,18 @@ public class MiniHBaseCluster implements HConstants {
}
/**
* Cause a region server to exit without cleaning up
*
* Cause a region server to exit doing basic clean up only on its way out.
* @param serverNumber Used as index into a list.
*/
public void abortRegionServer(int serverNumber) {
public String abortRegionServer(int serverNumber) {
HRegionServer server = getRegionServer(serverNumber);
try {
LOG.info("Aborting " + server.getHServerInfo().toString());
} catch (IOException e) {
e.printStackTrace();
}
/*TODO: Prove not needed in TRUNK
// // Don't run hdfs shutdown thread.
// server.setHDFSShutdownThreadOnExit(null);
*/
LOG.info("Aborting " + server.toString());
server.abort();
return server.toString();
}
/**
@ -129,7 +182,7 @@ public class MiniHBaseCluster implements HConstants {
* @param serverNumber Used as index into a list.
* @return the region server that was stopped
*/
public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber) {
public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) {
return stopRegionServer(serverNumber, true);
}
@ -143,9 +196,9 @@ public class MiniHBaseCluster implements HConstants {
* before end of the test.
* @return the region server that was stopped
*/
public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber,
public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber,
final boolean shutdownFS) {
LocalHBaseCluster.RegionServerThread server =
JVMClusterUtil.RegionServerThread server =
hbaseCluster.getRegionServers().get(serverNumber);
LOG.info("Stopping " + server.toString());
if (!shutdownFS) {
@ -157,8 +210,8 @@ public class MiniHBaseCluster implements HConstants {
}
/**
* Wait for the specified region server to stop
* Removes this thread from list of running threads.
* Wait for the specified region server to stop. Removes this thread from list
* of running threads.
* @param serverNumber
* @return Name of region server that just went down.
*/
@ -189,7 +242,7 @@ public class MiniHBaseCluster implements HConstants {
* @throws IOException
*/
public void flushcache() throws IOException {
for (LocalHBaseCluster.RegionServerThread t:
for (JVMClusterUtil.RegionServerThread t:
this.hbaseCluster.getRegionServers()) {
for(HRegion r: t.getRegionServer().getOnlineRegions()) {
r.flushcache();
@ -200,7 +253,7 @@ public class MiniHBaseCluster implements HConstants {
/**
* @return List of region server threads.
*/
public List<LocalHBaseCluster.RegionServerThread> getRegionThreads() {
public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() {
return this.hbaseCluster.getRegionServers();
}
@ -212,4 +265,38 @@ public class MiniHBaseCluster implements HConstants {
public HRegionServer getRegionServer(int serverNumber) {
return hbaseCluster.getRegionServer(serverNumber);
}
}
/**
* @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()}
* of HRS carrying .META. Returns -1 if none found.
*/
public int getServerWithMeta() {
int index = -1;
int count = 0;
for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) {
HRegionServer hrs = rst.getRegionServer();
HRegion metaRegion =
hrs.getOnlineRegion(HRegionInfo.FIRST_META_REGIONINFO.getRegionName());
if (metaRegion != null) {
index = count;
break;
}
count++;
}
return index;
}
/**
* Add a message to include in the responses we send a regionserver when it
* checks back in.
* @param serverNumber Which server to send it to.
* @param msg The MESSAGE
* @throws IOException
*/
public void addMessageToSendRegionServer(final int serverNumber,
final HMsg msg)
throws IOException {
HRegionServer hrs = getRegionServer(serverNumber);
((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg);
}
}
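The new MiniHBaseCluster accessors are aimed at failure-injection tests: find the server carrying .META., abort a specific server, or queue an HMsg for delivery on a server's next report. A hedged sketch of the first two, assuming a mini cluster started through HBaseTestingUtility; the class name is illustrative.

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;

public class AbortNonMetaServerSketch {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster(2);
    MiniHBaseCluster cluster = util.getMiniHBaseCluster();
    int metaIndex = cluster.getServerWithMeta(); // -1 if .META. not deployed yet
    int victim = (metaIndex == 0) ? 1 : 0;       // abort the server not holding .META.
    String name = cluster.abortRegionServer(victim);
    System.out.println("Aborted " + name +
      "; master should now process its shutdown");
    util.shutdownMiniCluster();
  }
}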

View File

@ -51,7 +51,7 @@ public class TestInfoServers extends HBaseClusterTestCase {
int port = cluster.getMaster().getInfoServer().getPort();
assertHasExpectedContent(new URL("http://localhost:" + port +
"/index.html"), "master");
port = cluster.getRegionThreads().get(0).getRegionServer().
port = cluster.getRegionServerThreads().get(0).getRegionServer().
getInfoServer().getPort();
assertHasExpectedContent(new URL("http://localhost:" + port +
"/index.html"), "regionserver");

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* Test whether region rebalancing works. (HBASE-71)
@ -194,7 +195,7 @@ public class TestRegionRebalancing extends HBaseClusterTestCase {
private List<HRegionServer> getOnlineRegionServers() {
List<HRegionServer> list = new ArrayList<HRegionServer>();
for (LocalHBaseCluster.RegionServerThread rst : cluster.getRegionThreads()) {
for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) {
if (rst.getRegionServer().isOnline()) {
list.add(rst.getRegionServer());
}

View File

@ -0,0 +1,262 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Test transitions of state across the master.
*/
public class TestMasterTransistions {
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final String TABLENAME = "master_transitions";
private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
Bytes.toBytes("b"), Bytes.toBytes("c")};
/**
* Start up a mini cluster and put a small table of many empty regions into it.
* @throws Exception
*/
@BeforeClass public static void beforeAllTests() throws Exception {
// Start a cluster of two regionservers.
TEST_UTIL.startMiniCluster(2);
// Create a table with three families. This will assign a region.
TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
int countOfRegions = TEST_UTIL.createMultiRegions(t, FAMILIES[0]);
waitUntilAllRegionsAssigned(countOfRegions);
}
@AfterClass public static void afterAllTests() throws IOException {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Listener for regionserver events testing hbase-2428 (Infinite loop of
* region closes if META region is offline). In particular, listen
* for the shutdown of the 'metaServer' and, when it comes in, requeue it with
* a delay as though there were an issue processing the shutdown. As part of
* the requeuing, send over a close of a region on 'otherServer' so it comes
* into a master that has its meta region marked as offline.
*/
static class HBase2428Listener implements RegionServerOperationListener {
// Set of operations we've delayed so we don't do repeated delays.
private final Set<RegionServerOperation> postponed =
new CopyOnWriteArraySet<RegionServerOperation>();
private boolean done = false;
private boolean metaShutdownReceived = false;
private final HServerAddress metaAddress;
private final MiniHBaseCluster cluster;
private final int otherServerIndex;
private final HRegionInfo hri;
private int closeCount = 0;
static final int SERVER_DURATION = 10 * 1000;
static final int CLOSE_DURATION = 1 * 1000;
HBase2428Listener(final MiniHBaseCluster c, final HServerAddress metaAddress,
final HRegionInfo closingHRI, final int otherServerIndex) {
this.cluster = c;
this.metaAddress = metaAddress;
this.hri = closingHRI;
this.otherServerIndex = otherServerIndex;
}
@Override
public boolean process(final RegionServerOperation op) throws IOException {
// If a regionserver shutdown comes in and it is for the meta server, then we
// want to delay the processing of the shutdown and send off a close of a
// region on the 'otherServer'.
boolean result = true;
if (op instanceof ProcessServerShutdown) {
ProcessServerShutdown pss = (ProcessServerShutdown)op;
if (pss.getDeadServerAddress().equals(this.metaAddress)) {
// Don't postpone more than once.
if (!this.postponed.contains(pss)) {
// Close some region.
this.cluster.addMessageToSendRegionServer(this.otherServerIndex,
new HMsg(HMsg.Type.MSG_REGION_CLOSE, hri,
Bytes.toBytes("Forcing close in test")));
this.postponed.add(pss);
// Put off the processing of the regionserver shutdown.
pss.setDelay(SERVER_DURATION);
this.metaShutdownReceived = true;
// Return false. This will add this op to the delayed queue.
result = false;
}
}
} else {
// Have the close run frequently.
if (isWantedCloseOperation(op) != null) {
op.setDelay(CLOSE_DURATION);
// Count how many times it comes through here.
this.closeCount++;
}
}
return result;
}
public void processed(final RegionServerOperation op) {
if (isWantedCloseOperation(op) == null) return;
this.done = true;
}
/*
* @param op
* @return Null if not the wanted ProcessRegionClose, else <code>op</code>
* cast as a ProcessRegionClose.
*/
private ProcessRegionClose isWantedCloseOperation(final RegionServerOperation op) {
// Check whether this is the close operation we are waiting on.
if (op instanceof ProcessRegionClose) {
ProcessRegionClose c = (ProcessRegionClose)op;
if (c.regionInfo.equals(hri)) {
return c;
}
}
return null;
}
boolean isDone() {
return this.done;
}
boolean isMetaShutdownReceived() {
return metaShutdownReceived;
}
int getCloseCount() {
return this.closeCount;
}
}
/**
* In HBASE-2428, the meta region has just been set offline and then a close
* comes in.
* @see <a href="https://issues.apache.org/jira/browse/HBASE-2428">HBASE-2428</a>
*/
@Test public void testRegionCloseWhenNoMetaHBase2428() throws Exception {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
final HMaster master = cluster.getMaster();
int metaIndex = cluster.getServerWithMeta();
// Figure out the index of the server that is not serving .META.
int otherServerIndex = -1;
for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
if (i == metaIndex) continue;
otherServerIndex = i;
break;
}
final HRegionServer otherServer = cluster.getRegionServer(otherServerIndex);
final HRegionServer metaHRS = cluster.getRegionServer(metaIndex);
// Get a region out on the otherServer.
final HRegionInfo hri =
otherServer.getOnlineRegions().iterator().next().getRegionInfo();
// Add our RegionServerOperationListener
HBase2428Listener listener = new HBase2428Listener(cluster,
metaHRS.getHServerInfo().getServerAddress(), hri, otherServerIndex);
master.getRegionServerOperationQueue().
registerRegionServerOperationListener(listener);
try {
// Now abort the server carrying .META.
cluster.abortRegionServer(metaIndex);
// First wait on receipt of meta server shutdown message.
while(!listener.metaShutdownReceived) Threads.sleep(100);
while(!listener.isDone()) Threads.sleep(10);
// We should not have retried the close more times than it took for the
// server shutdown message to exit the delay queue and get processed
// (multiply by two to allow some slop for GC pauses and the like).
assertTrue(listener.getCloseCount() <
((HBase2428Listener.SERVER_DURATION/HBase2428Listener.CLOSE_DURATION) * 2));
assertClosedRegionIsBackOnline(hri);
} finally {
master.getRegionServerOperationQueue().
unregisterRegionServerOperationListener(listener);
}
}
private void assertClosedRegionIsBackOnline(final HRegionInfo hri)
throws IOException {
// When we get here, region should be successfully deployed. Assert so.
// 'aaa' is safe as first row if startkey is EMPTY_BYTE_ARRAY because we
// loaded with HBaseTestingUtility#createMultiRegions.
byte [] row = Bytes.equals(HConstants.EMPTY_BYTE_ARRAY, hri.getStartKey())?
new byte [] {'a', 'a', 'a'}: hri.getStartKey();
Put p = new Put(row);
p.add(FAMILIES[0], FAMILIES[0], FAMILIES[0]);
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
t.put(p);
Get g = new Get(row);
assertTrue((t.get(g)).size() > 0);
}
/*
* Wait until all rows in .META. have a non-empty info:server. This means
* all regions have been deployed, and the master has been informed and has
* updated .META. with each region's deployed server.
* @param countOfRegions How many regions in .META.
* @throws IOException
*/
private static void waitUntilAllRegionsAssigned(final int countOfRegions)
throws IOException {
HTable meta = new HTable(TEST_UTIL.getConfiguration(),
HConstants.META_TABLE_NAME);
while (true) {
int rows = 0;
Scan scan = new Scan();
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
ResultScanner s = meta.getScanner(scan);
for (Result r = null; (r = s.next()) != null;) {
byte [] b =
r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
if (b == null || b.length <= 0) break;
rows++;
}
s.close();
// If we got to here and all rows have a server, then all regions have been assigned.
if (rows == countOfRegions) break;
}
}
}
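The listener above leans on the master's delayed-operation behavior: returning false from process() parks the shutdown on a delay queue, and setDelay() controls when it re-emerges. The standalone sketch below illustrates that mechanism using only the JDK's java.util.concurrent.DelayQueue; it is not the RegionServerOperationQueue implementation, just an analogy for how a delayed item becomes visible again once its delay expires.
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayedOpSketch {
  // An element that only becomes available to take() once its delay elapses,
  // mirroring a postponed RegionServerOperation re-entering processing.
  static class DelayedOp implements Delayed {
    private final String name;
    private final long readyAtMillis;
    DelayedOp(final String name, final long delayMillis) {
      this.name = name;
      this.readyAtMillis = System.currentTimeMillis() + delayMillis;
    }
    public long getDelay(final TimeUnit unit) {
      return unit.convert(this.readyAtMillis - System.currentTimeMillis(),
        TimeUnit.MILLISECONDS);
    }
    public int compareTo(final Delayed other) {
      long diff = getDelay(TimeUnit.MILLISECONDS) -
        other.getDelay(TimeUnit.MILLISECONDS);
      return diff < 0? -1: diff > 0? 1: 0;
    }
  }
  public static void main(final String [] args) throws InterruptedException {
    DelayQueue<DelayedOp> queue = new DelayQueue<DelayedOp>();
    queue.put(new DelayedOp("server-shutdown", 1000));
    // Blocks until the 1000ms delay expires; in that window other operations
    // (such as the forced region close in the test) can keep arriving.
    DelayedOp op = queue.take();
    System.out.println("Processed " + op.name + " after its delay expired");
  }
}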

View File

@ -0,0 +1,47 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
/**
* Test the queue used to manage RegionServerOperations.
* Currently RegionServerOperationQueue is untestable because each
* RegionServerOperation has a {@link HMaster} reference. TODO: Fix.
*/
public class TestRegionServerOperationQueue {
private RegionServerOperationQueue queue;
private Configuration conf;
private AtomicBoolean closed;
@Before
public void setUp() throws Exception {
this.closed = new AtomicBoolean(false);
this.conf = new Configuration();
this.queue = new RegionServerOperationQueue(this.conf, this.closed);
}
@After
public void tearDown() throws Exception {
}
}
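Once the HMaster coupling noted in the TODO is removed, a first test could be as small as registering and unregistering a listener on the queue built in setUp(). The sketch below is illustrative only; it assumes the listener interface declares just the two callbacks implemented by HBase2428Listener above, and it needs imports of java.io.IOException and org.junit.Test in addition to those already in this file.
@Test
public void testListenerRegistrationRoundTrip() throws Exception {
  RegionServerOperationListener listener = new RegionServerOperationListener() {
    public boolean process(final RegionServerOperation op) throws IOException {
      return true; // Let every operation proceed untouched.
    }
    public void processed(final RegionServerOperation op) {
      // No-op: nothing to verify until operations can be built without an HMaster.
    }
  };
  this.queue.registerRegionServerOperationListener(listener);
  this.queue.unregisterRegionServerOperationListener(listener);
}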

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
/**
* Tests region server failover when a region server exits both cleanly and
@ -123,8 +124,8 @@ public class DisabledTestRegionServerExit extends HBaseClusterTestCase {
* is just shut down.
*/
private void stopOrAbortMetaRegionServer(boolean abort) {
List<LocalHBaseCluster.RegionServerThread> regionThreads =
cluster.getRegionThreads();
List<JVMClusterUtil.RegionServerThread> regionThreads =
cluster.getRegionServerThreads();
int server = -1;
for (int i = 0; i < regionThreads.size() && server == -1; i++) {

View File

@ -99,7 +99,7 @@ public class TestLogRolling extends HBaseClusterTestCase {
private void startAndWriteData() throws Exception {
// When the META table can be opened, the region servers are running
new HTable(conf, HConstants.META_TABLE_NAME);
this.server = cluster.getRegionThreads().get(0).getRegionServer();
this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
this.log = server.getLog();
// Create the test table and open it