From 5d8b73585f56100c4a7e6082575bd9f2ac67d5fb Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 15 Sep 2010 18:10:43 +0000 Subject: [PATCH] HBASE-2979 Fix failing TestMultParrallel in hudson build git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997437 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + bin/rolling-restart.sh | 86 ------ .../hadoop/hbase/LocalHBaseCluster.java | 5 +- .../hadoop/hbase/catalog/CatalogTracker.java | 78 +++--- .../hadoop/hbase/catalog/MetaEditor.java | 12 +- .../hadoop/hbase/client/HBaseAdmin.java | 4 +- .../hbase/client/HConnectionManager.java | 2 +- .../hadoop/hbase/client/ScannerCallable.java | 10 +- .../hadoop/hbase/ipc/HRegionInterface.java | 9 +- .../hbase/master/AssignmentManager.java | 32 ++- .../apache/hadoop/hbase/master/HMaster.java | 10 +- .../hadoop/hbase/master/LoadBalancer.java | 12 +- .../hadoop/hbase/master/ServerManager.java | 4 +- .../hbase/regionserver/HRegionServer.java | 29 +- .../handler/OpenRegionHandler.java | 2 +- .../hbase/zookeeper/MetaNodeTracker.java | 36 ++- .../hbase/zookeeper/RootRegionTracker.java | 3 +- .../hbase/zookeeper/ZooKeeperNodeTracker.java | 4 + .../hbase/zookeeper/ZooKeeperWatcher.java | 3 - .../hadoop/hbase/HBaseTestingUtility.java | 29 +- .../apache/hadoop/hbase/MiniHBaseCluster.java | 2 + .../hbase/catalog/TestCatalogTracker.java | 247 ++++++++++++++++++ .../hbase/{ => client}/TestMultiParallel.java | 219 +++++++++------- 23 files changed, 533 insertions(+), 306 deletions(-) create mode 100644 src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java rename src/test/java/org/apache/hadoop/hbase/{ => client}/TestMultiParallel.java (62%) diff --git a/CHANGES.txt b/CHANGES.txt index c7edbf0bb78..263844ac935 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -521,6 +521,7 @@ Release 0.21.0 - Unreleased HBASE-2983 TestHLog unit test is mis-comparing an assertion (Alex Newman via Todd Lipcon) HBASE-2986 multi writable can npe causing client hang + HBASE-2979 Fix failing TestMultParrallel in hudson build IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/bin/rolling-restart.sh b/bin/rolling-restart.sh index 81a837d7480..123ab1ec3d6 100644 --- a/bin/rolling-restart.sh +++ b/bin/rolling-restart.sh @@ -82,89 +82,3 @@ else --hosts "${HBASE_REGIONSERVERS}" restart regionserver fi - -#!/usr/bin/env bash -# -#/** -# * Copyright 2010 The Apache Software Foundation -# * -# * Licensed to the Apache Software Foundation (ASF) under one -# * or more contributor license agreements. See the NOTICE file -# * distributed with this work for additional information -# * regarding copyright ownership. The ASF licenses this file -# * to you under the Apache License, Version 2.0 (the -# * "License"); you may not use this file except in compliance -# * with the License. You may obtain a copy of the License at -# * -# * http://www.apache.org/licenses/LICENSE-2.0 -# * -# * Unless required by applicable law or agreed to in writing, software -# * distributed under the License is distributed on an "AS IS" BASIS, -# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# * See the License for the specific language governing permissions and -# * limitations under the License. -# */ -# -# Run a shell command on all regionserver hosts. -# -# Environment Variables -# -# HBASE_REGIONSERVERS File naming remote hosts. -# Default is ${HADOOP_CONF_DIR}/regionservers -# HADOOP_CONF_DIR Alternate conf dir. Default is ${HADOOP_HOME}/conf. -# HBASE_CONF_DIR Alternate hbase conf dir. Default is ${HBASE_HOME}/conf. -# HADOOP_SLAVE_SLEEP Seconds to sleep between spawning remote commands. -# HADOOP_SLAVE_TIMEOUT Seconds to wait for timing out a remote command. -# HADOOP_SSH_OPTS Options passed to ssh when running remote commands. -# -# Modelled after $HADOOP_HOME/bin/slaves.sh. - -usage="Usage: $0 [--config ] commands..." - -bin=`dirname "$0"` -bin=`cd "$bin">/dev/null; pwd` - -. "$bin"/hbase-config.sh - -# start hbase daemons -errCode=$? -if [ $errCode -ne 0 ] -then - exit $errCode -fi - -# quick function to get a value from the HBase config file -distMode=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool hbase.cluster.distributed` -if [ "$distMode" == 'false' ]; then - "$bin"/hbase-daemon.sh restart master -else - # stop all masters before re-start to avoid races for master znode - "$bin"/hbase-daemon.sh --config "${HBASE_CONF_DIR}" stop master - "$bin"/hbase-daemons.sh --config "${HBASE_CONF_DIR}" \ - --hosts "${HBASE_BACKUP_MASTERS}" stop master-backup - - # make sure the master znode has been deleted before continuing - zparent=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool zookeeper.znode.parent` - if [ "$zparent" == "null" ]; then zparent="/hbase"; fi - zmaster=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool zookeeper.znode.master` - if [ "$zmaster" == "null" ]; then zmaster="master"; fi - zmaster=$zparent/$zmaster - echo -n "Waiting for Master ZNode to expire" - while bin/hbase zkcli stat $zmaster >/dev/null 2>&1; do - echo -n "." - sleep 1 - done - echo #force a newline - - # all masters are down, now restart - "$bin"/hbase-daemon.sh --config "${HBASE_CONF_DIR}" start master - "$bin"/hbase-daemons.sh --config "${HBASE_CONF_DIR}" \ - --hosts "${HBASE_BACKUP_MASTERS}" start master-backup - - # unlike the masters, roll all regionservers one-at-a-time - export HBASE_SLAVE_PARALLEL=false - "$bin"/hbase-daemons.sh --config "${HBASE_CONF_DIR}" \ - --hosts "${HBASE_REGIONSERVERS}" restart regionserver - -fi - diff --git a/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index bc0a62f78c2..ea0a64d0e74 100644 --- a/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; + import java.util.concurrent.CopyOnWriteArrayList; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.util.JVMClusterUtil; @@ -166,7 +168,8 @@ public class LocalHBaseCluster { public List getLiveRegionServers() { List liveServers = new ArrayList(); - for (JVMClusterUtil.RegionServerThread rst: getRegionServers()) { + List list = getRegionServers(); + for (JVMClusterUtil.RegionServerThread rst: list) { if (rst.isAlive()) liveServers.add(rst); } return liveServers; diff --git a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java index e8233a912f3..2bcd5d0b151 100644 --- a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.catalog; import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; import java.net.ConnectException; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.RetriesExhaustedException; import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker; import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; @@ -43,38 +45,34 @@ import org.apache.zookeeper.KeeperException; /** * Tracks the availability of the catalog tables -ROOT- and * .META.. - *

+ * * This class is "read-only" in that the locations of the catalog tables cannot * be explicitly set. Instead, ZooKeeper is used to learn of the availability - * and location of ROOT. ROOT is used to learn of the location of META. If not - * available in ROOT, ZooKeeper is used to monitor for a new location of META. + * and location of -ROOT-. -ROOT- is used to learn of + * the location of .META. If not available in -ROOT-, + * ZooKeeper is used to monitor for a new location of .META.. + * *

Call {@link #start()} to start up operation. */ public class CatalogTracker { private static final Log LOG = LogFactory.getLog(CatalogTracker.class); - private final HConnection connection; - private final ZooKeeperWatcher zookeeper; - private final RootRegionTracker rootRegionTracker; - private final MetaNodeTracker metaNodeTracker; - private final AtomicBoolean metaAvailable = new AtomicBoolean(false); private HServerAddress metaLocation; - private final int defaultTimeout; public static final byte [] ROOT_REGION = HRegionInfo.ROOT_REGIONINFO.getRegionName(); - public static final byte [] META_REGION = HRegionInfo.FIRST_META_REGIONINFO.getRegionName(); /** * Constructs the catalog tracker. Find current state of catalog tables and - * begin active tracking by executing {@link #start()}. + * begin active tracking by executing {@link #start()} post construction. + * Does not timeout. * @param zk * @param connection server connection * @param abortable if fatal exception @@ -88,11 +86,12 @@ public class CatalogTracker { /** * Constructs the catalog tracker. Find current state of catalog tables and - * begin active tracking by executing {@link #start()}. + * begin active tracking by executing {@link #start()} post construction. * @param zk * @param connection server connection * @param abortable if fatal exception - * @param defaultTimeout Timeout to use. + * @param defaultTimeout Timeout to use. Pass zero for no timeout + * ({@link Object#wait(long)} when passed a 0 waits for ever). * @throws IOException */ public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection, @@ -101,26 +100,22 @@ public class CatalogTracker { this.zookeeper = zk; this.connection = connection; this.rootRegionTracker = new RootRegionTracker(zookeeper, abortable); - this.metaNodeTracker = new MetaNodeTracker(zookeeper, this); + this.metaNodeTracker = new MetaNodeTracker(zookeeper, this, abortable); this.defaultTimeout = defaultTimeout; } /** * Starts the catalog tracker. - *

* Determines current availability of catalog tables and ensures all further - * transitions of either region is tracked. + * transitions of either region are tracked. * @throws IOException * @throws InterruptedException */ public void start() throws IOException, InterruptedException { - // Register listeners with zk - zookeeper.registerListener(rootRegionTracker); - zookeeper.registerListener(metaNodeTracker); - // Start root tracking - rootRegionTracker.start(); + this.rootRegionTracker.start(); + this.metaNodeTracker.start(); // Determine meta assignment; may not work because root and meta not yet - // deployed. + // deployed. Calling the below will set {@link #metaLocation}. getMetaServerConnection(true); } @@ -148,7 +143,7 @@ public class CatalogTracker { */ public void waitForRoot() throws InterruptedException { - rootRegionTracker.getRootRegionLocation(); + this.rootRegionTracker.blockUntilAvailable(); } /** @@ -161,7 +156,7 @@ public class CatalogTracker { * @throws NotAllMetaRegionsOnlineException if root not available before * timeout */ - public HServerAddress waitForRoot(final long timeout) + HServerAddress waitForRoot(final long timeout) throws InterruptedException, NotAllMetaRegionsOnlineException { HServerAddress address = rootRegionTracker.waitRootRegionLocation(timeout); if (address == null) { @@ -235,27 +230,27 @@ public class CatalogTracker { */ private HRegionInterface getMetaServerConnection(boolean refresh) throws IOException, InterruptedException { - synchronized(metaAvailable) { - if(metaAvailable.get()) { + synchronized (metaAvailable) { + if (metaAvailable.get()) { HRegionInterface current = getCachedConnection(metaLocation); - if(!refresh) { + if (!refresh) { return current; } - if(verifyRegionLocation(current, META_REGION)) { + if (verifyRegionLocation(current, META_REGION)) { return current; } resetMetaLocation(); } HRegionInterface rootConnection = getRootServerConnection(); - if(rootConnection == null) { + if (rootConnection == null) { return null; } HServerAddress newLocation = MetaReader.readMetaLocation(rootConnection); - if(newLocation == null) { + if (newLocation == null) { return null; } HRegionInterface newConnection = getCachedConnection(newLocation); - if(verifyRegionLocation(newConnection, META_REGION)) { + if (verifyRegionLocation(newConnection, META_REGION)) { setMetaLocation(newLocation); return newConnection; } @@ -270,7 +265,7 @@ public class CatalogTracker { */ public void waitForMeta() throws InterruptedException { synchronized(metaAvailable) { - while(!metaAvailable.get()) { + while (!metaAvailable.get()) { metaAvailable.wait(); } } @@ -339,8 +334,8 @@ public class CatalogTracker { private void resetMetaLocation() { LOG.info("Current cached META location is not valid, resetting"); - metaAvailable.set(false); - metaLocation = null; + this.metaAvailable.set(false); + this.metaLocation = null; } private void setMetaLocation(HServerAddress metaLocation) { @@ -368,11 +363,20 @@ public class CatalogTracker { private boolean verifyRegionLocation(HRegionInterface metaServer, byte [] regionName) { + Throwable t = null; try { return metaServer.getRegionInfo(regionName) != null; } catch (NotServingRegionException e) { - return false; + t = e; + } catch (UndeclaredThrowableException e) { + // We can get a ConnectException wrapped by a UTE if client fails connect + // If not a ConnectException, rethrow. + if (!(e.getCause() instanceof ConnectException)) throw e; + t = e.getCause(); } + LOG.info("Failed verification of " + Bytes.toString(regionName) + + ": " + t.getMessage()); + return false; } /** @@ -407,4 +411,8 @@ public class CatalogTracker { } return result; } + + MetaNodeTracker getMetaNodeTracker() { + return this.metaNodeTracker; + } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java b/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java index 263db2d5cde..1e823f617d9 100644 --- a/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java +++ b/src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.catalog; import java.io.IOException; +import java.net.ConnectException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -115,11 +116,15 @@ public class MetaEditor { * @param regionInfo region to update location of * @param serverInfo server the region is located on * @throws IOException + * @throws ConnectException Usually because the regionserver carrying .META. + * is down. + * @throws NullPointerException Because no -ROOT- server connection */ public static void updateMetaLocation(CatalogTracker catalogTracker, HRegionInfo regionInfo, HServerInfo serverInfo) - throws IOException { + throws IOException, ConnectException { HRegionInterface server = catalogTracker.waitForRootServerConnectionDefault(); + if (server == null) throw new NullPointerException("No server for -ROOT-"); updateLocation(server, CatalogTracker.ROOT_REGION, regionInfo, serverInfo); } @@ -152,9 +157,10 @@ public class MetaEditor { * @param catalogRegionName name of catalog region being updated * @param regionInfo region to update location of * @param serverInfo server the region is located on - * @throws IOException + * @throws IOException In particular could throw {@link java.net.ConnectException} + * if the server is down on other end. */ - public static void updateLocation(HRegionInterface server, + private static void updateLocation(HRegionInterface server, byte [] catalogRegionName, HRegionInfo regionInfo, HServerInfo serverInfo) throws IOException { Put put = new Put(regionInfo.getRegionName()); diff --git a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 79760c950e9..5ace618323c 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -273,7 +273,9 @@ public class HBaseAdmin implements Abortable { byte [] lastKey = null; for(byte [] splitKey : splitKeys) { if(lastKey != null && Bytes.equals(splitKey, lastKey)) { - throw new IllegalArgumentException("All split keys must be unique, found duplicate"); + throw new IllegalArgumentException("All split keys must be unique, " + + "found duplicate: " + Bytes.toStringBinary(splitKey) + + ", " + Bytes.toStringBinary(lastKey)); } lastKey = splitKey; } diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index 7efc7fc037d..be145460e03 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -1198,7 +1198,7 @@ public class HConnectionManager { ExecutorService pool) throws IOException { Result[] results = new Result[list.size()]; processBatch((List) list, tableName, pool, results); - + // mutate list so that it is empty for complete success, or contains only failed records // results are returned in the same order as the requests in list // walk the list backwards, so we can remove from list without impacting the indexes of earlier members diff --git a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 2fff71eb234..5ea38b444bd 100644 --- a/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -20,14 +20,15 @@ package org.apache.hadoop.hbase.client; +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.ipc.RemoteException; -import org.mortbay.log.Log; - -import java.io.IOException; /** @@ -35,6 +36,7 @@ import java.io.IOException; * Used by {@link ResultScanner}s made by {@link HTable}. */ public class ScannerCallable extends ServerCallable { + private static final Log LOG = LogFactory.getLog(ScannerCallable.class); private long scannerId = -1L; private boolean instantiated = false; private boolean closed = false; @@ -103,7 +105,7 @@ public class ScannerCallable extends ServerCallable { try { this.server.close(this.scannerId); } catch (IOException e) { - Log.warn("Ignore, probably already closed", e); + LOG.warn("Ignore, probably already closed", e); } this.scannerId = -1L; } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index c3e24d92702..a4810a6e878 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.MultiAction; -import org.apache.hadoop.hbase.client.MultiResponse; import org.apache.hadoop.hbase.client.MultiPut; import org.apache.hadoop.hbase.client.MultiPutResponse; +import org.apache.hadoop.hbase.client.MultiResponse; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; @@ -56,7 +56,6 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab public HRegionInfo getRegionInfo(final byte [] regionName) throws NotServingRegionException; - /** * Return all the data for the row that matches row exactly, * or the one that immediately preceeds it. @@ -288,8 +287,8 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab /** * Bulk load an HFile into an open region */ - public void bulkLoadHFile(String hfilePath, - byte[] regionName, byte[] familyName) throws IOException; + public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName) + throws IOException; // Master methods @@ -355,4 +354,4 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab * @throws IOException */ public void replicateLogEntries(HLog.Entry[] entries) throws IOException; -} +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 701e128b8e1..690f78cf9c3 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -88,7 +88,7 @@ public class AssignmentManager extends ZooKeeperListener { private final Map regionsInTransition = new TreeMap(); - /** Plans for region movement. */ + /** Plans for region movement. Key is the encoded version of a region name*/ // TODO: When do plans get cleaned out? Ever? // Its cleaned on server shutdown processing -- St.Ack private final Map regionPlans = @@ -240,9 +240,10 @@ public class AssignmentManager extends ZooKeeperListener { private void handleRegion(RegionTransitionData data) { synchronized(regionsInTransition) { // Verify this is a known server - if(!serverManager.isServerOnline(data.getServerName())) { - LOG.warn("Attempted to handle region transition for server " + - data.getServerName() + " but server is not online"); + if (!serverManager.isServerOnline(data.getServerName()) && + !this.master.getServerName().equals(data.getServerName())) { + LOG.warn("Attempted to handle region transition for server but " + + "server is not online: " + data); return; } String encodedName = HRegionInfo.encodeRegionName(data.getRegionName()); @@ -251,6 +252,9 @@ public class AssignmentManager extends ZooKeeperListener { ", server=" + data.getServerName() + ", region=" + prettyPrintedRegionName); RegionState regionState = regionsInTransition.get(encodedName); switch(data.getEventType()) { + case M2ZK_REGION_OFFLINE: + LOG.warn("What to do with this event? " + data); + break; case RS2ZK_REGION_CLOSING: // Should see CLOSING after we have asked it to CLOSE or additional @@ -494,7 +498,7 @@ public class AssignmentManager extends ZooKeeperListener { // go of this lock. There must be a better construct that this -- St.Ack 20100811 this.assignLock.lock(); try { - synchronized(regionsInTransition) { + synchronized (regionsInTransition) { state = regionsInTransition.get(encodedName); if(state == null) { state = new RegionState(region, RegionState.State.OFFLINE); @@ -546,6 +550,8 @@ public class AssignmentManager extends ZooKeeperListener { plan = new RegionPlan(state.getRegion(), null, LoadBalancer.randomAssignment(serverManager.getOnlineServersList())); regionPlans.put(encodedName, plan); + } else { + LOG.debug("Using preexisting plan=" + plan); } } try { @@ -934,14 +940,26 @@ public class AssignmentManager extends ZooKeeperListener { // Clean out any exisiting assignment plans for this server synchronized (this.regionPlans) { for (Iterator > i = - this.regionPlans.entrySet().iterator(); i.hasNext();) { + this.regionPlans.entrySet().iterator(); i.hasNext();) { Map.Entry e = i.next(); if (e.getValue().getDestination().equals(hsi)) { - // Use iterator's remove else we'll get CME.fail a + // Use iterator's remove else we'll get CME i.remove(); } } } + // Remove assignment info related to the downed server. Remove the downed + // server from list of servers else it looks like a server w/ no load. + synchronized (this.regions) { + Set hris = new HashSet(); + for (Map.Entry e: this.regions.entrySet()) { + // Add to a Set -- don't call setOffline in here else we get a CME. + if (e.getValue().equals(hsi)) hris.add(e.getKey()); + } + for (HRegionInfo hri: hris) setOffline(hri); + this.servers.remove(hsi); + } + // If anything in transition related to the server, clean it up. synchronized (regionsInTransition) { // Iterate all regions in transition checking if were on this server final String serverName = hsi.getServerName(); diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index bc21a1e39c8..f91b650cdf8 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.ClusterStatus; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HMsg; @@ -55,8 +54,8 @@ import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.MetaScanner; -import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; +import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.ipc.HBaseRPC; @@ -87,7 +86,6 @@ import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.ToolRunner; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -258,7 +256,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.serverManager = new ServerManager(this, this); this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, - this, conf.getInt("hbase.master.catalog.timeout", -1)); + this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); this.catalogTracker.start(); this.assignmentManager = new AssignmentManager(this, serverManager, @@ -563,6 +561,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.assignmentManager.getRegionsInTransition()); return false; } + if (!this.serverManager.getDeadServers().isEmpty()) { + LOG.debug("Not running balancer because dead regionserver processing"); + } Map> assignments = this.assignmentManager.getAssignments(); // Returned Map from AM does not include mention of servers w/o assignments. @@ -576,6 +577,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { List plans = this.balancer.balanceCluster(assignments); if (plans != null && !plans.isEmpty()) { for (RegionPlan plan: plans) { + LOG.info("balance=" + plan); this.assignmentManager.balance(plan); } } diff --git a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java index 41ea508811d..935dd915b52 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java +++ b/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java @@ -141,8 +141,8 @@ public class LoadBalancer { } int numRegions = 0; // Iterate so we can count regions as we build the map - for(Map.Entry> server : - clusterState.entrySet()) { + for(Map.Entry> server: + clusterState.entrySet()) { server.getKey().getLoad().setNumberOfRegions(server.getValue().size()); numRegions += server.getKey().getLoad().getNumberOfRegions(); serversByLoad.put(server.getKey(), server.getValue()); @@ -527,8 +527,6 @@ public class LoadBalancer { private final HServerInfo source; private HServerInfo dest; - - /** * Instantiate a plan for a region move, moving the specified region from * the specified source server to the specified destination server. @@ -589,5 +587,11 @@ public class LoadBalancer { public int compareTo(RegionPlan o) { return getRegionName().compareTo(o.getRegionName()); } + + @Override + public String toString() { + return "hri=" + this.hri.getRegionNameAsString() + ", src=" + + this.source.getServerName() + ", dest=" + this.dest.getServerName(); + } } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 552bfcc0674..528bb9dac0d 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -499,7 +499,7 @@ public class ServerManager { */ public void sendRegionOpen(HServerInfo server, HRegionInfo region) { HRegionInterface hri = getServerConnection(server); - if(hri == null) { + if (hri == null) { LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName() + " failed because no RPC connection found to this server"); return; @@ -533,7 +533,7 @@ public class ServerManager { HConnection connection = HConnectionManager.getConnection(this.master.getConfiguration()); HRegionInterface hri = serverConnections.get(info.getServerName()); - if(hri == null) { + if (hri == null) { LOG.info("new connection"); hri = connection.getHRegionConnection(info.getServerAddress(), false); serverConnections.put(info.getServerName(), hri); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index bba7b679a8a..0a5ee687b4a 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; import java.lang.management.MemoryUsage; -import java.lang.management.RuntimeMXBean; import java.lang.reflect.Constructor; import java.net.BindException; import java.net.InetSocketAddress; @@ -64,7 +63,6 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HServerLoad; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.MasterAddressTracker; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RemoteExceptionHandler; @@ -352,7 +350,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // create the catalog tracker and start it this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, - this, this.conf.getInt("hbase.regionserver.catalog.timeout", -1)); + this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this); @@ -1143,7 +1141,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, RootLocationEditor.setRootLocation(getZooKeeper(), getServerInfo().getServerAddress()); } else if (r.getRegionInfo().isMetaRegion()) { - // TODO: doh, this has weird naming between RootEditor/MetaEditor MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo()); } else { if (daughter) { @@ -1361,8 +1358,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } } + @Override public HRegionInfo getRegionInfo(final byte[] regionName) - throws NotServingRegionException { + throws NotServingRegionException { requestCount.incrementAndGet(); return getRegion(regionName).getRegionInfo(); } @@ -2228,20 +2226,19 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } } } catch (IOException ioe) { - if (multi.size() == 1) { - throw ioe; - } else { - LOG.error("Exception found while attempting " + action.toString() - + " " + StringUtils.stringifyException(ioe)); - response.add(regionName,null); - // stop processing on this region, continue to the next. - } + if (multi.size() == 1) { + throw ioe; + } else { + LOG.error("Exception found while attempting " + action.toString() + + " " + StringUtils.stringifyException(ioe)); + response.add(regionName,null); + // stop processing on this region, continue to the next. } } - - return response; } - + return response; + } + /** * @deprecated Use HRegionServer.multi( MultiAction action) instead */ diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index 7b48025c30f..3216d3ab980 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -155,7 +155,7 @@ public class OpenRegionHandler extends EventHandler { // Finally, Transition ZK node to OPENED try { - if(ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo, + if (ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo, server.getServerName(), openingVersion) == -1) { LOG.warn("Completed the OPEN of a region but when transitioning from " + " OPENING to OPENED got a version mismatch, someone else clashed " + diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaNodeTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaNodeTracker.java index 87905383132..d7ec232c32c 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaNodeTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/MetaNodeTracker.java @@ -23,24 +23,23 @@ import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.catalog.CatalogTracker; /** * Tracks the unassigned zookeeper node used by the META table. - *

- * A callback is made into a {@link CatalogTracker} when META completes a new - * assignment. + * + * A callback is made into the passed {@link CatalogTracker} when + * .META. completes a new assignment. *

* If META is already assigned when instantiating this class, you will not * receive any notification for that assignment. You will receive a * notification after META has been successfully assigned to a new location. */ -public class MetaNodeTracker extends ZooKeeperListener { +public class MetaNodeTracker extends ZooKeeperNodeTracker { private static final Log LOG = LogFactory.getLog(MetaNodeTracker.class); - private final String node; - /** Catalog tracker to notify when META has a new assignment completed. */ private final CatalogTracker catalogTracker; @@ -49,25 +48,22 @@ public class MetaNodeTracker extends ZooKeeperListener { * @param watcher * @param abortable */ - public MetaNodeTracker(ZooKeeperWatcher watcher, - CatalogTracker catalogTracker) { - super(watcher); + public MetaNodeTracker(final ZooKeeperWatcher watcher, + final CatalogTracker catalogTracker, final Abortable abortable) { + super(watcher, ZKUtil.joinZNode(watcher.assignmentZNode, + HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()), abortable); this.catalogTracker = catalogTracker; - node = ZKUtil.joinZNode(watcher.assignmentZNode, - HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()); } @Override public void nodeDeleted(String path) { - if(path.equals(node)) { - LOG.info("Detected completed assignment of META, notifying catalog " + - "tracker"); - try { - catalogTracker.waitForMetaServerConnectionDefault(); - } catch (IOException e) { - LOG.warn("Tried to reset META server location after seeing the " + - "completion of a new META assignment but got an IOE", e); - } + if (!path.equals(node)) return; + LOG.info("Detected completed assignment of META, notifying catalog tracker"); + try { + this.catalogTracker.waitForMetaServerConnectionDefault(); + } catch (IOException e) { + LOG.warn("Tried to reset META server location after seeing the " + + "completion of a new META assignment but got an IOE", e); } } } \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java index 19d7c31b851..7067e417494 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/RootRegionTracker.java @@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.util.Bytes; /** * Tracks the root region server location node in zookeeper. * Root region location is set by {@link RootLocationEditor} usually called - * out of {@link RegionServerServices#postOpenDeployTasks(org.apache.hadoop.hbase.regionserver.HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker)} + * out of {@link RegionServerServices#postOpenDeployTasks(org.apache.hadoop.hbase.regionserver.HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker)}. + * This class has a watcher on the root location and notices changes. */ public class RootRegionTracker extends ZooKeeperNodeTracker { /** diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java index e5073695d52..6e74241cf03 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java @@ -132,6 +132,10 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener { return data; } + public String getNode() { + return this.node; + } + @Override public synchronized void nodeCreated(String path) { if(path.equals(node)) { diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 87b56ce5fe0..aaa88a0c41b 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -169,9 +169,6 @@ public class ZooKeeperWatcher implements Watcher { "state=" + event.getState() + ", " + "path=" + event.getPath()); - // While we are still using both ZKWs, need to call parent process() -// super.process(event); - switch(event.getType()) { // If event type is NONE, this is a connection status change diff --git a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 9cc11681fb6..e5b1a30e420 100644 --- a/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -609,6 +609,18 @@ public class HBaseTestingUtility { return createMultiRegions(getConfiguration(), table, columnFamily); } + public static final byte[][] KEYS = { + HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), + Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), + Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), + Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"), + Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), + Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), + Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), + Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"), + Bytes.toBytes("xxx"), Bytes.toBytes("yyy") + }; + /** * Creates many regions names "aaa" to "zzz". * @param c Configuration to use. @@ -620,17 +632,6 @@ public class HBaseTestingUtility { public int createMultiRegions(final Configuration c, final HTable table, final byte[] columnFamily) throws IOException { - byte[][] KEYS = { - HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"), - Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), - Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), - Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"), - Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), - Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), - Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), - Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"), - Bytes.toBytes("xxx"), Bytes.toBytes("yyy") - }; return createMultiRegions(c, table, columnFamily, KEYS); } @@ -963,16 +964,18 @@ public class HBaseTestingUtility { * Make sure that at least the specified number of region servers * are running * @param num minimum number of region servers that should be running + * @return True if we started some servers * @throws IOException */ - public void ensureSomeRegionServersAvailable(final int num) + public boolean ensureSomeRegionServersAvailable(final int num) throws IOException { if (this.getHBaseCluster().getLiveRegionServerThreads().size() < num) { // Need at least "num" servers. LOG.info("Started new server=" + this.getHBaseCluster().startRegionServer()); - + return true; } + return false; } /** diff --git a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java index 9407c1e607b..63a6956b670 100644 --- a/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java +++ b/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java @@ -223,6 +223,8 @@ public class MiniHBaseCluster { try { LOG.info("Hook closing fs=" + this.fs); this.fs.close(); + } catch (NullPointerException npe) { + LOG.debug("Need to fix these: " + npe.toString()); } catch (IOException e) { LOG.warn("Running hook", e); } diff --git a/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java b/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java new file mode 100644 index 00000000000..ce3c50b50d3 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java @@ -0,0 +1,247 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.catalog; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HServerInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.ipc.HRegionInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.util.Writables; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.util.Progressable; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Test {@link CatalogTracker} + */ +public class TestCatalogTracker { + private static final Log LOG = LogFactory.getLog(TestCatalogTracker.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final HServerAddress HSA = + new HServerAddress("example.org:1234"); + private ZooKeeperWatcher watcher; + private Abortable abortable; + + @BeforeClass public static void beforeClass() throws Exception { + UTIL.startMiniZKCluster(); + } + + @AfterClass public static void afterClass() throws IOException { + UTIL.getZkCluster().shutdown(); + } + + @Before public void before() throws IOException { + this.abortable = new Abortable() { + @Override + public void abort(String why, Throwable e) { + LOG.info(why, e); + } + }; + this.watcher = new ZooKeeperWatcher(UTIL.getConfiguration(), + this.getClass().getSimpleName(), this.abortable); + } + + @After public void after() { + this.watcher.close(); + } + + private CatalogTracker constructAndStartCatalogTracker() + throws IOException, InterruptedException { + return constructAndStartCatalogTracker(null); + } + + private CatalogTracker constructAndStartCatalogTracker(final HConnection c) + throws IOException, InterruptedException { + CatalogTracker ct = new CatalogTracker(this.watcher, c, this.abortable); + ct.start(); + return ct; + } + + @Test (expected = NotAllMetaRegionsOnlineException.class) + public void testTimeoutWaitForRoot() + throws IOException, InterruptedException { + final CatalogTracker ct = constructAndStartCatalogTracker(); + ct.waitForRoot(100); + } + + @Test (expected = NotAllMetaRegionsOnlineException.class) + public void testTimeoutWaitForMeta() + throws IOException, InterruptedException { + final CatalogTracker ct = constructAndStartCatalogTracker(); + ct.waitForMeta(100); + } + + /** + * Test waiting on root w/ no timeout specified. + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test public void testNoTimeoutWaitForRoot() + throws IOException, InterruptedException, KeeperException { + final CatalogTracker ct = constructAndStartCatalogTracker(); + HServerAddress hsa = ct.getRootLocation(); + Assert.assertNull(hsa); + + // Now test waiting on root location getting set. + Thread t = new WaitOnMetaThread(ct); + startWaitAliveThenWaitItLives(t, 1000); + // Set a root location. + hsa = setRootLocation(); + // Join the thread... should exit shortly. + t.join(); + // Now root is available. + Assert.assertTrue(ct.getRootLocation().equals(hsa)); + } + + private HServerAddress setRootLocation() throws KeeperException { + RootLocationEditor.setRootLocation(this.watcher, HSA); + return HSA; + } + + /** + * Test waiting on meta w/ no timeout specified. + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test public void testNoTimeoutWaitForMeta() + throws IOException, InterruptedException, KeeperException { + // Mock an HConnection and a HRegionInterface implementation. Have the + // HConnection return the HRI. Have the HRI return a few mocked up responses + // to make our test work. + HConnection connection = Mockito.mock(HConnection.class); + HRegionInterface mockHRI = Mockito.mock(HRegionInterface.class); + // Make the HRI return an answer no matter how Get is called. Same for + // getHRegionInfo. Thats enough for this test. + Mockito.when(connection.getHRegionConnection((HServerAddress)Mockito.any(), Mockito.anyBoolean())). + thenReturn(mockHRI); + + final CatalogTracker ct = constructAndStartCatalogTracker(connection); + HServerAddress hsa = ct.getMetaLocation(); + Assert.assertNull(hsa); + + // Now test waiting on meta location getting set. + Thread t = new WaitOnMetaThread(ct) { + @Override + void doWaiting() throws InterruptedException { + this.ct.waitForMeta(); + } + }; + startWaitAliveThenWaitItLives(t, 1000); + + // Now the ct is up... set into the mocks some answers that make it look + // like things have been getting assigned. Make it so we'll return a + // location (no matter what the Get is). Same for getHRegionInfo -- always + // just return the meta region. + List kvs = new ArrayList(); + kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY, + HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, + Bytes.toBytes(HSA.toString()))); + final Result result = new Result(kvs); + Mockito.when(mockHRI.get((byte [])Mockito.any(), (Get)Mockito.any())). + thenReturn(result); + Mockito.when(mockHRI.getRegionInfo((byte [])Mockito.any())). + thenReturn(HRegionInfo.FIRST_META_REGIONINFO); + // This should trigger wake up of meta wait (Its the removal of the meta + // region unassigned node that triggers catalogtrackers that a meta has + // been assigned. + String node = ct.getMetaNodeTracker().getNode(); + ZKUtil.createAndFailSilent(this.watcher, node); + MetaEditor.updateMetaLocation(ct, HRegionInfo.FIRST_META_REGIONINFO, + new HServerInfo(HSA, -1, "example.com")); + ZKUtil.deleteNode(this.watcher, node); + // Join the thread... should exit shortly. + t.join(); + // Now meta is available. + Assert.assertTrue(ct.getMetaLocation().equals(HSA)); + } + + private void startWaitAliveThenWaitItLives(final Thread t, final int ms) { + t.start(); + while(!t.isAlive()) { + // Wait + } + // Wait one second. + Threads.sleep(ms); + Assert.assertTrue("Assert " + t.getName() + " still waiting", t.isAlive()); + } + + class CountingProgressable implements Progressable { + final AtomicInteger counter = new AtomicInteger(0); + @Override + public void progress() { + this.counter.incrementAndGet(); + } + } + + /** + * Wait on META. + * Default is wait on -ROOT-. + */ + class WaitOnMetaThread extends Thread { + final CatalogTracker ct; + + WaitOnMetaThread(final CatalogTracker ct) { + super("WaitOnMeta"); + this.ct = ct; + } + + @Override + public void run() { + try { + doWaiting(); + } catch (InterruptedException e) { + throw new RuntimeException("Failed wait on root", e); + } + LOG.info("Exiting " + getName()); + } + + void doWaiting() throws InterruptedException { + this.ct.waitForRoot(); + } + } +} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java b/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java similarity index 62% rename from src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java rename to src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java index 681b9817e7e..5031adf47c2 100644 --- a/src/test/java/org/apache/hadoop/hbase/TestMultiParallel.java +++ b/src/test/java/org/apache/hadoop/hbase/client/TestMultiParallel.java @@ -17,54 +17,67 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase; +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.HBaseAdmin; -import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; -import java.io.IOException; -import java.util.List; -import java.util.ArrayList; - -public class TestMultiParallel extends MultiRegionTable { - // This test needs to be rewritten to use HBaseTestingUtility -- St.Ack 20100910 +public class TestMultiParallel { private static final Log LOG = LogFactory.getLog(TestMultiParallel.class); + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); private static final byte[] VALUE = Bytes.toBytes("value"); private static final byte[] QUALIFIER = Bytes.toBytes("qual"); private static final String FAMILY = "family"; private static final String TEST_TABLE = "multi_test_table"; private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY); private static final byte[] ONE_ROW = Bytes.toBytes("xxx"); + private static final byte [][] KEYS = makeKeys(); - List keys = new ArrayList(); - - public TestMultiParallel() { - super(2, FAMILY); - desc = new HTableDescriptor(TEST_TABLE); - desc.addFamily(new HColumnDescriptor(FAMILY)); - makeKeys(); + @BeforeClass public static void beforeClass() throws Exception { + UTIL.startMiniCluster(2); + HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY)); + UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY)); } - private void makeKeys() { + @AfterClass public static void afterClass() throws IOException { + UTIL.getMiniHBaseCluster().shutdown(); + } + + @Before public void before() throws IOException { + LOG.info("before"); + if (UTIL.ensureSomeRegionServersAvailable(2)) { + // Distribute regions + UTIL.getMiniHBaseCluster().getMaster().balance(); + } + LOG.info("before done"); + } + + private static byte[][] makeKeys() { + byte [][] starterKeys = HBaseTestingUtility.KEYS; // Create a "non-uniform" test set with the following characteristics: // a) Unequal number of keys per region // Don't use integer as a multiple, so that we have a number of keys that is // not a multiple of the number of regions - int numKeys = (int) ((float) KEYS.length * 10.33F); + int numKeys = (int) ((float) starterKeys.length * 10.33F); + List keys = new ArrayList(); for (int i = 0; i < numKeys; i++) { - int kIdx = i % KEYS.length; - byte[] k = KEYS[kIdx]; + int kIdx = i % starterKeys.length; + byte[] k = starterKeys[kIdx]; byte[] cp = new byte[k.length + 1]; System.arraycopy(k, 0, cp, 0, k.length); cp[k.length] = new Integer(i % 256).byteValue(); @@ -76,18 +89,19 @@ public class TestMultiParallel extends MultiRegionTable { // c) keys are not in sorted order (within a region), to ensure that the // sorting code and index mapping doesn't break the functionality for (int i = 0; i < 100; i++) { - int kIdx = i % KEYS.length; - byte[] k = KEYS[kIdx]; + int kIdx = i % starterKeys.length; + byte[] k = starterKeys[kIdx]; byte[] cp = new byte[k.length + 1]; System.arraycopy(k, 0, cp, 0, k.length); cp[k.length] = new Integer(i % 256).byteValue(); keys.add(cp); } + return keys.toArray(new byte [][] {new byte [] {}}); } - public void testBatchWithGet() throws Exception { + @Test public void testBatchWithGet() throws Exception { LOG.info("test=testBatchWithGet"); - HTable table = new HTable(conf, TEST_TABLE); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // load test data List puts = constructPutRequests(); @@ -95,7 +109,7 @@ public class TestMultiParallel extends MultiRegionTable { // create a list of gets and run it List gets = new ArrayList(); - for (byte[] k : keys) { + for (byte[] k : KEYS) { Get get = new Get(k); get.addColumn(BYTES_FAMILY, QUALIFIER); gets.add(get); @@ -110,14 +124,14 @@ public class TestMultiParallel extends MultiRegionTable { } // Compare results - assertEquals(singleRes.size(), multiRes.length); + Assert.assertEquals(singleRes.size(), multiRes.length); for (int i = 0; i < singleRes.size(); i++) { - assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); + Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER)); KeyValue[] singleKvs = singleRes.get(i).raw(); KeyValue[] multiKvs = multiRes[i].raw(); for (int j = 0; j < singleKvs.length; j++) { - assertEquals(singleKvs[j], multiKvs[j]); - assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j] + Assert.assertEquals(singleKvs[j], multiKvs[j]); + Assert.assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j] .getValue())); } } @@ -129,32 +143,32 @@ public class TestMultiParallel extends MultiRegionTable { * * @throws Exception */ - public void testFlushCommitsWithAbort() throws Exception { + @Test public void testFlushCommitsWithAbort() throws Exception { LOG.info("test=testFlushCommitsWithAbort"); doTestFlushCommits(true); } - public void testFlushCommitsNoAbort() throws Exception { + @Test public void testFlushCommitsNoAbort() throws Exception { + LOG.info("test=testFlushCommitsNoAbort"); doTestFlushCommits(false); } - public void doTestFlushCommits(boolean doAbort) throws Exception { - LOG.info("test=doTestFlushCommits"); + private void doTestFlushCommits(boolean doAbort) throws Exception { // Load the data - Configuration newconf = new Configuration(conf); - newconf.setInt("hbase.client.retries.number", 10); - HTable table = new HTable(newconf, TEST_TABLE); + LOG.info("get new table"); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); table.setAutoFlush(false); table.setWriteBufferSize(10 * 1024 * 1024); + LOG.info("constructPutRequests"); List puts = constructPutRequests(); for (Row put : puts) { table.put((Put) put); } + LOG.info("puts"); table.flushCommits(); - if (doAbort) { - cluster.abortRegionServer(0); + LOG.info("Aborted=" + UTIL.getMiniHBaseCluster().abortRegionServer(0)); // try putting more keys after the abort. same key/qual... just validating // no exceptions thrown @@ -166,100 +180,107 @@ public class TestMultiParallel extends MultiRegionTable { table.flushCommits(); } + LOG.info("validating loaded data"); validateLoadedData(table); // Validate server and region count - HBaseAdmin admin = new HBaseAdmin(conf); - ClusterStatus cs = admin.getClusterStatus(); - assertEquals((doAbort ? 1 : 2), cs.getServers()); - for (HServerInfo info : cs.getServerInfo()) { - System.out.println(info); - assertTrue(info.getLoad().getNumberOfRegions() > 10); + List liveRSs = + UTIL.getMiniHBaseCluster().getLiveRegionServerThreads(); + int count = 0; + for (JVMClusterUtil.RegionServerThread t: liveRSs) { + count++; + LOG.info("Count=" + count + ", Alive=" + t.getRegionServer()); } + LOG.info("Count=" + count); + Assert.assertEquals("Server count=" + count + ", abort=" + doAbort, + (doAbort ? 1 : 2), count); + for (JVMClusterUtil.RegionServerThread t: liveRSs) { + int regions = t.getRegionServer().getOnlineRegions().size(); + Assert.assertTrue("Count of regions=" + regions, regions > 10); + } + LOG.info("done"); } - public void testBatchWithPut() throws Exception { + @Test public void testBatchWithPut() throws Exception { LOG.info("test=testBatchWithPut"); - Configuration newconf = new Configuration(conf); - newconf.setInt("hbase.client.retries.number", 10); - HTable table = new HTable(newconf, TEST_TABLE); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // put multiple rows using a batch List puts = constructPutRequests(); Result[] results = table.batch(puts); - validateSizeAndEmpty(results, keys.size()); + validateSizeAndEmpty(results, KEYS.length); if (true) { - cluster.abortRegionServer(0); + UTIL.getMiniHBaseCluster().abortRegionServer(0); puts = constructPutRequests(); results = table.batch(puts); - validateSizeAndEmpty(results, keys.size()); + validateSizeAndEmpty(results, KEYS.length); } validateLoadedData(table); } - public void testBatchWithDelete() throws Exception { + @Test public void testBatchWithDelete() throws Exception { LOG.info("test=testBatchWithDelete"); - HTable table = new HTable(conf, TEST_TABLE); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // Load some data List puts = constructPutRequests(); Result[] results = table.batch(puts); - validateSizeAndEmpty(results, keys.size()); + validateSizeAndEmpty(results, KEYS.length); // Deletes List deletes = new ArrayList(); - for (int i = 0; i < keys.size(); i++) { - Delete delete = new Delete(keys.get(i)); + for (int i = 0; i < KEYS.length; i++) { + Delete delete = new Delete(KEYS[i]); delete.deleteFamily(BYTES_FAMILY); deletes.add(delete); } results = table.batch(deletes); - validateSizeAndEmpty(results, keys.size()); + validateSizeAndEmpty(results, KEYS.length); // Get to make sure ... - for (byte[] k : keys) { + for (byte[] k : KEYS) { Get get = new Get(k); get.addColumn(BYTES_FAMILY, QUALIFIER); - assertFalse(table.exists(get)); + Assert.assertFalse(table.exists(get)); } } - public void testHTableDeleteWithList() throws Exception { + @Test public void testHTableDeleteWithList() throws Exception { LOG.info("test=testHTableDeleteWithList"); - HTable table = new HTable(conf, TEST_TABLE); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // Load some data List puts = constructPutRequests(); Result[] results = table.batch(puts); - validateSizeAndEmpty(results, keys.size()); + validateSizeAndEmpty(results, KEYS.length); // Deletes ArrayList deletes = new ArrayList(); - for (int i = 0; i < keys.size(); i++) { - Delete delete = new Delete(keys.get(i)); + for (int i = 0; i < KEYS.length; i++) { + Delete delete = new Delete(KEYS[i]); delete.deleteFamily(BYTES_FAMILY); deletes.add(delete); } table.delete(deletes); - assertTrue(deletes.isEmpty()); + Assert.assertTrue(deletes.isEmpty()); // Get to make sure ... - for (byte[] k : keys) { + for (byte[] k : KEYS) { Get get = new Get(k); get.addColumn(BYTES_FAMILY, QUALIFIER); - assertFalse(table.exists(get)); + Assert.assertFalse(table.exists(get)); } } - public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { + @Test public void testBatchWithManyColsInOneRowGetAndPut() throws Exception { LOG.info("test=testBatchWithManyColsInOneRowGetAndPut"); - HTable table = new HTable(conf, TEST_TABLE); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); List puts = new ArrayList(); for (int i = 0; i < 100; i++) { @@ -293,13 +314,13 @@ public class TestMultiParallel extends MultiRegionTable { } - public void testBatchWithMixedActions() throws Exception { + @Test public void testBatchWithMixedActions() throws Exception { LOG.info("test=testBatchWithMixedActions"); - HTable table = new HTable(conf, TEST_TABLE); + HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE); // Load some data to start Result[] results = table.batch(constructPutRequests()); - validateSizeAndEmpty(results, keys.size()); + validateSizeAndEmpty(results, KEYS.length); // Batch: get, get, put(new col), delete, get, get of put, get of deleted, // put @@ -309,42 +330,42 @@ public class TestMultiParallel extends MultiRegionTable { byte[] val2 = Bytes.toBytes("putvalue2"); // 0 get - Get get = new Get(keys.get(10)); + Get get = new Get(KEYS[10]); get.addColumn(BYTES_FAMILY, QUALIFIER); actions.add(get); // 1 get - get = new Get(keys.get(11)); + get = new Get(KEYS[11]); get.addColumn(BYTES_FAMILY, QUALIFIER); actions.add(get); // 2 put of new column - Put put = new Put(keys.get(10)); + Put put = new Put(KEYS[10]); put.add(BYTES_FAMILY, qual2, val2); actions.add(put); // 3 delete - Delete delete = new Delete(keys.get(20)); + Delete delete = new Delete(KEYS[20]); delete.deleteFamily(BYTES_FAMILY); actions.add(delete); // 4 get - get = new Get(keys.get(30)); + get = new Get(KEYS[30]); get.addColumn(BYTES_FAMILY, QUALIFIER); actions.add(get); // 5 get of the put in #2 (entire family) - get = new Get(keys.get(10)); + get = new Get(KEYS[10]); get.addFamily(BYTES_FAMILY); actions.add(get); // 6 get of the delete from #3 - get = new Get(keys.get(20)); + get = new Get(KEYS[20]); get.addColumn(BYTES_FAMILY, QUALIFIER); actions.add(get); // 7 put of new column - put = new Put(keys.get(40)); + put = new Put(KEYS[40]); put.add(BYTES_FAMILY, qual2, val2); actions.add(put); @@ -363,7 +384,7 @@ public class TestMultiParallel extends MultiRegionTable { validateEmpty(results[7]); // validate last put, externally from the batch - get = new Get(keys.get(40)); + get = new Get(KEYS[40]); get.addColumn(BYTES_FAMILY, qual2); Result r = table.get(get); validateResult(r, qual2, val2); @@ -376,13 +397,13 @@ public class TestMultiParallel extends MultiRegionTable { } private void validateResult(Result r, byte[] qual, byte[] val) { - assertTrue(r.containsColumn(BYTES_FAMILY, qual)); - assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual))); + Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual)); + Assert.assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual))); } private List constructPutRequests() { List puts = new ArrayList(); - for (byte[] k : keys) { + for (byte[] k : KEYS) { Put put = new Put(k); put.add(BYTES_FAMILY, QUALIFIER, VALUE); puts.add(put); @@ -392,28 +413,28 @@ public class TestMultiParallel extends MultiRegionTable { private void validateLoadedData(HTable table) throws IOException { // get the data back and validate that it is correct - for (byte[] k : keys) { + for (byte[] k : KEYS) { + LOG.info("Assert=" + Bytes.toString(k)); Get get = new Get(k); get.addColumn(BYTES_FAMILY, QUALIFIER); Result r = table.get(get); - assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); - assertEquals(0, Bytes.compareTo(VALUE, r + Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER)); + Assert.assertEquals(0, Bytes.compareTo(VALUE, r .getValue(BYTES_FAMILY, QUALIFIER))); } } private void validateEmpty(Result result) { - assertTrue(result != null); - assertTrue(result.getRow() == null); - assertEquals(0, result.raw().length); + Assert.assertTrue(result != null); + Assert.assertTrue(result.getRow() == null); + Assert.assertEquals(0, result.raw().length); } private void validateSizeAndEmpty(Result[] results, int expectedSize) { // Validate got back the same number of Result objects, all empty - assertEquals(expectedSize, results.length); + Assert.assertEquals(expectedSize, results.length); for (Result result : results) { validateEmpty(result); } } - -} +} \ No newline at end of file