HBASE-2979 Fix failing TestMultParrallel in hudson build
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@997437 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
787d807b37
commit
5d8b73585f
|
@ -521,6 +521,7 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-2983 TestHLog unit test is mis-comparing an assertion
|
HBASE-2983 TestHLog unit test is mis-comparing an assertion
|
||||||
(Alex Newman via Todd Lipcon)
|
(Alex Newman via Todd Lipcon)
|
||||||
HBASE-2986 multi writable can npe causing client hang
|
HBASE-2986 multi writable can npe causing client hang
|
||||||
|
HBASE-2979 Fix failing TestMultParrallel in hudson build
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-1760 Cleanup TODOs in HTable
|
HBASE-1760 Cleanup TODOs in HTable
|
||||||
|
|
|
@ -82,89 +82,3 @@ else
|
||||||
--hosts "${HBASE_REGIONSERVERS}" restart regionserver
|
--hosts "${HBASE_REGIONSERVERS}" restart regionserver
|
||||||
|
|
||||||
fi
|
fi
|
||||||
|
|
||||||
#!/usr/bin/env bash
|
|
||||||
#
|
|
||||||
#/**
|
|
||||||
# * Copyright 2010 The Apache Software Foundation
|
|
||||||
# *
|
|
||||||
# * Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
# * or more contributor license agreements. See the NOTICE file
|
|
||||||
# * distributed with this work for additional information
|
|
||||||
# * regarding copyright ownership. The ASF licenses this file
|
|
||||||
# * to you under the Apache License, Version 2.0 (the
|
|
||||||
# * "License"); you may not use this file except in compliance
|
|
||||||
# * with the License. You may obtain a copy of the License at
|
|
||||||
# *
|
|
||||||
# * http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
# *
|
|
||||||
# * Unless required by applicable law or agreed to in writing, software
|
|
||||||
# * distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# * See the License for the specific language governing permissions and
|
|
||||||
# * limitations under the License.
|
|
||||||
# */
|
|
||||||
#
|
|
||||||
# Run a shell command on all regionserver hosts.
|
|
||||||
#
|
|
||||||
# Environment Variables
|
|
||||||
#
|
|
||||||
# HBASE_REGIONSERVERS File naming remote hosts.
|
|
||||||
# Default is ${HADOOP_CONF_DIR}/regionservers
|
|
||||||
# HADOOP_CONF_DIR Alternate conf dir. Default is ${HADOOP_HOME}/conf.
|
|
||||||
# HBASE_CONF_DIR Alternate hbase conf dir. Default is ${HBASE_HOME}/conf.
|
|
||||||
# HADOOP_SLAVE_SLEEP Seconds to sleep between spawning remote commands.
|
|
||||||
# HADOOP_SLAVE_TIMEOUT Seconds to wait for timing out a remote command.
|
|
||||||
# HADOOP_SSH_OPTS Options passed to ssh when running remote commands.
|
|
||||||
#
|
|
||||||
# Modelled after $HADOOP_HOME/bin/slaves.sh.
|
|
||||||
|
|
||||||
usage="Usage: $0 [--config <hbase-confdir>] commands..."
|
|
||||||
|
|
||||||
bin=`dirname "$0"`
|
|
||||||
bin=`cd "$bin">/dev/null; pwd`
|
|
||||||
|
|
||||||
. "$bin"/hbase-config.sh
|
|
||||||
|
|
||||||
# start hbase daemons
|
|
||||||
errCode=$?
|
|
||||||
if [ $errCode -ne 0 ]
|
|
||||||
then
|
|
||||||
exit $errCode
|
|
||||||
fi
|
|
||||||
|
|
||||||
# quick function to get a value from the HBase config file
|
|
||||||
distMode=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool hbase.cluster.distributed`
|
|
||||||
if [ "$distMode" == 'false' ]; then
|
|
||||||
"$bin"/hbase-daemon.sh restart master
|
|
||||||
else
|
|
||||||
# stop all masters before re-start to avoid races for master znode
|
|
||||||
"$bin"/hbase-daemon.sh --config "${HBASE_CONF_DIR}" stop master
|
|
||||||
"$bin"/hbase-daemons.sh --config "${HBASE_CONF_DIR}" \
|
|
||||||
--hosts "${HBASE_BACKUP_MASTERS}" stop master-backup
|
|
||||||
|
|
||||||
# make sure the master znode has been deleted before continuing
|
|
||||||
zparent=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool zookeeper.znode.parent`
|
|
||||||
if [ "$zparent" == "null" ]; then zparent="/hbase"; fi
|
|
||||||
zmaster=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool zookeeper.znode.master`
|
|
||||||
if [ "$zmaster" == "null" ]; then zmaster="master"; fi
|
|
||||||
zmaster=$zparent/$zmaster
|
|
||||||
echo -n "Waiting for Master ZNode to expire"
|
|
||||||
while bin/hbase zkcli stat $zmaster >/dev/null 2>&1; do
|
|
||||||
echo -n "."
|
|
||||||
sleep 1
|
|
||||||
done
|
|
||||||
echo #force a newline
|
|
||||||
|
|
||||||
# all masters are down, now restart
|
|
||||||
"$bin"/hbase-daemon.sh --config "${HBASE_CONF_DIR}" start master
|
|
||||||
"$bin"/hbase-daemons.sh --config "${HBASE_CONF_DIR}" \
|
|
||||||
--hosts "${HBASE_BACKUP_MASTERS}" start master-backup
|
|
||||||
|
|
||||||
# unlike the masters, roll all regionservers one-at-a-time
|
|
||||||
export HBASE_SLAVE_PARALLEL=false
|
|
||||||
"$bin"/hbase-daemons.sh --config "${HBASE_CONF_DIR}" \
|
|
||||||
--hosts "${HBASE_REGIONSERVERS}" restart regionserver
|
|
||||||
|
|
||||||
fi
|
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
|
||||||
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import org.apache.hadoop.hbase.master.HMaster;
|
import org.apache.hadoop.hbase.master.HMaster;
|
||||||
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
|
@ -166,7 +168,8 @@ public class LocalHBaseCluster {
|
||||||
public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
|
public List<JVMClusterUtil.RegionServerThread> getLiveRegionServers() {
|
||||||
List<JVMClusterUtil.RegionServerThread> liveServers =
|
List<JVMClusterUtil.RegionServerThread> liveServers =
|
||||||
new ArrayList<JVMClusterUtil.RegionServerThread>();
|
new ArrayList<JVMClusterUtil.RegionServerThread>();
|
||||||
for (JVMClusterUtil.RegionServerThread rst: getRegionServers()) {
|
List<RegionServerThread> list = getRegionServers();
|
||||||
|
for (JVMClusterUtil.RegionServerThread rst: list) {
|
||||||
if (rst.isAlive()) liveServers.add(rst);
|
if (rst.isAlive()) liveServers.add(rst);
|
||||||
}
|
}
|
||||||
return liveServers;
|
return liveServers;
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.apache.hadoop.hbase.catalog;
|
package org.apache.hadoop.hbase.catalog;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.UndeclaredThrowableException;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
@ -34,6 +35,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker;
|
import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker;
|
||||||
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
|
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
|
||||||
|
@ -43,38 +45,34 @@ import org.apache.zookeeper.KeeperException;
|
||||||
/**
|
/**
|
||||||
* Tracks the availability of the catalog tables <code>-ROOT-</code> and
|
* Tracks the availability of the catalog tables <code>-ROOT-</code> and
|
||||||
* <code>.META.</code>.
|
* <code>.META.</code>.
|
||||||
* <p>
|
*
|
||||||
* This class is "read-only" in that the locations of the catalog tables cannot
|
* This class is "read-only" in that the locations of the catalog tables cannot
|
||||||
* be explicitly set. Instead, ZooKeeper is used to learn of the availability
|
* be explicitly set. Instead, ZooKeeper is used to learn of the availability
|
||||||
* and location of ROOT. ROOT is used to learn of the location of META. If not
|
* and location of <code>-ROOT-</code>. <code>-ROOT-</code> is used to learn of
|
||||||
* available in ROOT, ZooKeeper is used to monitor for a new location of META.
|
* the location of <code>.META.</code> If not available in <code>-ROOT-</code>,
|
||||||
|
* ZooKeeper is used to monitor for a new location of <code>.META.</code>.
|
||||||
|
*
|
||||||
* <p>Call {@link #start()} to start up operation.
|
* <p>Call {@link #start()} to start up operation.
|
||||||
*/
|
*/
|
||||||
public class CatalogTracker {
|
public class CatalogTracker {
|
||||||
private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
|
private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
|
||||||
|
|
||||||
private final HConnection connection;
|
private final HConnection connection;
|
||||||
|
|
||||||
private final ZooKeeperWatcher zookeeper;
|
private final ZooKeeperWatcher zookeeper;
|
||||||
|
|
||||||
private final RootRegionTracker rootRegionTracker;
|
private final RootRegionTracker rootRegionTracker;
|
||||||
|
|
||||||
private final MetaNodeTracker metaNodeTracker;
|
private final MetaNodeTracker metaNodeTracker;
|
||||||
|
|
||||||
private final AtomicBoolean metaAvailable = new AtomicBoolean(false);
|
private final AtomicBoolean metaAvailable = new AtomicBoolean(false);
|
||||||
private HServerAddress metaLocation;
|
private HServerAddress metaLocation;
|
||||||
|
|
||||||
private final int defaultTimeout;
|
private final int defaultTimeout;
|
||||||
|
|
||||||
public static final byte [] ROOT_REGION =
|
public static final byte [] ROOT_REGION =
|
||||||
HRegionInfo.ROOT_REGIONINFO.getRegionName();
|
HRegionInfo.ROOT_REGIONINFO.getRegionName();
|
||||||
|
|
||||||
public static final byte [] META_REGION =
|
public static final byte [] META_REGION =
|
||||||
HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
|
HRegionInfo.FIRST_META_REGIONINFO.getRegionName();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs the catalog tracker. Find current state of catalog tables and
|
* Constructs the catalog tracker. Find current state of catalog tables and
|
||||||
* begin active tracking by executing {@link #start()}.
|
* begin active tracking by executing {@link #start()} post construction.
|
||||||
|
* Does not timeout.
|
||||||
* @param zk
|
* @param zk
|
||||||
* @param connection server connection
|
* @param connection server connection
|
||||||
* @param abortable if fatal exception
|
* @param abortable if fatal exception
|
||||||
|
@ -88,11 +86,12 @@ public class CatalogTracker {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs the catalog tracker. Find current state of catalog tables and
|
* Constructs the catalog tracker. Find current state of catalog tables and
|
||||||
* begin active tracking by executing {@link #start()}.
|
* begin active tracking by executing {@link #start()} post construction.
|
||||||
* @param zk
|
* @param zk
|
||||||
* @param connection server connection
|
* @param connection server connection
|
||||||
* @param abortable if fatal exception
|
* @param abortable if fatal exception
|
||||||
* @param defaultTimeout Timeout to use.
|
* @param defaultTimeout Timeout to use. Pass zero for no timeout
|
||||||
|
* ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
|
public CatalogTracker(final ZooKeeperWatcher zk, final HConnection connection,
|
||||||
|
@ -101,26 +100,22 @@ public class CatalogTracker {
|
||||||
this.zookeeper = zk;
|
this.zookeeper = zk;
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.rootRegionTracker = new RootRegionTracker(zookeeper, abortable);
|
this.rootRegionTracker = new RootRegionTracker(zookeeper, abortable);
|
||||||
this.metaNodeTracker = new MetaNodeTracker(zookeeper, this);
|
this.metaNodeTracker = new MetaNodeTracker(zookeeper, this, abortable);
|
||||||
this.defaultTimeout = defaultTimeout;
|
this.defaultTimeout = defaultTimeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts the catalog tracker.
|
* Starts the catalog tracker.
|
||||||
* <p>
|
|
||||||
* Determines current availability of catalog tables and ensures all further
|
* Determines current availability of catalog tables and ensures all further
|
||||||
* transitions of either region is tracked.
|
* transitions of either region are tracked.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public void start() throws IOException, InterruptedException {
|
public void start() throws IOException, InterruptedException {
|
||||||
// Register listeners with zk
|
this.rootRegionTracker.start();
|
||||||
zookeeper.registerListener(rootRegionTracker);
|
this.metaNodeTracker.start();
|
||||||
zookeeper.registerListener(metaNodeTracker);
|
|
||||||
// Start root tracking
|
|
||||||
rootRegionTracker.start();
|
|
||||||
// Determine meta assignment; may not work because root and meta not yet
|
// Determine meta assignment; may not work because root and meta not yet
|
||||||
// deployed.
|
// deployed. Calling the below will set {@link #metaLocation}.
|
||||||
getMetaServerConnection(true);
|
getMetaServerConnection(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -148,7 +143,7 @@ public class CatalogTracker {
|
||||||
*/
|
*/
|
||||||
public void waitForRoot()
|
public void waitForRoot()
|
||||||
throws InterruptedException {
|
throws InterruptedException {
|
||||||
rootRegionTracker.getRootRegionLocation();
|
this.rootRegionTracker.blockUntilAvailable();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -161,7 +156,7 @@ public class CatalogTracker {
|
||||||
* @throws NotAllMetaRegionsOnlineException if root not available before
|
* @throws NotAllMetaRegionsOnlineException if root not available before
|
||||||
* timeout
|
* timeout
|
||||||
*/
|
*/
|
||||||
public HServerAddress waitForRoot(final long timeout)
|
HServerAddress waitForRoot(final long timeout)
|
||||||
throws InterruptedException, NotAllMetaRegionsOnlineException {
|
throws InterruptedException, NotAllMetaRegionsOnlineException {
|
||||||
HServerAddress address = rootRegionTracker.waitRootRegionLocation(timeout);
|
HServerAddress address = rootRegionTracker.waitRootRegionLocation(timeout);
|
||||||
if (address == null) {
|
if (address == null) {
|
||||||
|
@ -235,27 +230,27 @@ public class CatalogTracker {
|
||||||
*/
|
*/
|
||||||
private HRegionInterface getMetaServerConnection(boolean refresh)
|
private HRegionInterface getMetaServerConnection(boolean refresh)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
synchronized(metaAvailable) {
|
synchronized (metaAvailable) {
|
||||||
if(metaAvailable.get()) {
|
if (metaAvailable.get()) {
|
||||||
HRegionInterface current = getCachedConnection(metaLocation);
|
HRegionInterface current = getCachedConnection(metaLocation);
|
||||||
if(!refresh) {
|
if (!refresh) {
|
||||||
return current;
|
return current;
|
||||||
}
|
}
|
||||||
if(verifyRegionLocation(current, META_REGION)) {
|
if (verifyRegionLocation(current, META_REGION)) {
|
||||||
return current;
|
return current;
|
||||||
}
|
}
|
||||||
resetMetaLocation();
|
resetMetaLocation();
|
||||||
}
|
}
|
||||||
HRegionInterface rootConnection = getRootServerConnection();
|
HRegionInterface rootConnection = getRootServerConnection();
|
||||||
if(rootConnection == null) {
|
if (rootConnection == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
HServerAddress newLocation = MetaReader.readMetaLocation(rootConnection);
|
HServerAddress newLocation = MetaReader.readMetaLocation(rootConnection);
|
||||||
if(newLocation == null) {
|
if (newLocation == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
HRegionInterface newConnection = getCachedConnection(newLocation);
|
HRegionInterface newConnection = getCachedConnection(newLocation);
|
||||||
if(verifyRegionLocation(newConnection, META_REGION)) {
|
if (verifyRegionLocation(newConnection, META_REGION)) {
|
||||||
setMetaLocation(newLocation);
|
setMetaLocation(newLocation);
|
||||||
return newConnection;
|
return newConnection;
|
||||||
}
|
}
|
||||||
|
@ -270,7 +265,7 @@ public class CatalogTracker {
|
||||||
*/
|
*/
|
||||||
public void waitForMeta() throws InterruptedException {
|
public void waitForMeta() throws InterruptedException {
|
||||||
synchronized(metaAvailable) {
|
synchronized(metaAvailable) {
|
||||||
while(!metaAvailable.get()) {
|
while (!metaAvailable.get()) {
|
||||||
metaAvailable.wait();
|
metaAvailable.wait();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -339,8 +334,8 @@ public class CatalogTracker {
|
||||||
|
|
||||||
private void resetMetaLocation() {
|
private void resetMetaLocation() {
|
||||||
LOG.info("Current cached META location is not valid, resetting");
|
LOG.info("Current cached META location is not valid, resetting");
|
||||||
metaAvailable.set(false);
|
this.metaAvailable.set(false);
|
||||||
metaLocation = null;
|
this.metaLocation = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setMetaLocation(HServerAddress metaLocation) {
|
private void setMetaLocation(HServerAddress metaLocation) {
|
||||||
|
@ -368,11 +363,20 @@ public class CatalogTracker {
|
||||||
|
|
||||||
private boolean verifyRegionLocation(HRegionInterface metaServer,
|
private boolean verifyRegionLocation(HRegionInterface metaServer,
|
||||||
byte [] regionName) {
|
byte [] regionName) {
|
||||||
|
Throwable t = null;
|
||||||
try {
|
try {
|
||||||
return metaServer.getRegionInfo(regionName) != null;
|
return metaServer.getRegionInfo(regionName) != null;
|
||||||
} catch (NotServingRegionException e) {
|
} catch (NotServingRegionException e) {
|
||||||
return false;
|
t = e;
|
||||||
|
} catch (UndeclaredThrowableException e) {
|
||||||
|
// We can get a ConnectException wrapped by a UTE if client fails connect
|
||||||
|
// If not a ConnectException, rethrow.
|
||||||
|
if (!(e.getCause() instanceof ConnectException)) throw e;
|
||||||
|
t = e.getCause();
|
||||||
}
|
}
|
||||||
|
LOG.info("Failed verification of " + Bytes.toString(regionName) +
|
||||||
|
": " + t.getMessage());
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -407,4 +411,8 @@ public class CatalogTracker {
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MetaNodeTracker getMetaNodeTracker() {
|
||||||
|
return this.metaNodeTracker;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -20,6 +20,7 @@
|
||||||
package org.apache.hadoop.hbase.catalog;
|
package org.apache.hadoop.hbase.catalog;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.ConnectException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -115,11 +116,15 @@ public class MetaEditor {
|
||||||
* @param regionInfo region to update location of
|
* @param regionInfo region to update location of
|
||||||
* @param serverInfo server the region is located on
|
* @param serverInfo server the region is located on
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
* @throws ConnectException Usually because the regionserver carrying .META.
|
||||||
|
* is down.
|
||||||
|
* @throws NullPointerException Because no -ROOT- server connection
|
||||||
*/
|
*/
|
||||||
public static void updateMetaLocation(CatalogTracker catalogTracker,
|
public static void updateMetaLocation(CatalogTracker catalogTracker,
|
||||||
HRegionInfo regionInfo, HServerInfo serverInfo)
|
HRegionInfo regionInfo, HServerInfo serverInfo)
|
||||||
throws IOException {
|
throws IOException, ConnectException {
|
||||||
HRegionInterface server = catalogTracker.waitForRootServerConnectionDefault();
|
HRegionInterface server = catalogTracker.waitForRootServerConnectionDefault();
|
||||||
|
if (server == null) throw new NullPointerException("No server for -ROOT-");
|
||||||
updateLocation(server, CatalogTracker.ROOT_REGION, regionInfo, serverInfo);
|
updateLocation(server, CatalogTracker.ROOT_REGION, regionInfo, serverInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,9 +157,10 @@ public class MetaEditor {
|
||||||
* @param catalogRegionName name of catalog region being updated
|
* @param catalogRegionName name of catalog region being updated
|
||||||
* @param regionInfo region to update location of
|
* @param regionInfo region to update location of
|
||||||
* @param serverInfo server the region is located on
|
* @param serverInfo server the region is located on
|
||||||
* @throws IOException
|
* @throws IOException In particular could throw {@link java.net.ConnectException}
|
||||||
|
* if the server is down on other end.
|
||||||
*/
|
*/
|
||||||
public static void updateLocation(HRegionInterface server,
|
private static void updateLocation(HRegionInterface server,
|
||||||
byte [] catalogRegionName, HRegionInfo regionInfo, HServerInfo serverInfo)
|
byte [] catalogRegionName, HRegionInfo regionInfo, HServerInfo serverInfo)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Put put = new Put(regionInfo.getRegionName());
|
Put put = new Put(regionInfo.getRegionName());
|
||||||
|
|
|
@ -273,7 +273,9 @@ public class HBaseAdmin implements Abortable {
|
||||||
byte [] lastKey = null;
|
byte [] lastKey = null;
|
||||||
for(byte [] splitKey : splitKeys) {
|
for(byte [] splitKey : splitKeys) {
|
||||||
if(lastKey != null && Bytes.equals(splitKey, lastKey)) {
|
if(lastKey != null && Bytes.equals(splitKey, lastKey)) {
|
||||||
throw new IllegalArgumentException("All split keys must be unique, found duplicate");
|
throw new IllegalArgumentException("All split keys must be unique, " +
|
||||||
|
"found duplicate: " + Bytes.toStringBinary(splitKey) +
|
||||||
|
", " + Bytes.toStringBinary(lastKey));
|
||||||
}
|
}
|
||||||
lastKey = splitKey;
|
lastKey = splitKey;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,14 +20,15 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.mortbay.log.Log;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -35,6 +36,7 @@ import java.io.IOException;
|
||||||
* Used by {@link ResultScanner}s made by {@link HTable}.
|
* Used by {@link ResultScanner}s made by {@link HTable}.
|
||||||
*/
|
*/
|
||||||
public class ScannerCallable extends ServerCallable<Result[]> {
|
public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ScannerCallable.class);
|
||||||
private long scannerId = -1L;
|
private long scannerId = -1L;
|
||||||
private boolean instantiated = false;
|
private boolean instantiated = false;
|
||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
|
@ -103,7 +105,7 @@ public class ScannerCallable extends ServerCallable<Result[]> {
|
||||||
try {
|
try {
|
||||||
this.server.close(this.scannerId);
|
this.server.close(this.scannerId);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
Log.warn("Ignore, probably already closed", e);
|
LOG.warn("Ignore, probably already closed", e);
|
||||||
}
|
}
|
||||||
this.scannerId = -1L;
|
this.scannerId = -1L;
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,9 +31,9 @@ import org.apache.hadoop.hbase.Stoppable;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.MultiAction;
|
import org.apache.hadoop.hbase.client.MultiAction;
|
||||||
import org.apache.hadoop.hbase.client.MultiResponse;
|
|
||||||
import org.apache.hadoop.hbase.client.MultiPut;
|
import org.apache.hadoop.hbase.client.MultiPut;
|
||||||
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
||||||
|
import org.apache.hadoop.hbase.client.MultiResponse;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
@ -56,7 +56,6 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab
|
||||||
public HRegionInfo getRegionInfo(final byte [] regionName)
|
public HRegionInfo getRegionInfo(final byte [] regionName)
|
||||||
throws NotServingRegionException;
|
throws NotServingRegionException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return all the data for the row that matches <i>row</i> exactly,
|
* Return all the data for the row that matches <i>row</i> exactly,
|
||||||
* or the one that immediately preceeds it.
|
* or the one that immediately preceeds it.
|
||||||
|
@ -288,8 +287,8 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab
|
||||||
/**
|
/**
|
||||||
* Bulk load an HFile into an open region
|
* Bulk load an HFile into an open region
|
||||||
*/
|
*/
|
||||||
public void bulkLoadHFile(String hfilePath,
|
public void bulkLoadHFile(String hfilePath, byte[] regionName, byte[] familyName)
|
||||||
byte[] regionName, byte[] familyName) throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
// Master methods
|
// Master methods
|
||||||
|
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
private final Map<String, RegionState> regionsInTransition =
|
private final Map<String, RegionState> regionsInTransition =
|
||||||
new TreeMap<String, RegionState>();
|
new TreeMap<String, RegionState>();
|
||||||
|
|
||||||
/** Plans for region movement. */
|
/** Plans for region movement. Key is the encoded version of a region name*/
|
||||||
// TODO: When do plans get cleaned out? Ever?
|
// TODO: When do plans get cleaned out? Ever?
|
||||||
// Its cleaned on server shutdown processing -- St.Ack
|
// Its cleaned on server shutdown processing -- St.Ack
|
||||||
private final Map<String, RegionPlan> regionPlans =
|
private final Map<String, RegionPlan> regionPlans =
|
||||||
|
@ -240,9 +240,10 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
private void handleRegion(RegionTransitionData data) {
|
private void handleRegion(RegionTransitionData data) {
|
||||||
synchronized(regionsInTransition) {
|
synchronized(regionsInTransition) {
|
||||||
// Verify this is a known server
|
// Verify this is a known server
|
||||||
if(!serverManager.isServerOnline(data.getServerName())) {
|
if (!serverManager.isServerOnline(data.getServerName()) &&
|
||||||
LOG.warn("Attempted to handle region transition for server " +
|
!this.master.getServerName().equals(data.getServerName())) {
|
||||||
data.getServerName() + " but server is not online");
|
LOG.warn("Attempted to handle region transition for server but " +
|
||||||
|
"server is not online: " + data);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
|
String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
|
||||||
|
@ -251,6 +252,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
", server=" + data.getServerName() + ", region=" + prettyPrintedRegionName);
|
", server=" + data.getServerName() + ", region=" + prettyPrintedRegionName);
|
||||||
RegionState regionState = regionsInTransition.get(encodedName);
|
RegionState regionState = regionsInTransition.get(encodedName);
|
||||||
switch(data.getEventType()) {
|
switch(data.getEventType()) {
|
||||||
|
case M2ZK_REGION_OFFLINE:
|
||||||
|
LOG.warn("What to do with this event? " + data);
|
||||||
|
break;
|
||||||
|
|
||||||
case RS2ZK_REGION_CLOSING:
|
case RS2ZK_REGION_CLOSING:
|
||||||
// Should see CLOSING after we have asked it to CLOSE or additional
|
// Should see CLOSING after we have asked it to CLOSE or additional
|
||||||
|
@ -494,7 +498,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
// go of this lock. There must be a better construct that this -- St.Ack 20100811
|
// go of this lock. There must be a better construct that this -- St.Ack 20100811
|
||||||
this.assignLock.lock();
|
this.assignLock.lock();
|
||||||
try {
|
try {
|
||||||
synchronized(regionsInTransition) {
|
synchronized (regionsInTransition) {
|
||||||
state = regionsInTransition.get(encodedName);
|
state = regionsInTransition.get(encodedName);
|
||||||
if(state == null) {
|
if(state == null) {
|
||||||
state = new RegionState(region, RegionState.State.OFFLINE);
|
state = new RegionState(region, RegionState.State.OFFLINE);
|
||||||
|
@ -546,6 +550,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
plan = new RegionPlan(state.getRegion(), null,
|
plan = new RegionPlan(state.getRegion(), null,
|
||||||
LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
|
LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
|
||||||
regionPlans.put(encodedName, plan);
|
regionPlans.put(encodedName, plan);
|
||||||
|
} else {
|
||||||
|
LOG.debug("Using preexisting plan=" + plan);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
|
@ -937,11 +943,23 @@ public class AssignmentManager extends ZooKeeperListener {
|
||||||
this.regionPlans.entrySet().iterator(); i.hasNext();) {
|
this.regionPlans.entrySet().iterator(); i.hasNext();) {
|
||||||
Map.Entry<String, RegionPlan> e = i.next();
|
Map.Entry<String, RegionPlan> e = i.next();
|
||||||
if (e.getValue().getDestination().equals(hsi)) {
|
if (e.getValue().getDestination().equals(hsi)) {
|
||||||
// Use iterator's remove else we'll get CME.fail a
|
// Use iterator's remove else we'll get CME
|
||||||
i.remove();
|
i.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Remove assignment info related to the downed server. Remove the downed
|
||||||
|
// server from list of servers else it looks like a server w/ no load.
|
||||||
|
synchronized (this.regions) {
|
||||||
|
Set<HRegionInfo> hris = new HashSet<HRegionInfo>();
|
||||||
|
for (Map.Entry<HRegionInfo, HServerInfo> e: this.regions.entrySet()) {
|
||||||
|
// Add to a Set -- don't call setOffline in here else we get a CME.
|
||||||
|
if (e.getValue().equals(hsi)) hris.add(e.getKey());
|
||||||
|
}
|
||||||
|
for (HRegionInfo hri: hris) setOffline(hri);
|
||||||
|
this.servers.remove(hsi);
|
||||||
|
}
|
||||||
|
// If anything in transition related to the server, clean it up.
|
||||||
synchronized (regionsInTransition) {
|
synchronized (regionsInTransition) {
|
||||||
// Iterate all regions in transition checking if were on this server
|
// Iterate all regions in transition checking if were on this server
|
||||||
final String serverName = hsi.getServerName();
|
final String serverName = hsi.getServerName();
|
||||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Chore;
|
import org.apache.hadoop.hbase.Chore;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus;
|
import org.apache.hadoop.hbase.ClusterStatus;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HMsg;
|
import org.apache.hadoop.hbase.HMsg;
|
||||||
|
@ -55,8 +54,8 @@ import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||||
import org.apache.hadoop.hbase.client.HConnection;
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||||
import org.apache.hadoop.hbase.client.MetaScanner;
|
import org.apache.hadoop.hbase.client.MetaScanner;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||||
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
|
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
|
||||||
import org.apache.hadoop.hbase.ipc.HBaseRPC;
|
import org.apache.hadoop.hbase.ipc.HBaseRPC;
|
||||||
|
@ -87,7 +86,6 @@ import org.apache.hadoop.io.MapWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.Watcher;
|
import org.apache.zookeeper.Watcher;
|
||||||
|
|
||||||
|
@ -258,7 +256,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
this.serverManager = new ServerManager(this, this);
|
this.serverManager = new ServerManager(this, this);
|
||||||
|
|
||||||
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
|
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
|
||||||
this, conf.getInt("hbase.master.catalog.timeout", -1));
|
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
|
||||||
this.catalogTracker.start();
|
this.catalogTracker.start();
|
||||||
|
|
||||||
this.assignmentManager = new AssignmentManager(this, serverManager,
|
this.assignmentManager = new AssignmentManager(this, serverManager,
|
||||||
|
@ -563,6 +561,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
this.assignmentManager.getRegionsInTransition());
|
this.assignmentManager.getRegionsInTransition());
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
if (!this.serverManager.getDeadServers().isEmpty()) {
|
||||||
|
LOG.debug("Not running balancer because dead regionserver processing");
|
||||||
|
}
|
||||||
Map<HServerInfo, List<HRegionInfo>> assignments =
|
Map<HServerInfo, List<HRegionInfo>> assignments =
|
||||||
this.assignmentManager.getAssignments();
|
this.assignmentManager.getAssignments();
|
||||||
// Returned Map from AM does not include mention of servers w/o assignments.
|
// Returned Map from AM does not include mention of servers w/o assignments.
|
||||||
|
@ -576,6 +577,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
||||||
List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
|
List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
|
||||||
if (plans != null && !plans.isEmpty()) {
|
if (plans != null && !plans.isEmpty()) {
|
||||||
for (RegionPlan plan: plans) {
|
for (RegionPlan plan: plans) {
|
||||||
|
LOG.info("balance=" + plan);
|
||||||
this.assignmentManager.balance(plan);
|
this.assignmentManager.balance(plan);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,7 +141,7 @@ public class LoadBalancer {
|
||||||
}
|
}
|
||||||
int numRegions = 0;
|
int numRegions = 0;
|
||||||
// Iterate so we can count regions as we build the map
|
// Iterate so we can count regions as we build the map
|
||||||
for(Map.Entry<HServerInfo, List<HRegionInfo>> server :
|
for(Map.Entry<HServerInfo, List<HRegionInfo>> server:
|
||||||
clusterState.entrySet()) {
|
clusterState.entrySet()) {
|
||||||
server.getKey().getLoad().setNumberOfRegions(server.getValue().size());
|
server.getKey().getLoad().setNumberOfRegions(server.getValue().size());
|
||||||
numRegions += server.getKey().getLoad().getNumberOfRegions();
|
numRegions += server.getKey().getLoad().getNumberOfRegions();
|
||||||
|
@ -527,8 +527,6 @@ public class LoadBalancer {
|
||||||
private final HServerInfo source;
|
private final HServerInfo source;
|
||||||
private HServerInfo dest;
|
private HServerInfo dest;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiate a plan for a region move, moving the specified region from
|
* Instantiate a plan for a region move, moving the specified region from
|
||||||
* the specified source server to the specified destination server.
|
* the specified source server to the specified destination server.
|
||||||
|
@ -589,5 +587,11 @@ public class LoadBalancer {
|
||||||
public int compareTo(RegionPlan o) {
|
public int compareTo(RegionPlan o) {
|
||||||
return getRegionName().compareTo(o.getRegionName());
|
return getRegionName().compareTo(o.getRegionName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "hri=" + this.hri.getRegionNameAsString() + ", src=" +
|
||||||
|
this.source.getServerName() + ", dest=" + this.dest.getServerName();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -499,7 +499,7 @@ public class ServerManager {
|
||||||
*/
|
*/
|
||||||
public void sendRegionOpen(HServerInfo server, HRegionInfo region) {
|
public void sendRegionOpen(HServerInfo server, HRegionInfo region) {
|
||||||
HRegionInterface hri = getServerConnection(server);
|
HRegionInterface hri = getServerConnection(server);
|
||||||
if(hri == null) {
|
if (hri == null) {
|
||||||
LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName()
|
LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName()
|
||||||
+ " failed because no RPC connection found to this server");
|
+ " failed because no RPC connection found to this server");
|
||||||
return;
|
return;
|
||||||
|
@ -533,7 +533,7 @@ public class ServerManager {
|
||||||
HConnection connection =
|
HConnection connection =
|
||||||
HConnectionManager.getConnection(this.master.getConfiguration());
|
HConnectionManager.getConnection(this.master.getConfiguration());
|
||||||
HRegionInterface hri = serverConnections.get(info.getServerName());
|
HRegionInterface hri = serverConnections.get(info.getServerName());
|
||||||
if(hri == null) {
|
if (hri == null) {
|
||||||
LOG.info("new connection");
|
LOG.info("new connection");
|
||||||
hri = connection.getHRegionConnection(info.getServerAddress(), false);
|
hri = connection.getHRegionConnection(info.getServerAddress(), false);
|
||||||
serverConnections.put(info.getServerName(), hri);
|
serverConnections.put(info.getServerName(), hri);
|
||||||
|
|
|
@ -23,7 +23,6 @@ import java.io.IOException;
|
||||||
import java.lang.Thread.UncaughtExceptionHandler;
|
import java.lang.Thread.UncaughtExceptionHandler;
|
||||||
import java.lang.management.ManagementFactory;
|
import java.lang.management.ManagementFactory;
|
||||||
import java.lang.management.MemoryUsage;
|
import java.lang.management.MemoryUsage;
|
||||||
import java.lang.management.RuntimeMXBean;
|
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.net.BindException;
|
import java.net.BindException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -64,7 +63,6 @@ import org.apache.hadoop.hbase.HServerAddress;
|
||||||
import org.apache.hadoop.hbase.HServerInfo;
|
import org.apache.hadoop.hbase.HServerInfo;
|
||||||
import org.apache.hadoop.hbase.HServerLoad;
|
import org.apache.hadoop.hbase.HServerLoad;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.LocalHBaseCluster;
|
|
||||||
import org.apache.hadoop.hbase.MasterAddressTracker;
|
import org.apache.hadoop.hbase.MasterAddressTracker;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||||
|
@ -352,7 +350,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
|
|
||||||
// create the catalog tracker and start it
|
// create the catalog tracker and start it
|
||||||
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
|
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
|
||||||
this, this.conf.getInt("hbase.regionserver.catalog.timeout", -1));
|
this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
|
||||||
catalogTracker.start();
|
catalogTracker.start();
|
||||||
|
|
||||||
this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
|
this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
|
||||||
|
@ -1143,7 +1141,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
RootLocationEditor.setRootLocation(getZooKeeper(),
|
RootLocationEditor.setRootLocation(getZooKeeper(),
|
||||||
getServerInfo().getServerAddress());
|
getServerInfo().getServerAddress());
|
||||||
} else if (r.getRegionInfo().isMetaRegion()) {
|
} else if (r.getRegionInfo().isMetaRegion()) {
|
||||||
// TODO: doh, this has weird naming between RootEditor/MetaEditor
|
|
||||||
MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo());
|
MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo());
|
||||||
} else {
|
} else {
|
||||||
if (daughter) {
|
if (daughter) {
|
||||||
|
@ -1361,6 +1358,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public HRegionInfo getRegionInfo(final byte[] regionName)
|
public HRegionInfo getRegionInfo(final byte[] regionName)
|
||||||
throws NotServingRegionException {
|
throws NotServingRegionException {
|
||||||
requestCount.incrementAndGet();
|
requestCount.incrementAndGet();
|
||||||
|
@ -2231,14 +2229,13 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
if (multi.size() == 1) {
|
if (multi.size() == 1) {
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} else {
|
} else {
|
||||||
LOG.error("Exception found while attempting " + action.toString()
|
LOG.error("Exception found while attempting " + action.toString() +
|
||||||
+ " " + StringUtils.stringifyException(ioe));
|
" " + StringUtils.stringifyException(ioe));
|
||||||
response.add(regionName,null);
|
response.add(regionName,null);
|
||||||
// stop processing on this region, continue to the next.
|
// stop processing on this region, continue to the next.
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -155,7 +155,7 @@ public class OpenRegionHandler extends EventHandler {
|
||||||
|
|
||||||
// Finally, Transition ZK node to OPENED
|
// Finally, Transition ZK node to OPENED
|
||||||
try {
|
try {
|
||||||
if(ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo,
|
if (ZKAssign.transitionNodeOpened(server.getZooKeeper(), regionInfo,
|
||||||
server.getServerName(), openingVersion) == -1) {
|
server.getServerName(), openingVersion) == -1) {
|
||||||
LOG.warn("Completed the OPEN of a region but when transitioning from " +
|
LOG.warn("Completed the OPEN of a region but when transitioning from " +
|
||||||
" OPENING to OPENED got a version mismatch, someone else clashed " +
|
" OPENING to OPENED got a version mismatch, someone else clashed " +
|
||||||
|
|
|
@ -23,24 +23,23 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tracks the unassigned zookeeper node used by the META table.
|
* Tracks the unassigned zookeeper node used by the META table.
|
||||||
* <p>
|
*
|
||||||
* A callback is made into a {@link CatalogTracker} when META completes a new
|
* A callback is made into the passed {@link CatalogTracker} when
|
||||||
* assignment.
|
* <code>.META.</code> completes a new assignment.
|
||||||
* <p>
|
* <p>
|
||||||
* If META is already assigned when instantiating this class, you will not
|
* If META is already assigned when instantiating this class, you will not
|
||||||
* receive any notification for that assignment. You will receive a
|
* receive any notification for that assignment. You will receive a
|
||||||
* notification after META has been successfully assigned to a new location.
|
* notification after META has been successfully assigned to a new location.
|
||||||
*/
|
*/
|
||||||
public class MetaNodeTracker extends ZooKeeperListener {
|
public class MetaNodeTracker extends ZooKeeperNodeTracker {
|
||||||
private static final Log LOG = LogFactory.getLog(MetaNodeTracker.class);
|
private static final Log LOG = LogFactory.getLog(MetaNodeTracker.class);
|
||||||
|
|
||||||
private final String node;
|
|
||||||
|
|
||||||
/** Catalog tracker to notify when META has a new assignment completed. */
|
/** Catalog tracker to notify when META has a new assignment completed. */
|
||||||
private final CatalogTracker catalogTracker;
|
private final CatalogTracker catalogTracker;
|
||||||
|
|
||||||
|
@ -49,25 +48,22 @@ public class MetaNodeTracker extends ZooKeeperListener {
|
||||||
* @param watcher
|
* @param watcher
|
||||||
* @param abortable
|
* @param abortable
|
||||||
*/
|
*/
|
||||||
public MetaNodeTracker(ZooKeeperWatcher watcher,
|
public MetaNodeTracker(final ZooKeeperWatcher watcher,
|
||||||
CatalogTracker catalogTracker) {
|
final CatalogTracker catalogTracker, final Abortable abortable) {
|
||||||
super(watcher);
|
super(watcher, ZKUtil.joinZNode(watcher.assignmentZNode,
|
||||||
|
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName()), abortable);
|
||||||
this.catalogTracker = catalogTracker;
|
this.catalogTracker = catalogTracker;
|
||||||
node = ZKUtil.joinZNode(watcher.assignmentZNode,
|
|
||||||
HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void nodeDeleted(String path) {
|
public void nodeDeleted(String path) {
|
||||||
if(path.equals(node)) {
|
if (!path.equals(node)) return;
|
||||||
LOG.info("Detected completed assignment of META, notifying catalog " +
|
LOG.info("Detected completed assignment of META, notifying catalog tracker");
|
||||||
"tracker");
|
|
||||||
try {
|
try {
|
||||||
catalogTracker.waitForMetaServerConnectionDefault();
|
this.catalogTracker.waitForMetaServerConnectionDefault();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Tried to reset META server location after seeing the " +
|
LOG.warn("Tried to reset META server location after seeing the " +
|
||||||
"completion of a new META assignment but got an IOE", e);
|
"completion of a new META assignment but got an IOE", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -28,7 +28,8 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
/**
|
/**
|
||||||
* Tracks the root region server location node in zookeeper.
|
* Tracks the root region server location node in zookeeper.
|
||||||
* Root region location is set by {@link RootLocationEditor} usually called
|
* Root region location is set by {@link RootLocationEditor} usually called
|
||||||
* out of {@link RegionServerServices#postOpenDeployTasks(org.apache.hadoop.hbase.regionserver.HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker)}
|
* out of {@link RegionServerServices#postOpenDeployTasks(org.apache.hadoop.hbase.regionserver.HRegion, org.apache.hadoop.hbase.catalog.CatalogTracker)}.
|
||||||
|
* This class has a watcher on the root location and notices changes.
|
||||||
*/
|
*/
|
||||||
public class RootRegionTracker extends ZooKeeperNodeTracker {
|
public class RootRegionTracker extends ZooKeeperNodeTracker {
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -132,6 +132,10 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getNode() {
|
||||||
|
return this.node;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void nodeCreated(String path) {
|
public synchronized void nodeCreated(String path) {
|
||||||
if(path.equals(node)) {
|
if(path.equals(node)) {
|
||||||
|
|
|
@ -169,9 +169,6 @@ public class ZooKeeperWatcher implements Watcher {
|
||||||
"state=" + event.getState() + ", " +
|
"state=" + event.getState() + ", " +
|
||||||
"path=" + event.getPath());
|
"path=" + event.getPath());
|
||||||
|
|
||||||
// While we are still using both ZKWs, need to call parent process()
|
|
||||||
// super.process(event);
|
|
||||||
|
|
||||||
switch(event.getType()) {
|
switch(event.getType()) {
|
||||||
|
|
||||||
// If event type is NONE, this is a connection status change
|
// If event type is NONE, this is a connection status change
|
||||||
|
|
|
@ -609,6 +609,18 @@ public class HBaseTestingUtility {
|
||||||
return createMultiRegions(getConfiguration(), table, columnFamily);
|
return createMultiRegions(getConfiguration(), table, columnFamily);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static final byte[][] KEYS = {
|
||||||
|
HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
|
||||||
|
Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
|
||||||
|
Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
|
||||||
|
Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
|
||||||
|
Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
|
||||||
|
Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
|
||||||
|
Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
|
||||||
|
Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
|
||||||
|
Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates many regions names "aaa" to "zzz".
|
* Creates many regions names "aaa" to "zzz".
|
||||||
* @param c Configuration to use.
|
* @param c Configuration to use.
|
||||||
|
@ -620,17 +632,6 @@ public class HBaseTestingUtility {
|
||||||
public int createMultiRegions(final Configuration c, final HTable table,
|
public int createMultiRegions(final Configuration c, final HTable table,
|
||||||
final byte[] columnFamily)
|
final byte[] columnFamily)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte[][] KEYS = {
|
|
||||||
HConstants.EMPTY_BYTE_ARRAY, Bytes.toBytes("bbb"),
|
|
||||||
Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
|
|
||||||
Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
|
|
||||||
Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"),
|
|
||||||
Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
|
|
||||||
Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
|
|
||||||
Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
|
|
||||||
Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"),
|
|
||||||
Bytes.toBytes("xxx"), Bytes.toBytes("yyy")
|
|
||||||
};
|
|
||||||
return createMultiRegions(c, table, columnFamily, KEYS);
|
return createMultiRegions(c, table, columnFamily, KEYS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -963,16 +964,18 @@ public class HBaseTestingUtility {
|
||||||
* Make sure that at least the specified number of region servers
|
* Make sure that at least the specified number of region servers
|
||||||
* are running
|
* are running
|
||||||
* @param num minimum number of region servers that should be running
|
* @param num minimum number of region servers that should be running
|
||||||
|
* @return True if we started some servers
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void ensureSomeRegionServersAvailable(final int num)
|
public boolean ensureSomeRegionServersAvailable(final int num)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (this.getHBaseCluster().getLiveRegionServerThreads().size() < num) {
|
if (this.getHBaseCluster().getLiveRegionServerThreads().size() < num) {
|
||||||
// Need at least "num" servers.
|
// Need at least "num" servers.
|
||||||
LOG.info("Started new server=" +
|
LOG.info("Started new server=" +
|
||||||
this.getHBaseCluster().startRegionServer());
|
this.getHBaseCluster().startRegionServer());
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -223,6 +223,8 @@ public class MiniHBaseCluster {
|
||||||
try {
|
try {
|
||||||
LOG.info("Hook closing fs=" + this.fs);
|
LOG.info("Hook closing fs=" + this.fs);
|
||||||
this.fs.close();
|
this.fs.close();
|
||||||
|
} catch (NullPointerException npe) {
|
||||||
|
LOG.debug("Need to fix these: " + npe.toString());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Running hook", e);
|
LOG.warn("Running hook", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,247 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.catalog;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HServerAddress;
|
||||||
|
import org.apache.hadoop.hbase.HServerInfo;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.HConnection;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.hadoop.util.Progressable;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test {@link CatalogTracker}
|
||||||
|
*/
|
||||||
|
public class TestCatalogTracker {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestCatalogTracker.class);
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
private static final HServerAddress HSA =
|
||||||
|
new HServerAddress("example.org:1234");
|
||||||
|
private ZooKeeperWatcher watcher;
|
||||||
|
private Abortable abortable;
|
||||||
|
|
||||||
|
@BeforeClass public static void beforeClass() throws Exception {
|
||||||
|
UTIL.startMiniZKCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass public static void afterClass() throws IOException {
|
||||||
|
UTIL.getZkCluster().shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before public void before() throws IOException {
|
||||||
|
this.abortable = new Abortable() {
|
||||||
|
@Override
|
||||||
|
public void abort(String why, Throwable e) {
|
||||||
|
LOG.info(why, e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
this.watcher = new ZooKeeperWatcher(UTIL.getConfiguration(),
|
||||||
|
this.getClass().getSimpleName(), this.abortable);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After public void after() {
|
||||||
|
this.watcher.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private CatalogTracker constructAndStartCatalogTracker()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return constructAndStartCatalogTracker(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private CatalogTracker constructAndStartCatalogTracker(final HConnection c)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
CatalogTracker ct = new CatalogTracker(this.watcher, c, this.abortable);
|
||||||
|
ct.start();
|
||||||
|
return ct;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (expected = NotAllMetaRegionsOnlineException.class)
|
||||||
|
public void testTimeoutWaitForRoot()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
final CatalogTracker ct = constructAndStartCatalogTracker();
|
||||||
|
ct.waitForRoot(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (expected = NotAllMetaRegionsOnlineException.class)
|
||||||
|
public void testTimeoutWaitForMeta()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
final CatalogTracker ct = constructAndStartCatalogTracker();
|
||||||
|
ct.waitForMeta(100);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test waiting on root w/ no timeout specified.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws KeeperException
|
||||||
|
*/
|
||||||
|
@Test public void testNoTimeoutWaitForRoot()
|
||||||
|
throws IOException, InterruptedException, KeeperException {
|
||||||
|
final CatalogTracker ct = constructAndStartCatalogTracker();
|
||||||
|
HServerAddress hsa = ct.getRootLocation();
|
||||||
|
Assert.assertNull(hsa);
|
||||||
|
|
||||||
|
// Now test waiting on root location getting set.
|
||||||
|
Thread t = new WaitOnMetaThread(ct);
|
||||||
|
startWaitAliveThenWaitItLives(t, 1000);
|
||||||
|
// Set a root location.
|
||||||
|
hsa = setRootLocation();
|
||||||
|
// Join the thread... should exit shortly.
|
||||||
|
t.join();
|
||||||
|
// Now root is available.
|
||||||
|
Assert.assertTrue(ct.getRootLocation().equals(hsa));
|
||||||
|
}
|
||||||
|
|
||||||
|
private HServerAddress setRootLocation() throws KeeperException {
|
||||||
|
RootLocationEditor.setRootLocation(this.watcher, HSA);
|
||||||
|
return HSA;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test waiting on meta w/ no timeout specified.
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws KeeperException
|
||||||
|
*/
|
||||||
|
@Test public void testNoTimeoutWaitForMeta()
|
||||||
|
throws IOException, InterruptedException, KeeperException {
|
||||||
|
// Mock an HConnection and a HRegionInterface implementation. Have the
|
||||||
|
// HConnection return the HRI. Have the HRI return a few mocked up responses
|
||||||
|
// to make our test work.
|
||||||
|
HConnection connection = Mockito.mock(HConnection.class);
|
||||||
|
HRegionInterface mockHRI = Mockito.mock(HRegionInterface.class);
|
||||||
|
// Make the HRI return an answer no matter how Get is called. Same for
|
||||||
|
// getHRegionInfo. Thats enough for this test.
|
||||||
|
Mockito.when(connection.getHRegionConnection((HServerAddress)Mockito.any(), Mockito.anyBoolean())).
|
||||||
|
thenReturn(mockHRI);
|
||||||
|
|
||||||
|
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
|
||||||
|
HServerAddress hsa = ct.getMetaLocation();
|
||||||
|
Assert.assertNull(hsa);
|
||||||
|
|
||||||
|
// Now test waiting on meta location getting set.
|
||||||
|
Thread t = new WaitOnMetaThread(ct) {
|
||||||
|
@Override
|
||||||
|
void doWaiting() throws InterruptedException {
|
||||||
|
this.ct.waitForMeta();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
startWaitAliveThenWaitItLives(t, 1000);
|
||||||
|
|
||||||
|
// Now the ct is up... set into the mocks some answers that make it look
|
||||||
|
// like things have been getting assigned. Make it so we'll return a
|
||||||
|
// location (no matter what the Get is). Same for getHRegionInfo -- always
|
||||||
|
// just return the meta region.
|
||||||
|
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||||
|
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||||
|
HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
|
||||||
|
Bytes.toBytes(HSA.toString())));
|
||||||
|
final Result result = new Result(kvs);
|
||||||
|
Mockito.when(mockHRI.get((byte [])Mockito.any(), (Get)Mockito.any())).
|
||||||
|
thenReturn(result);
|
||||||
|
Mockito.when(mockHRI.getRegionInfo((byte [])Mockito.any())).
|
||||||
|
thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
|
||||||
|
// This should trigger wake up of meta wait (Its the removal of the meta
|
||||||
|
// region unassigned node that triggers catalogtrackers that a meta has
|
||||||
|
// been assigned.
|
||||||
|
String node = ct.getMetaNodeTracker().getNode();
|
||||||
|
ZKUtil.createAndFailSilent(this.watcher, node);
|
||||||
|
MetaEditor.updateMetaLocation(ct, HRegionInfo.FIRST_META_REGIONINFO,
|
||||||
|
new HServerInfo(HSA, -1, "example.com"));
|
||||||
|
ZKUtil.deleteNode(this.watcher, node);
|
||||||
|
// Join the thread... should exit shortly.
|
||||||
|
t.join();
|
||||||
|
// Now meta is available.
|
||||||
|
Assert.assertTrue(ct.getMetaLocation().equals(HSA));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void startWaitAliveThenWaitItLives(final Thread t, final int ms) {
|
||||||
|
t.start();
|
||||||
|
while(!t.isAlive()) {
|
||||||
|
// Wait
|
||||||
|
}
|
||||||
|
// Wait one second.
|
||||||
|
Threads.sleep(ms);
|
||||||
|
Assert.assertTrue("Assert " + t.getName() + " still waiting", t.isAlive());
|
||||||
|
}
|
||||||
|
|
||||||
|
class CountingProgressable implements Progressable {
|
||||||
|
final AtomicInteger counter = new AtomicInteger(0);
|
||||||
|
@Override
|
||||||
|
public void progress() {
|
||||||
|
this.counter.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wait on META.
|
||||||
|
* Default is wait on -ROOT-.
|
||||||
|
*/
|
||||||
|
class WaitOnMetaThread extends Thread {
|
||||||
|
final CatalogTracker ct;
|
||||||
|
|
||||||
|
WaitOnMetaThread(final CatalogTracker ct) {
|
||||||
|
super("WaitOnMeta");
|
||||||
|
this.ct = ct;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
doWaiting();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
throw new RuntimeException("Failed wait on root", e);
|
||||||
|
}
|
||||||
|
LOG.info("Exiting " + getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
void doWaiting() throws InterruptedException {
|
||||||
|
this.ct.waitForRoot();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,54 +17,67 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.Row;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.JVMClusterUtil;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
public class TestMultiParallel {
|
||||||
import java.util.List;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
|
|
||||||
public class TestMultiParallel extends MultiRegionTable {
|
|
||||||
// This test needs to be rewritten to use HBaseTestingUtility -- St.Ack 20100910
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
|
private static final Log LOG = LogFactory.getLog(TestMultiParallel.class);
|
||||||
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
private static final byte[] VALUE = Bytes.toBytes("value");
|
private static final byte[] VALUE = Bytes.toBytes("value");
|
||||||
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
|
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
|
||||||
private static final String FAMILY = "family";
|
private static final String FAMILY = "family";
|
||||||
private static final String TEST_TABLE = "multi_test_table";
|
private static final String TEST_TABLE = "multi_test_table";
|
||||||
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
|
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
|
||||||
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
|
private static final byte[] ONE_ROW = Bytes.toBytes("xxx");
|
||||||
|
private static final byte [][] KEYS = makeKeys();
|
||||||
|
|
||||||
List<byte[]> keys = new ArrayList<byte[]>();
|
@BeforeClass public static void beforeClass() throws Exception {
|
||||||
|
UTIL.startMiniCluster(2);
|
||||||
public TestMultiParallel() {
|
HTable t = UTIL.createTable(Bytes.toBytes(TEST_TABLE), Bytes.toBytes(FAMILY));
|
||||||
super(2, FAMILY);
|
UTIL.createMultiRegions(t, Bytes.toBytes(FAMILY));
|
||||||
desc = new HTableDescriptor(TEST_TABLE);
|
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILY));
|
|
||||||
makeKeys();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void makeKeys() {
|
@AfterClass public static void afterClass() throws IOException {
|
||||||
|
UTIL.getMiniHBaseCluster().shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before public void before() throws IOException {
|
||||||
|
LOG.info("before");
|
||||||
|
if (UTIL.ensureSomeRegionServersAvailable(2)) {
|
||||||
|
// Distribute regions
|
||||||
|
UTIL.getMiniHBaseCluster().getMaster().balance();
|
||||||
|
}
|
||||||
|
LOG.info("before done");
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[][] makeKeys() {
|
||||||
|
byte [][] starterKeys = HBaseTestingUtility.KEYS;
|
||||||
// Create a "non-uniform" test set with the following characteristics:
|
// Create a "non-uniform" test set with the following characteristics:
|
||||||
// a) Unequal number of keys per region
|
// a) Unequal number of keys per region
|
||||||
|
|
||||||
// Don't use integer as a multiple, so that we have a number of keys that is
|
// Don't use integer as a multiple, so that we have a number of keys that is
|
||||||
// not a multiple of the number of regions
|
// not a multiple of the number of regions
|
||||||
int numKeys = (int) ((float) KEYS.length * 10.33F);
|
int numKeys = (int) ((float) starterKeys.length * 10.33F);
|
||||||
|
|
||||||
|
List<byte[]> keys = new ArrayList<byte[]>();
|
||||||
for (int i = 0; i < numKeys; i++) {
|
for (int i = 0; i < numKeys; i++) {
|
||||||
int kIdx = i % KEYS.length;
|
int kIdx = i % starterKeys.length;
|
||||||
byte[] k = KEYS[kIdx];
|
byte[] k = starterKeys[kIdx];
|
||||||
byte[] cp = new byte[k.length + 1];
|
byte[] cp = new byte[k.length + 1];
|
||||||
System.arraycopy(k, 0, cp, 0, k.length);
|
System.arraycopy(k, 0, cp, 0, k.length);
|
||||||
cp[k.length] = new Integer(i % 256).byteValue();
|
cp[k.length] = new Integer(i % 256).byteValue();
|
||||||
|
@ -76,18 +89,19 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
// c) keys are not in sorted order (within a region), to ensure that the
|
// c) keys are not in sorted order (within a region), to ensure that the
|
||||||
// sorting code and index mapping doesn't break the functionality
|
// sorting code and index mapping doesn't break the functionality
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
int kIdx = i % KEYS.length;
|
int kIdx = i % starterKeys.length;
|
||||||
byte[] k = KEYS[kIdx];
|
byte[] k = starterKeys[kIdx];
|
||||||
byte[] cp = new byte[k.length + 1];
|
byte[] cp = new byte[k.length + 1];
|
||||||
System.arraycopy(k, 0, cp, 0, k.length);
|
System.arraycopy(k, 0, cp, 0, k.length);
|
||||||
cp[k.length] = new Integer(i % 256).byteValue();
|
cp[k.length] = new Integer(i % 256).byteValue();
|
||||||
keys.add(cp);
|
keys.add(cp);
|
||||||
}
|
}
|
||||||
|
return keys.toArray(new byte [][] {new byte [] {}});
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBatchWithGet() throws Exception {
|
@Test public void testBatchWithGet() throws Exception {
|
||||||
LOG.info("test=testBatchWithGet");
|
LOG.info("test=testBatchWithGet");
|
||||||
HTable table = new HTable(conf, TEST_TABLE);
|
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
|
||||||
|
|
||||||
// load test data
|
// load test data
|
||||||
List<Row> puts = constructPutRequests();
|
List<Row> puts = constructPutRequests();
|
||||||
|
@ -95,7 +109,7 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
|
|
||||||
// create a list of gets and run it
|
// create a list of gets and run it
|
||||||
List<Row> gets = new ArrayList<Row>();
|
List<Row> gets = new ArrayList<Row>();
|
||||||
for (byte[] k : keys) {
|
for (byte[] k : KEYS) {
|
||||||
Get get = new Get(k);
|
Get get = new Get(k);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
gets.add(get);
|
gets.add(get);
|
||||||
|
@ -110,14 +124,14 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compare results
|
// Compare results
|
||||||
assertEquals(singleRes.size(), multiRes.length);
|
Assert.assertEquals(singleRes.size(), multiRes.length);
|
||||||
for (int i = 0; i < singleRes.size(); i++) {
|
for (int i = 0; i < singleRes.size(); i++) {
|
||||||
assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
|
Assert.assertTrue(singleRes.get(i).containsColumn(BYTES_FAMILY, QUALIFIER));
|
||||||
KeyValue[] singleKvs = singleRes.get(i).raw();
|
KeyValue[] singleKvs = singleRes.get(i).raw();
|
||||||
KeyValue[] multiKvs = multiRes[i].raw();
|
KeyValue[] multiKvs = multiRes[i].raw();
|
||||||
for (int j = 0; j < singleKvs.length; j++) {
|
for (int j = 0; j < singleKvs.length; j++) {
|
||||||
assertEquals(singleKvs[j], multiKvs[j]);
|
Assert.assertEquals(singleKvs[j], multiKvs[j]);
|
||||||
assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j]
|
Assert.assertEquals(0, Bytes.compareTo(singleKvs[j].getValue(), multiKvs[j]
|
||||||
.getValue()));
|
.getValue()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -129,32 +143,32 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
*
|
*
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public void testFlushCommitsWithAbort() throws Exception {
|
@Test public void testFlushCommitsWithAbort() throws Exception {
|
||||||
LOG.info("test=testFlushCommitsWithAbort");
|
LOG.info("test=testFlushCommitsWithAbort");
|
||||||
doTestFlushCommits(true);
|
doTestFlushCommits(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFlushCommitsNoAbort() throws Exception {
|
@Test public void testFlushCommitsNoAbort() throws Exception {
|
||||||
|
LOG.info("test=testFlushCommitsNoAbort");
|
||||||
doTestFlushCommits(false);
|
doTestFlushCommits(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doTestFlushCommits(boolean doAbort) throws Exception {
|
private void doTestFlushCommits(boolean doAbort) throws Exception {
|
||||||
LOG.info("test=doTestFlushCommits");
|
|
||||||
// Load the data
|
// Load the data
|
||||||
Configuration newconf = new Configuration(conf);
|
LOG.info("get new table");
|
||||||
newconf.setInt("hbase.client.retries.number", 10);
|
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
|
||||||
HTable table = new HTable(newconf, TEST_TABLE);
|
|
||||||
table.setAutoFlush(false);
|
table.setAutoFlush(false);
|
||||||
table.setWriteBufferSize(10 * 1024 * 1024);
|
table.setWriteBufferSize(10 * 1024 * 1024);
|
||||||
|
|
||||||
|
LOG.info("constructPutRequests");
|
||||||
List<Row> puts = constructPutRequests();
|
List<Row> puts = constructPutRequests();
|
||||||
for (Row put : puts) {
|
for (Row put : puts) {
|
||||||
table.put((Put) put);
|
table.put((Put) put);
|
||||||
}
|
}
|
||||||
|
LOG.info("puts");
|
||||||
table.flushCommits();
|
table.flushCommits();
|
||||||
|
|
||||||
if (doAbort) {
|
if (doAbort) {
|
||||||
cluster.abortRegionServer(0);
|
LOG.info("Aborted=" + UTIL.getMiniHBaseCluster().abortRegionServer(0));
|
||||||
|
|
||||||
// try putting more keys after the abort. same key/qual... just validating
|
// try putting more keys after the abort. same key/qual... just validating
|
||||||
// no exceptions thrown
|
// no exceptions thrown
|
||||||
|
@ -166,100 +180,107 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
table.flushCommits();
|
table.flushCommits();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.info("validating loaded data");
|
||||||
validateLoadedData(table);
|
validateLoadedData(table);
|
||||||
|
|
||||||
// Validate server and region count
|
// Validate server and region count
|
||||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
List<JVMClusterUtil.RegionServerThread> liveRSs =
|
||||||
ClusterStatus cs = admin.getClusterStatus();
|
UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
|
||||||
assertEquals((doAbort ? 1 : 2), cs.getServers());
|
int count = 0;
|
||||||
for (HServerInfo info : cs.getServerInfo()) {
|
for (JVMClusterUtil.RegionServerThread t: liveRSs) {
|
||||||
System.out.println(info);
|
count++;
|
||||||
assertTrue(info.getLoad().getNumberOfRegions() > 10);
|
LOG.info("Count=" + count + ", Alive=" + t.getRegionServer());
|
||||||
}
|
}
|
||||||
|
LOG.info("Count=" + count);
|
||||||
|
Assert.assertEquals("Server count=" + count + ", abort=" + doAbort,
|
||||||
|
(doAbort ? 1 : 2), count);
|
||||||
|
for (JVMClusterUtil.RegionServerThread t: liveRSs) {
|
||||||
|
int regions = t.getRegionServer().getOnlineRegions().size();
|
||||||
|
Assert.assertTrue("Count of regions=" + regions, regions > 10);
|
||||||
|
}
|
||||||
|
LOG.info("done");
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBatchWithPut() throws Exception {
|
@Test public void testBatchWithPut() throws Exception {
|
||||||
LOG.info("test=testBatchWithPut");
|
LOG.info("test=testBatchWithPut");
|
||||||
Configuration newconf = new Configuration(conf);
|
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
|
||||||
newconf.setInt("hbase.client.retries.number", 10);
|
|
||||||
HTable table = new HTable(newconf, TEST_TABLE);
|
|
||||||
|
|
||||||
// put multiple rows using a batch
|
// put multiple rows using a batch
|
||||||
List<Row> puts = constructPutRequests();
|
List<Row> puts = constructPutRequests();
|
||||||
|
|
||||||
Result[] results = table.batch(puts);
|
Result[] results = table.batch(puts);
|
||||||
validateSizeAndEmpty(results, keys.size());
|
validateSizeAndEmpty(results, KEYS.length);
|
||||||
|
|
||||||
if (true) {
|
if (true) {
|
||||||
cluster.abortRegionServer(0);
|
UTIL.getMiniHBaseCluster().abortRegionServer(0);
|
||||||
|
|
||||||
puts = constructPutRequests();
|
puts = constructPutRequests();
|
||||||
results = table.batch(puts);
|
results = table.batch(puts);
|
||||||
validateSizeAndEmpty(results, keys.size());
|
validateSizeAndEmpty(results, KEYS.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
validateLoadedData(table);
|
validateLoadedData(table);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBatchWithDelete() throws Exception {
|
@Test public void testBatchWithDelete() throws Exception {
|
||||||
LOG.info("test=testBatchWithDelete");
|
LOG.info("test=testBatchWithDelete");
|
||||||
HTable table = new HTable(conf, TEST_TABLE);
|
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
|
||||||
|
|
||||||
// Load some data
|
// Load some data
|
||||||
List<Row> puts = constructPutRequests();
|
List<Row> puts = constructPutRequests();
|
||||||
Result[] results = table.batch(puts);
|
Result[] results = table.batch(puts);
|
||||||
validateSizeAndEmpty(results, keys.size());
|
validateSizeAndEmpty(results, KEYS.length);
|
||||||
|
|
||||||
// Deletes
|
// Deletes
|
||||||
List<Row> deletes = new ArrayList<Row>();
|
List<Row> deletes = new ArrayList<Row>();
|
||||||
for (int i = 0; i < keys.size(); i++) {
|
for (int i = 0; i < KEYS.length; i++) {
|
||||||
Delete delete = new Delete(keys.get(i));
|
Delete delete = new Delete(KEYS[i]);
|
||||||
delete.deleteFamily(BYTES_FAMILY);
|
delete.deleteFamily(BYTES_FAMILY);
|
||||||
deletes.add(delete);
|
deletes.add(delete);
|
||||||
}
|
}
|
||||||
results = table.batch(deletes);
|
results = table.batch(deletes);
|
||||||
validateSizeAndEmpty(results, keys.size());
|
validateSizeAndEmpty(results, KEYS.length);
|
||||||
|
|
||||||
// Get to make sure ...
|
// Get to make sure ...
|
||||||
for (byte[] k : keys) {
|
for (byte[] k : KEYS) {
|
||||||
Get get = new Get(k);
|
Get get = new Get(k);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
assertFalse(table.exists(get));
|
Assert.assertFalse(table.exists(get));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testHTableDeleteWithList() throws Exception {
|
@Test public void testHTableDeleteWithList() throws Exception {
|
||||||
LOG.info("test=testHTableDeleteWithList");
|
LOG.info("test=testHTableDeleteWithList");
|
||||||
HTable table = new HTable(conf, TEST_TABLE);
|
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
|
||||||
|
|
||||||
// Load some data
|
// Load some data
|
||||||
List<Row> puts = constructPutRequests();
|
List<Row> puts = constructPutRequests();
|
||||||
Result[] results = table.batch(puts);
|
Result[] results = table.batch(puts);
|
||||||
validateSizeAndEmpty(results, keys.size());
|
validateSizeAndEmpty(results, KEYS.length);
|
||||||
|
|
||||||
// Deletes
|
// Deletes
|
||||||
ArrayList<Delete> deletes = new ArrayList<Delete>();
|
ArrayList<Delete> deletes = new ArrayList<Delete>();
|
||||||
for (int i = 0; i < keys.size(); i++) {
|
for (int i = 0; i < KEYS.length; i++) {
|
||||||
Delete delete = new Delete(keys.get(i));
|
Delete delete = new Delete(KEYS[i]);
|
||||||
delete.deleteFamily(BYTES_FAMILY);
|
delete.deleteFamily(BYTES_FAMILY);
|
||||||
deletes.add(delete);
|
deletes.add(delete);
|
||||||
}
|
}
|
||||||
table.delete(deletes);
|
table.delete(deletes);
|
||||||
assertTrue(deletes.isEmpty());
|
Assert.assertTrue(deletes.isEmpty());
|
||||||
|
|
||||||
// Get to make sure ...
|
// Get to make sure ...
|
||||||
for (byte[] k : keys) {
|
for (byte[] k : KEYS) {
|
||||||
Get get = new Get(k);
|
Get get = new Get(k);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
assertFalse(table.exists(get));
|
Assert.assertFalse(table.exists(get));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
|
@Test public void testBatchWithManyColsInOneRowGetAndPut() throws Exception {
|
||||||
LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
|
LOG.info("test=testBatchWithManyColsInOneRowGetAndPut");
|
||||||
HTable table = new HTable(conf, TEST_TABLE);
|
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
|
||||||
|
|
||||||
List<Row> puts = new ArrayList<Row>();
|
List<Row> puts = new ArrayList<Row>();
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
|
@ -293,13 +314,13 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testBatchWithMixedActions() throws Exception {
|
@Test public void testBatchWithMixedActions() throws Exception {
|
||||||
LOG.info("test=testBatchWithMixedActions");
|
LOG.info("test=testBatchWithMixedActions");
|
||||||
HTable table = new HTable(conf, TEST_TABLE);
|
HTable table = new HTable(UTIL.getConfiguration(), TEST_TABLE);
|
||||||
|
|
||||||
// Load some data to start
|
// Load some data to start
|
||||||
Result[] results = table.batch(constructPutRequests());
|
Result[] results = table.batch(constructPutRequests());
|
||||||
validateSizeAndEmpty(results, keys.size());
|
validateSizeAndEmpty(results, KEYS.length);
|
||||||
|
|
||||||
// Batch: get, get, put(new col), delete, get, get of put, get of deleted,
|
// Batch: get, get, put(new col), delete, get, get of put, get of deleted,
|
||||||
// put
|
// put
|
||||||
|
@ -309,42 +330,42 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
byte[] val2 = Bytes.toBytes("putvalue2");
|
byte[] val2 = Bytes.toBytes("putvalue2");
|
||||||
|
|
||||||
// 0 get
|
// 0 get
|
||||||
Get get = new Get(keys.get(10));
|
Get get = new Get(KEYS[10]);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
actions.add(get);
|
actions.add(get);
|
||||||
|
|
||||||
// 1 get
|
// 1 get
|
||||||
get = new Get(keys.get(11));
|
get = new Get(KEYS[11]);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
actions.add(get);
|
actions.add(get);
|
||||||
|
|
||||||
// 2 put of new column
|
// 2 put of new column
|
||||||
Put put = new Put(keys.get(10));
|
Put put = new Put(KEYS[10]);
|
||||||
put.add(BYTES_FAMILY, qual2, val2);
|
put.add(BYTES_FAMILY, qual2, val2);
|
||||||
actions.add(put);
|
actions.add(put);
|
||||||
|
|
||||||
// 3 delete
|
// 3 delete
|
||||||
Delete delete = new Delete(keys.get(20));
|
Delete delete = new Delete(KEYS[20]);
|
||||||
delete.deleteFamily(BYTES_FAMILY);
|
delete.deleteFamily(BYTES_FAMILY);
|
||||||
actions.add(delete);
|
actions.add(delete);
|
||||||
|
|
||||||
// 4 get
|
// 4 get
|
||||||
get = new Get(keys.get(30));
|
get = new Get(KEYS[30]);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
actions.add(get);
|
actions.add(get);
|
||||||
|
|
||||||
// 5 get of the put in #2 (entire family)
|
// 5 get of the put in #2 (entire family)
|
||||||
get = new Get(keys.get(10));
|
get = new Get(KEYS[10]);
|
||||||
get.addFamily(BYTES_FAMILY);
|
get.addFamily(BYTES_FAMILY);
|
||||||
actions.add(get);
|
actions.add(get);
|
||||||
|
|
||||||
// 6 get of the delete from #3
|
// 6 get of the delete from #3
|
||||||
get = new Get(keys.get(20));
|
get = new Get(KEYS[20]);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
actions.add(get);
|
actions.add(get);
|
||||||
|
|
||||||
// 7 put of new column
|
// 7 put of new column
|
||||||
put = new Put(keys.get(40));
|
put = new Put(KEYS[40]);
|
||||||
put.add(BYTES_FAMILY, qual2, val2);
|
put.add(BYTES_FAMILY, qual2, val2);
|
||||||
actions.add(put);
|
actions.add(put);
|
||||||
|
|
||||||
|
@ -363,7 +384,7 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
validateEmpty(results[7]);
|
validateEmpty(results[7]);
|
||||||
|
|
||||||
// validate last put, externally from the batch
|
// validate last put, externally from the batch
|
||||||
get = new Get(keys.get(40));
|
get = new Get(KEYS[40]);
|
||||||
get.addColumn(BYTES_FAMILY, qual2);
|
get.addColumn(BYTES_FAMILY, qual2);
|
||||||
Result r = table.get(get);
|
Result r = table.get(get);
|
||||||
validateResult(r, qual2, val2);
|
validateResult(r, qual2, val2);
|
||||||
|
@ -376,13 +397,13 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateResult(Result r, byte[] qual, byte[] val) {
|
private void validateResult(Result r, byte[] qual, byte[] val) {
|
||||||
assertTrue(r.containsColumn(BYTES_FAMILY, qual));
|
Assert.assertTrue(r.containsColumn(BYTES_FAMILY, qual));
|
||||||
assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual)));
|
Assert.assertEquals(0, Bytes.compareTo(val, r.getValue(BYTES_FAMILY, qual)));
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Row> constructPutRequests() {
|
private List<Row> constructPutRequests() {
|
||||||
List<Row> puts = new ArrayList<Row>();
|
List<Row> puts = new ArrayList<Row>();
|
||||||
for (byte[] k : keys) {
|
for (byte[] k : KEYS) {
|
||||||
Put put = new Put(k);
|
Put put = new Put(k);
|
||||||
put.add(BYTES_FAMILY, QUALIFIER, VALUE);
|
put.add(BYTES_FAMILY, QUALIFIER, VALUE);
|
||||||
puts.add(put);
|
puts.add(put);
|
||||||
|
@ -392,28 +413,28 @@ public class TestMultiParallel extends MultiRegionTable {
|
||||||
|
|
||||||
private void validateLoadedData(HTable table) throws IOException {
|
private void validateLoadedData(HTable table) throws IOException {
|
||||||
// get the data back and validate that it is correct
|
// get the data back and validate that it is correct
|
||||||
for (byte[] k : keys) {
|
for (byte[] k : KEYS) {
|
||||||
|
LOG.info("Assert=" + Bytes.toString(k));
|
||||||
Get get = new Get(k);
|
Get get = new Get(k);
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
||||||
Result r = table.get(get);
|
Result r = table.get(get);
|
||||||
assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
|
Assert.assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
|
||||||
assertEquals(0, Bytes.compareTo(VALUE, r
|
Assert.assertEquals(0, Bytes.compareTo(VALUE, r
|
||||||
.getValue(BYTES_FAMILY, QUALIFIER)));
|
.getValue(BYTES_FAMILY, QUALIFIER)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateEmpty(Result result) {
|
private void validateEmpty(Result result) {
|
||||||
assertTrue(result != null);
|
Assert.assertTrue(result != null);
|
||||||
assertTrue(result.getRow() == null);
|
Assert.assertTrue(result.getRow() == null);
|
||||||
assertEquals(0, result.raw().length);
|
Assert.assertEquals(0, result.raw().length);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateSizeAndEmpty(Result[] results, int expectedSize) {
|
private void validateSizeAndEmpty(Result[] results, int expectedSize) {
|
||||||
// Validate got back the same number of Result objects, all empty
|
// Validate got back the same number of Result objects, all empty
|
||||||
assertEquals(expectedSize, results.length);
|
Assert.assertEquals(expectedSize, results.length);
|
||||||
for (Result result : results) {
|
for (Result result : results) {
|
||||||
validateEmpty(result);
|
validateEmpty(result);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue