HBASE-5328 Small changes to Master to make it more testable
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1303626 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8105f90937
commit
327e2d1f74
|
@ -185,7 +185,7 @@ public class CatalogTracker {
|
|||
this(zk, conf, HConnectionManager.getConnection(conf), abortable, defaultTimeout);
|
||||
}
|
||||
|
||||
CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
|
||||
public CatalogTracker(final ZooKeeperWatcher zk, final Configuration conf,
|
||||
HConnection connection, Abortable abortable, final int defaultTimeout)
|
||||
throws IOException {
|
||||
this.connection = connection;
|
||||
|
@ -309,16 +309,6 @@ public class CatalogTracker {
|
|||
return sn != null? sn: MetaReader.getMetaRegionLocation(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits indefinitely for availability of <code>-ROOT-</code>. Used during
|
||||
* cluster startup.
|
||||
* @throws InterruptedException if interrupted while waiting
|
||||
*/
|
||||
public void waitForRoot()
|
||||
throws InterruptedException {
|
||||
this.rootRegionTracker.blockUntilAvailable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the current location for <code>-ROOT-</code> if available and waits
|
||||
* for up to the specified timeout if not immediately available. Returns null
|
||||
|
@ -330,7 +320,7 @@ public class CatalogTracker {
|
|||
* @throws NotAllMetaRegionsOnlineException if root not available before
|
||||
* timeout
|
||||
*/
|
||||
ServerName waitForRoot(final long timeout)
|
||||
public ServerName waitForRoot(final long timeout)
|
||||
throws InterruptedException, NotAllMetaRegionsOnlineException {
|
||||
ServerName sn = rootRegionTracker.waitRootRegionLocation(timeout);
|
||||
if (sn == null) {
|
||||
|
|
|
@ -346,6 +346,9 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// Returns servers who have not checked in (assumed dead) and their regions
|
||||
Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = rebuildUserRegions(onlineServers);
|
||||
|
||||
// This method will assign all user regions if a clean server startup or
|
||||
// it will reconstitute master state and cleanup any leftovers from
|
||||
// previous master process.
|
||||
processDeadServersAndRegionsInTransition(deadServers);
|
||||
|
||||
// Recover the tables that were not fully moved to DISABLED state.
|
||||
|
@ -380,7 +383,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
/**
|
||||
* Process all regions that are in transition in zookeeper and also
|
||||
* processes the list of dead servers by scanning the META.
|
||||
* Used by master joining an cluster.
|
||||
* Used by master joining an cluster. If we figure this is a clean cluster
|
||||
* startup, will assign all user regions.
|
||||
* @param deadServers
|
||||
* Map of dead servers and their regions. Can be null.
|
||||
* @throws KeeperException
|
||||
|
@ -395,8 +399,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// Run through all regions. If they are not assigned and not in RIT, then
|
||||
// its a clean cluster startup, else its a failover.
|
||||
for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
|
||||
if (!e.getKey().isMetaTable()
|
||||
&& e.getValue() != null) {
|
||||
if (!e.getKey().isMetaTable() && e.getValue() != null) {
|
||||
LOG.debug("Found " + e + " out on cluster");
|
||||
this.failover = true;
|
||||
break;
|
||||
|
@ -2127,7 +2130,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
public void waitForAssignment(HRegionInfo regionInfo)
|
||||
throws InterruptedException {
|
||||
synchronized(regions) {
|
||||
while(!regions.containsKey(regionInfo)) {
|
||||
while(!this.master.isStopped() && !regions.containsKey(regionInfo)) {
|
||||
// We should receive a notification, but it's
|
||||
// better to have a timeout to recheck the condition here:
|
||||
// it lowers the impact of a race condition if any
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.lang.reflect.Constructor;
|
|||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.Chore;
|
||||
import org.apache.hadoop.hbase.ClusterStatus;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
|
@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HServerLoad;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MasterNotRunningException;
|
||||
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
|
||||
import org.apache.hadoop.hbase.PleaseHoldException;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
|
@ -58,6 +61,7 @@ import org.apache.hadoop.hbase.TableDescriptors;
|
|||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
|
@ -154,6 +158,9 @@ Server {
|
|||
|
||||
// RPC server for the HMaster
|
||||
private final RpcServer rpcServer;
|
||||
// Set after we've called HBaseServer#openServer and ready to receive RPCs.
|
||||
// Set back to false after we stop rpcServer. Used by tests.
|
||||
private volatile boolean rpcServerOpen = false;
|
||||
|
||||
/**
|
||||
* This servers address.
|
||||
|
@ -290,17 +297,18 @@ Server {
|
|||
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
|
||||
this.rpcServer.startThreads();
|
||||
this.metrics = new MasterMetrics(getServerName().toString());
|
||||
// initialize instant schema change settings
|
||||
this.supportInstantSchemaChanges = conf.getBoolean(
|
||||
"hbase.instant.schema.alter.enabled", false);
|
||||
if (supportInstantSchemaChanges) {
|
||||
LOG.info("Instant schema change enabled. All schema alter operations will " +
|
||||
"happen through ZK.");
|
||||
}
|
||||
else {
|
||||
LOG.info("Instant schema change disabled. All schema alter operations will " +
|
||||
"happen normally.");
|
||||
}
|
||||
this.supportInstantSchemaChanges = getSupportInstantSchemaChanges(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get whether instant schema change is on or not.
|
||||
* @param c
|
||||
* @return True if instant schema enabled.
|
||||
*/
|
||||
private boolean getSupportInstantSchemaChanges(final Configuration c) {
|
||||
boolean b = c.getBoolean("hbase.instant.schema.alter.enabled", false);
|
||||
LOG.debug("Instant schema change enabled=" + b + ".");
|
||||
return b;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -418,7 +426,7 @@ Server {
|
|||
*/
|
||||
private void initializeZKBasedSystemTrackers() throws IOException,
|
||||
InterruptedException, KeeperException {
|
||||
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
|
||||
this.catalogTracker = createCatalogTracker(this.zooKeeper, this.conf,
|
||||
this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
|
||||
this.catalogTracker.start();
|
||||
|
||||
|
@ -452,8 +460,27 @@ Server {
|
|||
", cluster-up flag was=" + wasUp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create CatalogTracker.
|
||||
* In its own method so can intercept and mock it over in tests.
|
||||
* @param zk If zk is null, we'll create an instance (and shut it down
|
||||
* when {@link #stop()} is called) else we'll use what is passed.
|
||||
* @param conf
|
||||
* @param abortable If fatal exception we'll call abort on this. May be null.
|
||||
* If it is we'll use the Connection associated with the passed
|
||||
* {@link Configuration} as our {@link Abortable}.
|
||||
* @param defaultTimeout Timeout to use. Pass zero for no timeout
|
||||
* ({@link Object#wait(long)} when passed a <code>0</code> waits for ever).
|
||||
* @throws IOException
|
||||
*/
|
||||
CatalogTracker createCatalogTracker(final ZooKeeperWatcher zk,
|
||||
final Configuration conf, Abortable abortable, final int defaultTimeout)
|
||||
throws IOException {
|
||||
return new CatalogTracker(zk, conf, abortable, defaultTimeout);
|
||||
}
|
||||
|
||||
// Check if we should stop every second.
|
||||
private Sleeper stopSleeper = new Sleeper(1000, this);
|
||||
private Sleeper stopSleeper = new Sleeper(100, this);
|
||||
private void loop() {
|
||||
while (!this.stopped) {
|
||||
stopSleeper.sleep();
|
||||
|
@ -505,7 +532,7 @@ Server {
|
|||
|
||||
this.executorService = new ExecutorService(getServerName().toString());
|
||||
|
||||
this.serverManager = new ServerManager(this, this);
|
||||
this.serverManager = createServerManager(this, this);
|
||||
|
||||
status.setStatus("Initializing ZK system trackers");
|
||||
initializeZKBasedSystemTrackers();
|
||||
|
@ -537,7 +564,7 @@ Server {
|
|||
splitLogAfterStartup(this.fileSystemManager, onlineServers);
|
||||
|
||||
// Make sure root and meta assigned before proceeding.
|
||||
assignRootAndMeta(status);
|
||||
if (!assignRootAndMeta(status)) return;
|
||||
serverShutdownHandlerEnabled = true;
|
||||
this.serverManager.expireDeadNotExpiredServers();
|
||||
|
||||
|
@ -595,15 +622,31 @@ Server {
|
|||
mfs.splitLogAfterStartup(onlineServers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link ServerManager} instance.
|
||||
* @param master
|
||||
* @param services
|
||||
* @return An instance of {@link ServerManager}
|
||||
* @throws ZooKeeperConnectionException
|
||||
* @throws IOException
|
||||
*/
|
||||
ServerManager createServerManager(final Server master,
|
||||
final MasterServices services)
|
||||
throws IOException {
|
||||
// We put this out here in a method so can do a Mockito.spy and stub it out
|
||||
// w/ a mocked up ServerManager.
|
||||
return new ServerManager(master, services);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check <code>-ROOT-</code> and <code>.META.</code> are assigned. If not,
|
||||
* assign them.
|
||||
* @throws InterruptedException
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @return Count of regions we assigned.
|
||||
* @return True if root and meta are healthy, assigned
|
||||
*/
|
||||
int assignRootAndMeta(MonitoredTask status)
|
||||
boolean assignRootAndMeta(MonitoredTask status)
|
||||
throws InterruptedException, IOException, KeeperException {
|
||||
int assigned = 0;
|
||||
long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
|
||||
|
@ -617,8 +660,9 @@ Server {
|
|||
currentRootServer = this.catalogTracker.getRootLocation();
|
||||
splitLogAndExpireIfOnline(currentRootServer);
|
||||
this.assignmentManager.assignRoot();
|
||||
this.catalogTracker.waitForRoot();
|
||||
//This guarantees that the transition has completed
|
||||
// Make sure a -ROOT- location is set.
|
||||
if (!isRootLocation()) return false;
|
||||
// This guarantees that the transition assigning -ROOT- has completed
|
||||
this.assignmentManager.waitForAssignment(HRegionInfo.ROOT_REGIONINFO);
|
||||
assigned++;
|
||||
} else {
|
||||
|
@ -629,6 +673,8 @@ Server {
|
|||
// Enable the ROOT table if on process fail over the RS containing ROOT
|
||||
// was active.
|
||||
enableCatalogTables(Bytes.toString(HConstants.ROOT_TABLE_NAME));
|
||||
// Check for stopped, just in case
|
||||
if (this.stopped) return false;
|
||||
LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit +
|
||||
", location=" + catalogTracker.getRootLocation());
|
||||
|
||||
|
@ -658,7 +704,25 @@ Server {
|
|||
LOG.info(".META. assigned=" + assigned + ", rit=" + rit +
|
||||
", location=" + catalogTracker.getMetaLocation());
|
||||
status.setStatus("META and ROOT assigned.");
|
||||
return assigned;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return True if there a root available
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
private boolean isRootLocation() throws InterruptedException {
|
||||
// Cycle up here in master rather than down in catalogtracker so we can
|
||||
// check the master stopped flag every so often.
|
||||
while (!this.stopped) {
|
||||
try {
|
||||
if (this.catalogTracker.waitForRoot(100) != null) break;
|
||||
} catch (NotAllMetaRegionsOnlineException e) {
|
||||
// Ignore. I know -ROOT- is not online yet.
|
||||
}
|
||||
}
|
||||
// We got here because we came of above loop.
|
||||
return !this.stopped;
|
||||
}
|
||||
|
||||
private void enableCatalogTables(String catalogTableName) {
|
||||
|
@ -793,7 +857,7 @@ Server {
|
|||
* as OOMEs; it should be lightly loaded. See what HRegionServer does if
|
||||
* need to install an unexpected exception handler.
|
||||
*/
|
||||
private void startServiceThreads() throws IOException{
|
||||
void startServiceThreads() throws IOException{
|
||||
|
||||
// Start the executor service pools
|
||||
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
|
||||
|
@ -831,10 +895,18 @@ Server {
|
|||
|
||||
// Start allowing requests to happen.
|
||||
this.rpcServer.openServer();
|
||||
this.rpcServerOpen = true;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Started service threads");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this when trying to figure when its ok to send in rpcs. Used by tests.
|
||||
* @return True if we have successfully run {@link HBaseServer#openServer()}
|
||||
*/
|
||||
boolean isRpcServerOpen() {
|
||||
return this.rpcServerOpen;
|
||||
}
|
||||
|
||||
private void stopServiceThreads() {
|
||||
|
@ -842,6 +914,7 @@ Server {
|
|||
LOG.debug("Stopping service threads");
|
||||
}
|
||||
if (this.rpcServer != null) this.rpcServer.stop();
|
||||
this.rpcServerOpen = false;
|
||||
// Clean up and close up shop
|
||||
if (this.logCleaner!= null) this.logCleaner.interrupt();
|
||||
if (this.infoServer != null) {
|
||||
|
@ -908,7 +981,7 @@ Server {
|
|||
final long serverStartCode, final long serverCurrentTime)
|
||||
throws IOException {
|
||||
// Register with server manager
|
||||
InetAddress ia = HBaseServer.getRemoteIp();
|
||||
InetAddress ia = getRemoteInetAddress(port, serverStartCode);
|
||||
ServerName rs = this.serverManager.regionServerStartup(ia, port,
|
||||
serverStartCode, serverCurrentTime);
|
||||
// Send back some config info
|
||||
|
@ -918,6 +991,17 @@ Server {
|
|||
return mw;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Get remote side's InetAddress
|
||||
* @throws UnknownHostException
|
||||
*/
|
||||
InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
|
||||
throws UnknownHostException {
|
||||
// Do it out here in its own little method so can fake an address when
|
||||
// mocking up in tests.
|
||||
return HBaseServer.getRemoteIp();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Subset of configuration to pass initializing regionservers: e.g.
|
||||
* the filesystem to use and root directory to use.
|
||||
|
@ -1255,8 +1339,6 @@ Server {
|
|||
LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = "
|
||||
+ Bytes.toString(tableName) + " Alter Status = "
|
||||
+ alterStatus.toString());
|
||||
int numberPending = alterStatus.getNumberOfRegionsToProcess() -
|
||||
alterStatus.getNumberOfRegionsProcessed();
|
||||
return new Pair<Integer, Integer>(alterStatus.getNumberOfRegionsProcessed(),
|
||||
alterStatus.getNumberOfRegionsToProcess());
|
||||
} else {
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
|
||||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* Tracks the root region server location node in zookeeper.
|
||||
|
|
|
@ -523,7 +523,9 @@ public class ZKUtil {
|
|||
logRetrievedMsg(zkw, znode, data, watcherSet);
|
||||
return data;
|
||||
} catch (KeeperException.NoNodeException e) {
|
||||
LOG.debug(zkw.prefix("Unable to get data of znode " + znode + " " +
|
||||
// This log can get pretty annoying when we cycle on 100ms waits.
|
||||
// Enable trace if you really want to see it.
|
||||
LOG.trace(zkw.prefix("Unable to get data of znode " + znode + " " +
|
||||
"because node does not exist (not an error)"));
|
||||
return null;
|
||||
} catch (KeeperException e) {
|
||||
|
|
|
@ -22,9 +22,7 @@ package org.apache.hadoop.hbase.zookeeper;
|
|||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
|
@ -80,9 +78,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
|||
// negotiation to complete
|
||||
public CountDownLatch saslLatch = new CountDownLatch(1);
|
||||
|
||||
// set of unassigned nodes watched
|
||||
private Set<String> unassignedNodes = new HashSet<String>();
|
||||
|
||||
// node names
|
||||
|
||||
// base znode for this cluster
|
||||
|
@ -179,10 +174,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isFinishedRetryingRecoverable(final long finished) {
|
||||
return System.currentTimeMillis() < finished;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.identifier;
|
||||
|
|
|
@ -340,7 +340,7 @@ public class TestCatalogTracker {
|
|||
|
||||
// Now test waiting on root location getting set.
|
||||
Thread t = new WaitOnMetaThread(ct);
|
||||
startWaitAliveThenWaitItLives(t, 1000);
|
||||
startWaitAliveThenWaitItLives(t, 1);
|
||||
// Set a root location.
|
||||
hsa = setRootLocation();
|
||||
// Join the thread... should exit shortly.
|
||||
|
@ -511,12 +511,15 @@ public class TestCatalogTracker {
|
|||
}
|
||||
|
||||
void doWaiting() throws InterruptedException {
|
||||
this.ct.waitForRoot();
|
||||
try {
|
||||
while (this.ct.waitForRoot(100) == null);
|
||||
} catch (NotAllMetaRegionsOnlineException e) {
|
||||
// Ignore.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@org.junit.Rule
|
||||
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
|
||||
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,605 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Increment;
|
||||
import org.apache.hadoop.hbase.client.MultiAction;
|
||||
import org.apache.hadoop.hbase.client.MultiResponse;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RowMutations;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.Exec;
|
||||
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushRequester;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* A mock RegionServer implementation.
|
||||
* Use this when you can't bend Mockito to your liking (e.g. return null result
|
||||
* when 'scanning' until master timesout and then return a coherent meta row
|
||||
* result thereafter. Have some facility for faking gets and scans. See
|
||||
* {@link #setGetResult(byte[], byte[], Result)} for how to fill the backing data
|
||||
* store that the get pulls from.
|
||||
*/
|
||||
class MockRegionServer implements HRegionInterface, RegionServerServices {
|
||||
private final ServerName sn;
|
||||
private final ZooKeeperWatcher zkw;
|
||||
private final Configuration conf;
|
||||
private final Random random = new Random();
|
||||
|
||||
/**
|
||||
* Map of regions to map of rows and {@link Results}. Used as data source when
|
||||
* {@link MockRegionServer#get(byte[], Get)} is called. Because we have a byte
|
||||
* key, need to use TreeMap and provide a Comparator. Use
|
||||
* {@link #setGetResult(byte[], byte[], Result)} filling this map.
|
||||
*/
|
||||
private final Map<byte [], Map<byte [], Result>> gets =
|
||||
new TreeMap<byte [], Map<byte [], Result>>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
/**
|
||||
* Map of regions to results to return when scanning.
|
||||
*/
|
||||
private final Map<byte [], Result []> nexts =
|
||||
new TreeMap<byte [], Result []>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
/**
|
||||
* Data structure that holds regionname and index used scanning.
|
||||
*/
|
||||
class RegionNameAndIndex {
|
||||
private final byte[] regionName;
|
||||
private int index = 0;
|
||||
|
||||
RegionNameAndIndex(final byte[] regionName) {
|
||||
this.regionName = regionName;
|
||||
}
|
||||
|
||||
byte[] getRegionName() {
|
||||
return this.regionName;
|
||||
}
|
||||
|
||||
int getThenIncrement() {
|
||||
int currentIndex = this.index;
|
||||
this.index++;
|
||||
return currentIndex;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Outstanding scanners and their offset into <code>nexts</code>
|
||||
*/
|
||||
private final Map<Long, RegionNameAndIndex> scannersAndOffsets =
|
||||
new HashMap<Long, RegionNameAndIndex>();
|
||||
|
||||
/**
|
||||
* @param sn Name of this mock regionserver
|
||||
* @throws IOException
|
||||
* @throws ZooKeeperConnectionException
|
||||
*/
|
||||
MockRegionServer(final Configuration conf, final ServerName sn)
|
||||
throws ZooKeeperConnectionException, IOException {
|
||||
this.sn = sn;
|
||||
this.conf = conf;
|
||||
this.zkw = new ZooKeeperWatcher(conf, sn.toString(), this, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method filling the backing data source used by {@link #get(byte[], Get)}
|
||||
* @param regionName
|
||||
* @param row
|
||||
* @param r
|
||||
*/
|
||||
void setGetResult(final byte [] regionName, final byte [] row, final Result r) {
|
||||
Map<byte [], Result> value = this.gets.get(regionName);
|
||||
if (value == null) {
|
||||
// If no value already, create one. Needs to be treemap because we are
|
||||
// using byte array as key. Not thread safe.
|
||||
value = new TreeMap<byte [], Result>(Bytes.BYTES_COMPARATOR);
|
||||
this.gets.put(regionName, value);
|
||||
}
|
||||
value.put(row, r);
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method to set what a scanner will reply as we next through
|
||||
* @param regionName
|
||||
* @param rs
|
||||
*/
|
||||
void setNextResults(final byte [] regionName, final Result [] rs) {
|
||||
this.nexts.put(regionName, rs);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getProtocolVersion(String protocol, long clientVersion)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ProtocolSignature getProtocolSignature(String protocol,
|
||||
long clientVersion, int clientMethodsHash) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStopped() {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
throw new RuntimeException(this.sn + ": " + why, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegionInfo getRegionInfo(byte[] regionName) {
|
||||
// Just return this. Calls to getRegionInfo are usually to test connection
|
||||
// to regionserver does reasonable things so should be safe to return
|
||||
// anything.
|
||||
return HRegionInfo.ROOT_REGIONINFO;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushRegion(byte[] regionName) throws IllegalArgumentException,
|
||||
IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushRegion(byte[] regionName, long ifOlderThanTS)
|
||||
throws IllegalArgumentException, IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastFlushTime(byte[] regionName) {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
|
||||
throws IllegalArgumentException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getStoreFileList(byte[] regionName,
|
||||
byte[][] columnFamilies) throws IllegalArgumentException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getStoreFileList(byte[] regionName)
|
||||
throws IllegalArgumentException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result getClosestRowBefore(byte[] regionName, byte[] row,
|
||||
byte[] family) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result get(byte[] regionName, Get get) throws IOException {
|
||||
Map<byte [], Result> m = this.gets.get(regionName);
|
||||
if (m == null) return null;
|
||||
return m.get(get.getRow());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean exists(byte[] regionName, Get get) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void put(byte[] regionName, Put put) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public int put(byte[] regionName, List<Put> puts) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(byte[] regionName, Delete delete) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public int delete(byte[] regionName, List<Delete> deletes)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family,
|
||||
byte[] qualifier, byte[] value, Put put) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family,
|
||||
byte[] qualifier, byte[] value, Delete delete) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long incrementColumnValue(byte[] regionName, byte[] row,
|
||||
byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result append(byte[] regionName, Append append) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result increment(byte[] regionName, Increment increment)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long openScanner(byte[] regionName, Scan scan) throws IOException {
|
||||
long scannerId = this.random.nextLong();
|
||||
this.scannersAndOffsets.put(scannerId, new RegionNameAndIndex(regionName));
|
||||
return scannerId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result next(long scannerId) throws IOException {
|
||||
RegionNameAndIndex rnai = this.scannersAndOffsets.get(scannerId);
|
||||
int index = rnai.getThenIncrement();
|
||||
Result [] results = this.nexts.get(rnai.getRegionName());
|
||||
if (results == null) return null;
|
||||
return index < results.length? results[index]: null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result [] next(long scannerId, int numberOfRows) throws IOException {
|
||||
// Just return one result whatever they ask for.
|
||||
Result r = next(scannerId);
|
||||
return r == null? null: new Result [] {r};
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close(final long scannerId) throws IOException {
|
||||
this.scannersAndOffsets.remove(scannerId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long lockRow(byte[] regionName, byte[] row) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unlockRow(byte[] regionName, long lockId) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HRegionInfo> getOnlineRegions() throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HServerInfo getHServerInfo() throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
|
||||
byte[] regionName) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionOpeningState openRegion(HRegionInfo region) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionOpeningState openRegion(HRegionInfo region,
|
||||
int versionOfOfflineNode) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void openRegions(List<HRegionInfo> regions) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean closeRegion(HRegionInfo region) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean closeRegion(HRegionInfo region, int versionOfClosingNode)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean closeRegion(HRegionInfo region, boolean zk)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean closeRegion(byte[] encodedRegionName, boolean zk)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushRegion(HRegionInfo regionInfo)
|
||||
throws NotServingRegionException, IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public void splitRegion(HRegionInfo regionInfo)
|
||||
throws NotServingRegionException, IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
|
||||
throws NotServingRegionException, IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public void compactRegion(HRegionInfo regionInfo, boolean major)
|
||||
throws NotServingRegionException, IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replicateLogEntries(Entry[] entries) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExecResult execCoprocessor(byte[] regionName, Exec call)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family,
|
||||
byte[] qualifier, CompareOp compareOp,
|
||||
WritableByteArrayComparable comparator, Put put) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family,
|
||||
byte[] qualifier, CompareOp compareOp,
|
||||
WritableByteArrayComparable comparator, Delete delete)
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries()
|
||||
throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[][] rollHLogWriter() throws IOException,
|
||||
FailedLogCloseException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(String why) {
|
||||
this.zkw.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addToOnlineRegions(HRegion r) {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean removeFromOnlineRegions(String encodedRegionName) {
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HRegion getFromOnlineRegions(String encodedRegionName) {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshRegion(HRegion hRegion) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public Configuration getConfiguration() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZooKeeperWatcher getZooKeeper() {
|
||||
return this.zkw;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CatalogTracker getCatalogTracker() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ServerName getServerName() {
|
||||
return this.sn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStopping() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HLog getWAL() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionRequestor getCompactionRequester() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FlushRequester getFlushRequester() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegionServerAccounting getRegionServerAccounting() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postOpenDeployTasks(HRegion r, CatalogTracker ct, boolean daughter)
|
||||
throws KeeperException, IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
|
||||
@Override
|
||||
public RpcServer getRpcServer() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<byte[], Boolean> getRegionsInTransitionInRS() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileSystem getFileSystem() {
|
||||
// TODO Auto-generated method stub
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void mutateRow(byte[] regionName, RowMutations rm) throws IOException {
|
||||
// TODO Auto-generated method stub
|
||||
}
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
/**
|
||||
* Package scoped mocking utility.
|
||||
*/
|
||||
public class Mocking {
|
||||
/**
|
||||
* @param sn ServerName to use making startcode and server in meta
|
||||
* @param hri Region to serialize into HRegionInfo
|
||||
* @return A mocked up Result that fakes a Get on a row in the
|
||||
* <code>.META.</code> table.
|
||||
* @throws IOException
|
||||
*/
|
||||
static Result getMetaTableRowResult(final HRegionInfo hri,
|
||||
final ServerName sn)
|
||||
throws IOException {
|
||||
// TODO: Move to a utilities class. More than one test case can make use
|
||||
// of this facility.
|
||||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
|
||||
Writables.getBytes(hri)));
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
|
||||
Bytes.toBytes(sn.getHostAndPort())));
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
|
||||
Bytes.toBytes(sn.getStartcode())));
|
||||
return new Result(kvs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Fakes the regionserver-side zk transitions of a region open.
|
||||
* @param w ZooKeeperWatcher to use.
|
||||
* @param sn Name of the regionserver doing the 'opening'
|
||||
* @param hri Region we're 'opening'.
|
||||
* @throws KeeperException
|
||||
*/
|
||||
static void fakeRegionServerRegionOpenInZK(final ZooKeeperWatcher w,
|
||||
final ServerName sn, final HRegionInfo hri)
|
||||
throws KeeperException {
|
||||
// Wait till we see the OFFLINE zk node before we proceed.
|
||||
while (!ZKAssign.verifyRegionState(w, hri, EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
// Get current versionid else will fail on transition from OFFLINE to OPENING below
|
||||
int versionid = ZKAssign.getVersion(w, hri);
|
||||
assertNotSame(-1, versionid);
|
||||
// This uglyness below is what the openregionhandler on RS side does. I
|
||||
// looked at exposing the method over in openregionhandler but its just a
|
||||
// one liner and its deep over in another package so just repeat it below.
|
||||
versionid = ZKAssign.transitionNode(w, hri, sn,
|
||||
EventType.M_ZK_REGION_OFFLINE, EventType.RS_ZK_REGION_OPENING, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
// Move znode from OPENING to OPENED as RS does on successful open.
|
||||
versionid = ZKAssign.transitionNodeOpened(w, hri, sn, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
// We should be done now. The master open handler will notice the
|
||||
// transition and remove this regions znode.
|
||||
}
|
||||
}
|
|
@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
|||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -145,7 +144,7 @@ public class TestAssignmentManager {
|
|||
|
||||
/**
|
||||
* Test a balance going on at same time as a master failover
|
||||
*
|
||||
*
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
|
@ -383,7 +382,7 @@ public class TestAssignmentManager {
|
|||
// Make an RS Interface implementation. Make it so a scanner can go against it.
|
||||
HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
|
||||
// Get a meta row result that has region up on SERVERNAME_A
|
||||
Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
|
||||
Result r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
|
||||
Mockito.when(implementation.openScanner((byte [])Mockito.any(), (Scan)Mockito.any())).
|
||||
thenReturn(System.currentTimeMillis());
|
||||
// Return a good result first and then return null to indicate end of scan
|
||||
|
@ -419,31 +418,6 @@ public class TestAssignmentManager {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sn ServerName to use making startcode and server in meta
|
||||
* @param hri Region to serialize into HRegionInfo
|
||||
* @return A mocked up Result that fakes a Get on a row in the
|
||||
* <code>.META.</code> table.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Result getMetaTableRowResult(final HRegionInfo hri,
|
||||
final ServerName sn)
|
||||
throws IOException {
|
||||
// TODO: Move to a utilities class. More than one test case can make use
|
||||
// of this facility.
|
||||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
|
||||
Writables.getBytes(hri)));
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
|
||||
Bytes.toBytes(sn.getHostAndPort())));
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
|
||||
Bytes.toBytes(sn.getStartcode())));
|
||||
return new Result(kvs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and startup executor pools. Start same set as master does (just
|
||||
* run a few less).
|
||||
|
@ -507,8 +481,8 @@ public class TestAssignmentManager {
|
|||
* @param region region to be created as offline
|
||||
* @param serverName server event originates from
|
||||
* @return Version of znode created.
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
// Copied from SplitTransaction rather than open the method over there in
|
||||
// the regionserver package.
|
||||
|
@ -567,7 +541,7 @@ public class TestAssignmentManager {
|
|||
// with an encoded name by doing a Get on .META.
|
||||
HRegionInterface ri = Mockito.mock(HRegionInterface.class);
|
||||
// Get a meta row result that has region up on SERVERNAME_A for REGIONINFO
|
||||
Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
|
||||
Result r = Mocking.getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
|
||||
Mockito.when(ri .openScanner((byte[]) Mockito.any(), (Scan) Mockito.any())).
|
||||
thenReturn(System.currentTimeMillis());
|
||||
// Return good result 'r' first and then return null to indicate end of scan
|
||||
|
|
|
@ -0,0 +1,329 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerLoad;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.io.MapWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Standup the master and fake it to test various aspects of master function.
|
||||
* Does NOT spin up a mini hbase nor mini dfs cluster testing master (it does
|
||||
* put up a zk cluster but this is usually pretty fast compared). Also, should
|
||||
* be possible to inject faults at points difficult to get at in cluster context.
|
||||
* TODO: Speed up the zk connection by Master. It pauses 5 seconds establishing
|
||||
* session.
|
||||
*/
|
||||
public class TestMasterNoCluster {
|
||||
private static final HBaseTestingUtility TESTUTIL = new HBaseTestingUtility();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
Configuration c = TESTUTIL.getConfiguration();
|
||||
// We use local filesystem. Set it so it writes into the testdir.
|
||||
c.set(HConstants.HBASE_DIR, TESTUTIL.getDataTestDir().toString());
|
||||
// Startup a mini zk cluster.
|
||||
TESTUTIL.startMiniZKCluster();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TESTUTIL.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown()
|
||||
throws KeeperException, ZooKeeperConnectionException, IOException {
|
||||
// Make sure zk is clean before we run the next test.
|
||||
ZooKeeperWatcher zkw = new ZooKeeperWatcher(TESTUTIL.getConfiguration(),
|
||||
"@Before", new Abortable() {
|
||||
@Override
|
||||
public void abort(String why, Throwable e) {
|
||||
throw new RuntimeException(why, e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAborted() {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
ZKUtil.deleteNodeRecursively(zkw, zkw.baseZNode);
|
||||
zkw.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test starting master then stopping it before its fully up.
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testStopDuringStart()
|
||||
throws IOException, KeeperException, InterruptedException {
|
||||
HMaster master = new HMaster(TESTUTIL.getConfiguration());
|
||||
master.start();
|
||||
// Immediately have it stop. We used hang in assigning root.
|
||||
master.stopMaster();
|
||||
master.join();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test master failover.
|
||||
* Start up three fake regionservers and a master.
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testFailover()
|
||||
throws IOException, KeeperException, InterruptedException {
|
||||
final long now = System.currentTimeMillis();
|
||||
// Names for our three servers. Make the port numbers match hostname.
|
||||
// Will come in use down in the server when we need to figure how to respond.
|
||||
final ServerName sn0 = new ServerName("0.example.org", 0, now);
|
||||
final ServerName sn1 = new ServerName("1.example.org", 1, now);
|
||||
final ServerName sn2 = new ServerName("2.example.org", 2, now);
|
||||
final ServerName [] sns = new ServerName [] {sn0, sn1, sn2};
|
||||
// Put up the mock servers
|
||||
final Configuration conf = TESTUTIL.getConfiguration();
|
||||
final MockRegionServer rs0 = new MockRegionServer(conf, sn0);
|
||||
final MockRegionServer rs1 = new MockRegionServer(conf, sn1);
|
||||
final MockRegionServer rs2 = new MockRegionServer(conf, sn2);
|
||||
// Put some data into the servers. Make it look like sn0 has the root
|
||||
// w/ an entry that points to sn1 as the host of .META. Put data into sn2
|
||||
// so it looks like it has a few regions for a table named 't'.
|
||||
RootLocationEditor.setRootLocation(rs0.getZooKeeper(), rs0.getServerName());
|
||||
byte [] rootregion = Bytes.toBytes("-ROOT-,,0");
|
||||
rs0.setGetResult(rootregion, HRegionInfo.FIRST_META_REGIONINFO.getRegionName(),
|
||||
Mocking.getMetaTableRowResult(HRegionInfo.FIRST_META_REGIONINFO,
|
||||
rs1.getServerName()));
|
||||
final byte [] tableName = Bytes.toBytes("t");
|
||||
Result [] results = new Result [] {
|
||||
Mocking.getMetaTableRowResult(
|
||||
new HRegionInfo(tableName, HConstants.EMPTY_START_ROW, HBaseTestingUtility.KEYS[1]),
|
||||
rs2.getServerName()),
|
||||
Mocking.getMetaTableRowResult(
|
||||
new HRegionInfo(tableName, HBaseTestingUtility.KEYS[1], HBaseTestingUtility.KEYS[2]),
|
||||
rs2.getServerName()),
|
||||
Mocking.getMetaTableRowResult(new HRegionInfo(tableName, HBaseTestingUtility.KEYS[2],
|
||||
HConstants.EMPTY_END_ROW),
|
||||
rs2.getServerName())
|
||||
};
|
||||
rs1.setNextResults(HRegionInfo.FIRST_META_REGIONINFO.getRegionName(), results);
|
||||
|
||||
// Create master. Subclass to override a few methods so we can insert mocks
|
||||
// and get notification on transitions. We need to fake out any rpcs the
|
||||
// master does opening/closing regions. Also need to fake out the address
|
||||
// of the 'remote' mocked up regionservers.
|
||||
HMaster master = new HMaster(conf) {
|
||||
InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
|
||||
throws UnknownHostException {
|
||||
// Return different address dependent on port passed.
|
||||
ServerName sn = sns[port];
|
||||
return InetAddress.getByAddress(sn.getHostname(),
|
||||
new byte [] {10, 0, 0, (byte)sn.getPort()});
|
||||
}
|
||||
|
||||
@Override
|
||||
ServerManager createServerManager(Server master, MasterServices services)
|
||||
throws IOException {
|
||||
ServerManager sm = super.createServerManager(master, services);
|
||||
// Spy on the created servermanager
|
||||
ServerManager spy = Mockito.spy(sm);
|
||||
// Fake a successful open.
|
||||
Mockito.doReturn(RegionOpeningState.OPENED).when(spy).
|
||||
sendRegionOpen((ServerName)Mockito.any(), (HRegionInfo)Mockito.any(),
|
||||
Mockito.anyInt());
|
||||
return spy;
|
||||
}
|
||||
|
||||
@Override
|
||||
CatalogTracker createCatalogTracker(ZooKeeperWatcher zk,
|
||||
Configuration conf, Abortable abortable, int defaultTimeout)
|
||||
throws IOException {
|
||||
// Insert a mock for the connection used by the CatalogTracker. Any
|
||||
// regionserver should do. Use TESTUTIL.getConfiguration rather than
|
||||
// the conf from the master; the conf will already have an HConnection
|
||||
// associate so the below mocking of a connection will fail.
|
||||
HConnection connection =
|
||||
HConnectionTestingUtility.getMockedConnectionAndDecorate(TESTUTIL.getConfiguration(),
|
||||
rs0, rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO);
|
||||
return new CatalogTracker(zk, conf, connection, abortable, defaultTimeout);
|
||||
}
|
||||
};
|
||||
master.start();
|
||||
|
||||
try {
|
||||
// Wait till master is up ready for RPCs.
|
||||
while (!master.isRpcServerOpen()) Threads.sleep(10);
|
||||
// Fake master that there are regionservers out there. Report in.
|
||||
for (int i = 0; i < sns.length; i++) {
|
||||
master.regionServerReport(sns[i].getVersionedBytes(), new HServerLoad());
|
||||
}
|
||||
// Master should now come up.
|
||||
while (!master.isInitialized()) {Threads.sleep(10);}
|
||||
assertTrue(master.isInitialized());
|
||||
} finally {
|
||||
rs0.stop("Test is done");
|
||||
rs1.stop("Test is done");
|
||||
rs2.stop("Test is done");
|
||||
master.stopMaster();
|
||||
master.join();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test starting master getting it up post initialized state using mocks.
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testCatalogDeploys()
|
||||
throws IOException, KeeperException, InterruptedException {
|
||||
final Configuration conf = TESTUTIL.getConfiguration();
|
||||
final long now = System.currentTimeMillis();
|
||||
// Name for our single mocked up regionserver.
|
||||
final ServerName sn = new ServerName("0.example.org", 0, now);
|
||||
// Here is our mocked up regionserver. Create it now. Need it setting up
|
||||
// master next.
|
||||
final MockRegionServer rs0 = new MockRegionServer(conf, sn);
|
||||
|
||||
// Create master. Subclass to override a few methods so we can insert mocks
|
||||
// and get notification on transitions. We need to fake out any rpcs the
|
||||
// master does opening/closing regions. Also need to fake out the address
|
||||
// of the 'remote' mocked up regionservers.
|
||||
HMaster master = new HMaster(conf) {
|
||||
InetAddress getRemoteInetAddress(final int port, final long serverStartCode)
|
||||
throws UnknownHostException {
|
||||
// Interject an unchecked, nonsense InetAddress; i.e. no resolve.
|
||||
return InetAddress.getByAddress(rs0.getServerName().getHostname(),
|
||||
new byte [] {10, 0, 0, 0});
|
||||
}
|
||||
|
||||
@Override
|
||||
ServerManager createServerManager(Server master, MasterServices services)
|
||||
throws IOException {
|
||||
ServerManager sm = super.createServerManager(master, services);
|
||||
// Spy on the created servermanager
|
||||
ServerManager spy = Mockito.spy(sm);
|
||||
// Fake a successful open.
|
||||
Mockito.doReturn(RegionOpeningState.OPENED).when(spy).
|
||||
sendRegionOpen((ServerName)Mockito.any(), (HRegionInfo)Mockito.any(),
|
||||
Mockito.anyInt());
|
||||
return spy;
|
||||
}
|
||||
|
||||
@Override
|
||||
CatalogTracker createCatalogTracker(ZooKeeperWatcher zk,
|
||||
Configuration conf, Abortable abortable, int defaultTimeout)
|
||||
throws IOException {
|
||||
// Insert a mock for the connection used by the CatalogTracker. Use
|
||||
// TESTUTIL.getConfiguration rather than the conf from the master; the
|
||||
// conf will already have an HConnection associate so the below mocking
|
||||
// of a connection will fail.
|
||||
HConnection connection =
|
||||
HConnectionTestingUtility.getMockedConnectionAndDecorate(TESTUTIL.getConfiguration(),
|
||||
rs0, rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO);
|
||||
return new CatalogTracker(zk, conf, connection, abortable, defaultTimeout);
|
||||
}
|
||||
};
|
||||
master.start();
|
||||
|
||||
try {
|
||||
// Wait till master is up ready for RPCs.
|
||||
while (!master.isRpcServerOpen()) Threads.sleep(10);
|
||||
// Fake master that there is a regionserver out there. Report in.
|
||||
MapWritable mw = master.regionServerStartup(rs0.getServerName().getPort(),
|
||||
rs0.getServerName().getStartcode(), now);
|
||||
// Assert hostname is as expected.
|
||||
String rshostname =
|
||||
mw.get(new Text(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)).toString();
|
||||
assertEquals(rs0.getServerName().getHostname(), rshostname);
|
||||
// Now master knows there is at least one regionserver checked in and so
|
||||
// it'll wait a while to see if more and when none, will assign root and
|
||||
// meta to this single server. Will do an rpc open but we've
|
||||
// mocked it above in our master override to return 'success'. As part of
|
||||
// region open, master will have set an unassigned znode for the region up
|
||||
// into zk for the regionserver to transition. Lets do that now to
|
||||
// complete fake of a successful open.
|
||||
Mocking.fakeRegionServerRegionOpenInZK(rs0.getZooKeeper(),
|
||||
rs0.getServerName(), HRegionInfo.ROOT_REGIONINFO);
|
||||
// Need to set root location as r1. Usually the regionserver does this
|
||||
// when its figured it just opened the root region by setting the root
|
||||
// location up into zk. Since we're mocking regionserver, need to do this
|
||||
// ourselves.
|
||||
RootLocationEditor.setRootLocation(rs0.getZooKeeper(), rs0.getServerName());
|
||||
// Do same transitions for .META. (presuming master has by now assigned
|
||||
// .META. to rs1).
|
||||
Mocking.fakeRegionServerRegionOpenInZK(rs0.getZooKeeper(),
|
||||
rs0.getServerName(), HRegionInfo.FIRST_META_REGIONINFO);
|
||||
// Now trigger our mock regionserver to start returning a row when we
|
||||
// go to get .META. entry in -ROOT-. We do it by setting into
|
||||
// our MockRegionServer some data to be returned when there is a get on
|
||||
// -ROOT- table (up to this its been returning null making master think
|
||||
// nothing assigned, not even .META.). The region for -ROOT- table we
|
||||
// hardcode below. Its always the same, at least in tests. We need to do
|
||||
// this because CatalogTracker runs inside in Master initialization to
|
||||
// confirm .META. has a server.
|
||||
byte [] rootregion = Bytes.toBytes("-ROOT-,,0");
|
||||
rs0.setGetResult(rootregion, HRegionInfo.FIRST_META_REGIONINFO.getRegionName(),
|
||||
Mocking.getMetaTableRowResult(HRegionInfo.FIRST_META_REGIONINFO,
|
||||
rs0.getServerName()));
|
||||
// Master should now come up.
|
||||
while (!master.isInitialized()) {Threads.sleep(10);}
|
||||
assertTrue(master.isInitialized());
|
||||
} finally {
|
||||
rs0.stop("Test is done");
|
||||
master.stopMaster();
|
||||
master.join();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue