HBASE-2482 regions in transition do not get reassigned by master when RS crashes
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@940589 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d3ce422002
commit
196882e2cd
@ -300,8 +300,8 @@ Release 0.21.0 - Unreleased
|
||||
HBASE-2499 Race condition when disabling a table leaves regions in transition
|
||||
HBASE-2489 Make the "Filesystem needs to be upgraded" error message more
|
||||
useful (Benoit Sigoure via Stack)
|
||||
|
||||
|
||||
HBASE-2482 regions in transition do not get reassigned by master when RS
|
||||
crashes (Todd Lipcon via Stack)
|
||||
|
||||
IMPROVEMENTS
|
||||
HBASE-1760 Cleanup TODOs in HTable
|
||||
|
@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Copyright 2007 The Apache Software Foundation
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
@ -125,6 +125,13 @@ public class HMsg implements Writable {
|
||||
* rather than send them individually in MSG_REPORT_OPEN messages.
|
||||
*/
|
||||
MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS,
|
||||
|
||||
/**
|
||||
* When RegionServer receives this message, it goes into a sleep that only
|
||||
* an exit will cure. This message is sent by unit tests simulating
|
||||
* pathological states.
|
||||
*/
|
||||
TESTING_MSG_BLOCK_RS,
|
||||
}
|
||||
|
||||
private Type type = null;
|
||||
|
@ -82,10 +82,15 @@ public class LocalHBaseCluster implements HConstants {
|
||||
* @param noRegionServers Count of regionservers to start.
|
||||
* @throws IOException
|
||||
*/
|
||||
public LocalHBaseCluster(final Configuration conf,
|
||||
final int noRegionServers)
|
||||
public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
|
||||
throws IOException {
|
||||
this(conf, noRegionServers, HMaster.class);
|
||||
this(conf, noRegionServers, HMaster.class, getRegionServerImplementation(conf));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private static Class<? extends HRegionServer> getRegionServerImplementation(final Configuration conf) {
|
||||
return (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
|
||||
HRegionServer.class);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -98,7 +103,8 @@ public class LocalHBaseCluster implements HConstants {
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public LocalHBaseCluster(final Configuration conf,
|
||||
final int noRegionServers, final Class masterClass)
|
||||
final int noRegionServers, final Class<? extends HMaster> masterClass,
|
||||
final Class<? extends HRegionServer> regionServerClass)
|
||||
throws IOException {
|
||||
this.conf = conf;
|
||||
// Create the master
|
||||
@ -111,7 +117,7 @@ public class LocalHBaseCluster implements HConstants {
|
||||
new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
|
||||
this.regionServerClass =
|
||||
(Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
|
||||
HRegionServer.class);
|
||||
regionServerClass);
|
||||
for (int i = 0; i < noRegionServers; i++) {
|
||||
addRegionServer(i);
|
||||
}
|
||||
|
@ -90,6 +90,8 @@ import java.util.NavigableMap;
|
||||
import java.util.Set;
|
||||
import java.util.SortedMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* HMaster is the "master server" for HBase. An HBase cluster has one active
|
||||
@ -124,6 +126,9 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
||||
|
||||
// Metrics is set when we call run.
|
||||
private final MasterMetrics metrics;
|
||||
|
||||
final Lock splitLogLock = new ReentrantLock();
|
||||
|
||||
// Our zk client.
|
||||
private ZooKeeperWrapper zooKeeperWrapper;
|
||||
// Watcher for master address and for cluster shutdown.
|
||||
@ -561,7 +566,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
||||
if(this.serverManager.getServerInfo(serverName) == null) {
|
||||
LOG.info("Log folder doesn't belong " +
|
||||
"to a known region server, splitting");
|
||||
this.regionManager.splitLogLock.lock();
|
||||
this.splitLogLock.lock();
|
||||
Path logDir =
|
||||
new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
|
||||
try {
|
||||
@ -569,7 +574,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed splitting " + logDir.toString(), e);
|
||||
} finally {
|
||||
this.regionManager.splitLogLock.unlock();
|
||||
this.splitLogLock.unlock();
|
||||
}
|
||||
} else {
|
||||
LOG.info("Log folder belongs to an existing region server");
|
||||
@ -1127,7 +1132,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
|
||||
return c.newInstance(conf);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed construction of " +
|
||||
"Master: " + masterClass.toString(), e);
|
||||
"Master: " + masterClass.toString() +
|
||||
((e.getCause() != null)? e.getCause().getMessage(): ""), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,11 +30,13 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.master.RegionManager.RegionState;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
@ -78,6 +80,7 @@ class ProcessServerShutdown extends RegionServerOperation {
|
||||
|
||||
// check to see if I am responsible for either ROOT or any of the META tables.
|
||||
|
||||
// TODO Why do we do this now instead of at processing time?
|
||||
closeMetaRegions();
|
||||
}
|
||||
|
||||
@ -116,6 +119,19 @@ class ProcessServerShutdown extends RegionServerOperation {
|
||||
return this.deadServerAddress;
|
||||
}
|
||||
|
||||
private void closeRegionsInTransition() {
|
||||
Map<String, RegionState> inTransition =
|
||||
master.getRegionManager().getRegionsInTransitionOnServer(deadServer);
|
||||
for (Map.Entry<String, RegionState> entry : inTransition.entrySet()) {
|
||||
String regionName = entry.getKey();
|
||||
RegionState state = entry.getValue();
|
||||
|
||||
LOG.info("Region " + regionName + " was in transition " +
|
||||
state + " on dead server " + deadServer + " - marking unassigned");
|
||||
master.getRegionManager().setUnassigned(state.getRegionInfo(), true);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ProcessServerShutdown of " + this.deadServer;
|
||||
@ -282,7 +298,7 @@ class ProcessServerShutdown extends RegionServerOperation {
|
||||
if (!logSplit) {
|
||||
// Process the old log file
|
||||
if (this.master.getFileSystem().exists(rsLogDir)) {
|
||||
if (!master.getRegionManager().splitLogLock.tryLock()) {
|
||||
if (!master.splitLogLock.tryLock()) {
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
@ -290,7 +306,7 @@ class ProcessServerShutdown extends RegionServerOperation {
|
||||
this.master.getOldLogDir(), this.master.getFileSystem(),
|
||||
this.master.getConfiguration());
|
||||
} finally {
|
||||
master.getRegionManager().splitLogLock.unlock();
|
||||
master.splitLogLock.unlock();
|
||||
}
|
||||
}
|
||||
logSplit = true;
|
||||
@ -355,6 +371,9 @@ class ProcessServerShutdown extends RegionServerOperation {
|
||||
Bytes.toString(r.getRegionName()) + " on " + r.getServer());
|
||||
}
|
||||
}
|
||||
|
||||
closeRegionsInTransition();
|
||||
|
||||
// Remove this server from dead servers list. Finished splitting logs.
|
||||
this.master.getServerManager().removeDeadServer(deadServer);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.util.Writables;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
@ -54,8 +55,6 @@ import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
/**
|
||||
* Class to manage assigning regions to servers, state of root and meta, etc.
|
||||
@ -66,8 +65,6 @@ public class RegionManager implements HConstants {
|
||||
private AtomicReference<HServerAddress> rootRegionLocation =
|
||||
new AtomicReference<HServerAddress>(null);
|
||||
|
||||
final Lock splitLogLock = new ReentrantLock();
|
||||
|
||||
private final RootScanner rootScannerThread;
|
||||
final MetaScanner metaScannerThread;
|
||||
|
||||
@ -166,8 +163,8 @@ public class RegionManager implements HConstants {
|
||||
unsetRootRegion();
|
||||
if (!master.getShutdownRequested().get()) {
|
||||
synchronized (regionsInTransition) {
|
||||
RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO);
|
||||
s.setUnassigned();
|
||||
RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO,
|
||||
RegionState.State.UNASSIGNED);
|
||||
regionsInTransition.put(
|
||||
HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString(), s);
|
||||
LOG.info("ROOT inserted into regionsInTransition");
|
||||
@ -572,6 +569,23 @@ public class RegionManager implements HConstants {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a map of the regions in transition on a server.
|
||||
* Returned map entries are region name -> RegionState
|
||||
*/
|
||||
Map<String, RegionState> getRegionsInTransitionOnServer(String serverName) {
|
||||
Map<String, RegionState> ret = new HashMap<String, RegionState>();
|
||||
synchronized (regionsInTransition) {
|
||||
for (Map.Entry<String, RegionState> entry : regionsInTransition.entrySet()) {
|
||||
RegionState rs = entry.getValue();
|
||||
if (serverName.equals(rs.getServerName())) {
|
||||
ret.put(entry.getKey(), rs);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the root and meta scanners so that the region servers serving meta
|
||||
@ -824,6 +838,10 @@ public class RegionManager implements HConstants {
|
||||
&& !s.isUnassigned()
|
||||
&& s.getServerName() != null
|
||||
&& s.getServerName().equals(server.toString())) {
|
||||
// TODO this code appears to be entirely broken, since
|
||||
// server.toString() has no start code, but s.getServerName()
|
||||
// does!
|
||||
LOG.fatal("I DONT BELIEVE YOU WILL EVER SEE THIS!");
|
||||
// Has an outstanding meta region to be assigned.
|
||||
return true;
|
||||
}
|
||||
@ -956,7 +974,7 @@ public class RegionManager implements HConstants {
|
||||
synchronized(this.regionsInTransition) {
|
||||
s = regionsInTransition.get(info.getRegionNameAsString());
|
||||
if (s == null) {
|
||||
s = new RegionState(info);
|
||||
s = new RegionState(info, RegionState.State.UNASSIGNED);
|
||||
regionsInTransition.put(info.getRegionNameAsString(), s);
|
||||
}
|
||||
}
|
||||
@ -1038,7 +1056,7 @@ public class RegionManager implements HConstants {
|
||||
RegionState s =
|
||||
this.regionsInTransition.get(regionInfo.getRegionNameAsString());
|
||||
if (s == null) {
|
||||
s = new RegionState(regionInfo);
|
||||
s = new RegionState(regionInfo, RegionState.State.CLOSING);
|
||||
}
|
||||
// If region was asked to open before getting here, we could be taking
|
||||
// the wrong server name
|
||||
@ -1474,22 +1492,30 @@ public class RegionManager implements HConstants {
|
||||
* note on regionsInTransition data member above for listing of state
|
||||
* transitions.
|
||||
*/
|
||||
private static class RegionState implements Comparable<RegionState> {
|
||||
static class RegionState implements Comparable<RegionState> {
|
||||
private final HRegionInfo regionInfo;
|
||||
private volatile boolean unassigned = false;
|
||||
private volatile boolean pendingOpen = false;
|
||||
private volatile boolean open = false;
|
||||
private volatile boolean closing = false;
|
||||
private volatile boolean pendingClose = false;
|
||||
private volatile boolean closed = false;
|
||||
private volatile boolean offlined = false;
|
||||
|
||||
enum State {
|
||||
UNASSIGNED, // awaiting a server to be assigned
|
||||
PENDING_OPEN, // told a server to open, hasn't opened yet
|
||||
OPEN, // has been opened on RS, but not yet marked in META/ROOT
|
||||
CLOSING, // a msg has been enqueued to close ths region, but not delivered to RS yet
|
||||
PENDING_CLOSE, // msg has been delivered to RS to close this region
|
||||
CLOSED // region has been closed but not yet marked in meta
|
||||
|
||||
}
|
||||
|
||||
private State state;
|
||||
|
||||
private boolean isOfflined;
|
||||
|
||||
/* Set when region is assigned or closing */
|
||||
private volatile String serverName = null;
|
||||
private String serverName = null;
|
||||
|
||||
/* Constructor */
|
||||
RegionState(HRegionInfo info) {
|
||||
RegionState(HRegionInfo info, State state) {
|
||||
this.regionInfo = info;
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
synchronized HRegionInfo getRegionInfo() {
|
||||
@ -1511,14 +1537,16 @@ public class RegionManager implements HConstants {
|
||||
* @return true if the region is being opened
|
||||
*/
|
||||
synchronized boolean isOpening() {
|
||||
return this.unassigned || this.pendingOpen || this.open;
|
||||
return state == State.UNASSIGNED ||
|
||||
state == State.PENDING_OPEN ||
|
||||
state == State.OPEN;
|
||||
}
|
||||
|
||||
/*
|
||||
* @return true if region is unassigned
|
||||
*/
|
||||
synchronized boolean isUnassigned() {
|
||||
return unassigned;
|
||||
return state == State.UNASSIGNED;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -1527,120 +1555,84 @@ public class RegionManager implements HConstants {
|
||||
* called unless it is safe to do so.
|
||||
*/
|
||||
synchronized void setUnassigned() {
|
||||
this.unassigned = true;
|
||||
this.pendingOpen = false;
|
||||
this.open = false;
|
||||
this.closing = false;
|
||||
this.pendingClose = false;
|
||||
this.closed = false;
|
||||
this.offlined = false;
|
||||
state = State.UNASSIGNED;
|
||||
this.serverName = null;
|
||||
}
|
||||
|
||||
synchronized boolean isPendingOpen() {
|
||||
return pendingOpen;
|
||||
return state == State.PENDING_OPEN;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param serverName Server region was assigned to.
|
||||
*/
|
||||
synchronized void setPendingOpen(final String serverName) {
|
||||
if (!this.unassigned) {
|
||||
if (state != State.UNASSIGNED) {
|
||||
LOG.warn("Cannot assign a region that is not currently unassigned. " +
|
||||
"FIX!! State: " + toString());
|
||||
}
|
||||
this.unassigned = false;
|
||||
this.pendingOpen = true;
|
||||
this.open = false;
|
||||
this.closing = false;
|
||||
this.pendingClose = false;
|
||||
this.closed = false;
|
||||
this.offlined = false;
|
||||
state = State.PENDING_OPEN;
|
||||
this.serverName = serverName;
|
||||
}
|
||||
|
||||
synchronized boolean isOpen() {
|
||||
return open;
|
||||
return state == State.OPEN;
|
||||
}
|
||||
|
||||
synchronized void setOpen() {
|
||||
if (!pendingOpen) {
|
||||
if (state != State.PENDING_OPEN) {
|
||||
LOG.warn("Cannot set a region as open if it has not been pending. " +
|
||||
"FIX!! State: " + toString());
|
||||
}
|
||||
this.unassigned = false;
|
||||
this.pendingOpen = false;
|
||||
this.open = true;
|
||||
this.closing = false;
|
||||
this.pendingClose = false;
|
||||
this.closed = false;
|
||||
this.offlined = false;
|
||||
state = State.OPEN;
|
||||
}
|
||||
|
||||
synchronized boolean isClosing() {
|
||||
return closing;
|
||||
return state == State.CLOSING;
|
||||
}
|
||||
|
||||
synchronized void setClosing(String serverName, boolean setOffline) {
|
||||
this.unassigned = false;
|
||||
this.pendingOpen = false;
|
||||
this.open = false;
|
||||
this.closing = true;
|
||||
this.pendingClose = false;
|
||||
this.closed = false;
|
||||
this.offlined = setOffline;
|
||||
state = State.CLOSING;
|
||||
this.serverName = serverName;
|
||||
this.isOfflined = setOffline;
|
||||
}
|
||||
|
||||
synchronized boolean isPendingClose() {
|
||||
return this.pendingClose;
|
||||
return state == State.PENDING_CLOSE;
|
||||
}
|
||||
|
||||
synchronized void setPendingClose() {
|
||||
if (!closing) {
|
||||
if (state != State.CLOSING) {
|
||||
LOG.warn("Cannot set a region as pending close if it has not been " +
|
||||
"closing. FIX!! State: " + toString());
|
||||
}
|
||||
this.unassigned = false;
|
||||
this.pendingOpen = false;
|
||||
this.open = false;
|
||||
this.closing = false;
|
||||
this.pendingClose = true;
|
||||
this.closed = false;
|
||||
state = State.PENDING_CLOSE;
|
||||
}
|
||||
|
||||
synchronized boolean isClosed() {
|
||||
return this.closed;
|
||||
return state == State.CLOSED;
|
||||
}
|
||||
|
||||
synchronized void setClosed() {
|
||||
if (!pendingClose && !pendingOpen && !closing) {
|
||||
if (state != State.PENDING_CLOSE &&
|
||||
state != State.PENDING_OPEN &&
|
||||
state != State.CLOSING) {
|
||||
throw new IllegalStateException(
|
||||
"Cannot set a region to be closed if it was not already marked as" +
|
||||
" pending close, pending open or closing. State: " + toString());
|
||||
" pending close, pending open or closing. State: " + this);
|
||||
}
|
||||
this.unassigned = false;
|
||||
this.pendingOpen = false;
|
||||
this.open = false;
|
||||
this.closing = false;
|
||||
this.pendingClose = false;
|
||||
this.closed = true;
|
||||
state = State.CLOSED;
|
||||
}
|
||||
|
||||
synchronized boolean isOfflined() {
|
||||
return this.offlined;
|
||||
return (state == State.CLOSING ||
|
||||
state == State.PENDING_CLOSE) && isOfflined;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String toString() {
|
||||
return ("name=" + Bytes.toString(getRegionName()) +
|
||||
", unassigned=" + this.unassigned +
|
||||
", pendingOpen=" + this.pendingOpen +
|
||||
", open=" + this.open +
|
||||
", closing=" + this.closing +
|
||||
", pendingClose=" + this.pendingClose +
|
||||
", closed=" + this.closed +
|
||||
", offlined=" + this.offlined);
|
||||
", state=" + this.state);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -21,12 +21,27 @@ package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
|
||||
/**
|
||||
* Listener for regionserver events in master.
|
||||
* @see HMaster#registerRegionServerOperationListener(RegionServerOperationListener)
|
||||
* @see HMaster#unregisterRegionServerOperationListener(RegionServerOperationListener)
|
||||
*/
|
||||
public interface RegionServerOperationListener {
|
||||
/**
|
||||
* Called for each message passed the master. Most of the messages that come
|
||||
* in here will go on to become {@link #process(RegionServerOperation)}s but
|
||||
* others like {@linke HMsg.Type#MSG_REPORT_PROCESS_OPEN} go no further;
|
||||
* only in here can you see them come in.
|
||||
* @param serverInfo Server we got the message from.
|
||||
* @param incomingMsg The message received.
|
||||
* @return True to continue processing, false to skip.
|
||||
*/
|
||||
public boolean process(final HServerInfo serverInfo,
|
||||
final HMsg incomingMsg);
|
||||
|
||||
/**
|
||||
* Called before processing <code>op</code>
|
||||
* @param op
|
||||
|
@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.util.Sleeper;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
@ -216,6 +218,24 @@ public class RegionServerOperationQueue {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called for each message passed the master. Most of the messages that come
|
||||
* in here will go on to become {@link #process(RegionServerOperation)}s but
|
||||
* others like {@linke HMsg.Type#MSG_REPORT_PROCESS_OPEN} go no further;
|
||||
* only in here can you see them come in.
|
||||
* @param serverInfo Server we got the message from.
|
||||
* @param incomingMsg The message received.
|
||||
* @return True to continue processing, false to skip.
|
||||
*/
|
||||
boolean process(final HServerInfo serverInfo,
|
||||
final HMsg incomingMsg) {
|
||||
if (this.listeners.isEmpty()) return true;
|
||||
for (RegionServerOperationListener listener: this.listeners) {
|
||||
if (!listener.process(serverInfo, incomingMsg)) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Tell listeners that we processed a RegionServerOperation.
|
||||
* @param op Operation to tell the world about.
|
||||
|
@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.Leases;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.master.RegionManager.RegionState;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
@ -326,7 +327,11 @@ public class ServerManager implements HConstants {
|
||||
}
|
||||
}
|
||||
|
||||
/* Region server is exiting
|
||||
/*
|
||||
* Region server is exiting with a clean shutdown.
|
||||
*
|
||||
* In this case, the server sends MSG_REPORT_EXITING in msgs[0] followed by
|
||||
* a MSG_REPORT_CLOSE for each region it was serving.
|
||||
* @param serverInfo
|
||||
* @param msgs
|
||||
*/
|
||||
@ -347,6 +352,7 @@ public class ServerManager implements HConstants {
|
||||
for (int i = 1; i < msgs.length; i++) {
|
||||
LOG.info("Processing " + msgs[i] + " from " +
|
||||
serverInfo.getServerName());
|
||||
assert msgs[i].getType() == HMsg.Type.MSG_REGION_CLOSE;
|
||||
HRegionInfo info = msgs[i].getRegionInfo();
|
||||
// Meta/root region offlining is handed in removeServerInfo above.
|
||||
if (!info.isMetaRegion()) {
|
||||
@ -361,6 +367,18 @@ public class ServerManager implements HConstants {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// There should not be any regions in transition for this server - the
|
||||
// server should finish transitions itself before closing
|
||||
Map<String, RegionState> inTransition =
|
||||
master.getRegionManager().getRegionsInTransitionOnServer(
|
||||
serverInfo.getServerName());
|
||||
for (Map.Entry<String, RegionState> entry : inTransition.entrySet()) {
|
||||
LOG.warn("Region server " + serverInfo.getServerName() +
|
||||
" shut down with region " + entry.getKey() + " in transition " +
|
||||
"state " + entry.getValue());
|
||||
master.getRegionManager().setUnassigned(entry.getValue().getRegionInfo(), true);
|
||||
}
|
||||
}
|
||||
// We don't need to return anything to the server because it isn't
|
||||
// going to do any more work.
|
||||
@ -418,7 +436,7 @@ public class ServerManager implements HConstants {
|
||||
* @return
|
||||
*/
|
||||
private HMsg[] processMsgs(HServerInfo serverInfo,
|
||||
HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[]) {
|
||||
HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[]) {
|
||||
ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
|
||||
if (serverInfo.getServerAddress() == null) {
|
||||
throw new NullPointerException("Server address cannot be null; " +
|
||||
@ -433,6 +451,10 @@ public class ServerManager implements HConstants {
|
||||
LOG.info("Processing " + incomingMsgs[i] + " from " +
|
||||
serverInfo.getServerName() + "; " + (i + 1) + " of " +
|
||||
incomingMsgs.length);
|
||||
if (!this.master.getRegionServerOperationQueue().
|
||||
process(serverInfo, incomingMsgs[i])) {
|
||||
continue;
|
||||
}
|
||||
switch (incomingMsgs[i].getType()) {
|
||||
case MSG_REPORT_PROCESS_OPEN:
|
||||
openingCount++;
|
||||
|
@ -132,7 +132,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||
// Go down hard. Used if file system becomes unavailable and also in
|
||||
// debugging and unit tests.
|
||||
protected volatile boolean abortRequested;
|
||||
|
||||
|
||||
// If false, the file system has become unavailable
|
||||
protected volatile boolean fsOk;
|
||||
|
||||
@ -666,7 +666,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||
}
|
||||
|
||||
join();
|
||||
zooKeeperWrapper.close();
|
||||
this.zooKeeperWrapper.close();
|
||||
|
||||
LOG.info(Thread.currentThread().getName() + " exiting");
|
||||
}
|
||||
@ -1423,6 +1423,14 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||
region.flushcache();
|
||||
break;
|
||||
|
||||
case TESTING_MSG_BLOCK_RS:
|
||||
while (!stopRequested.get()) {
|
||||
Threads.sleep(1000);
|
||||
LOG.info("Regionserver blocked by " +
|
||||
HMsg.Type.TESTING_MSG_BLOCK_RS + "; " + stopRequested.get());
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new AssertionError(
|
||||
"Impossible state during msg processing. Instruction: "
|
||||
@ -1461,7 +1469,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void openRegion(final HRegionInfo regionInfo) {
|
||||
Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
|
||||
HRegion region = this.onlineRegions.get(mapKey);
|
||||
@ -2383,7 +2391,7 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||
*/
|
||||
public static Thread startRegionServer(final HRegionServer hrs) {
|
||||
return startRegionServer(hrs,
|
||||
"regionserver" + hrs.server.getListenerAddress());
|
||||
"regionserver" + hrs.getServerInfo().getServerAddress().getPort());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -2411,6 +2419,24 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility for constructing an instance of the passed HRegionServer class.
|
||||
* @param regionServerClass
|
||||
* @param conf2
|
||||
* @return HRegionServer instance.
|
||||
*/
|
||||
public static HRegionServer constructRegionServer(Class<? extends HRegionServer> regionServerClass,
|
||||
final Configuration conf2) {
|
||||
try {
|
||||
Constructor<? extends HRegionServer> c =
|
||||
regionServerClass.getConstructor(HBaseConfiguration.class);
|
||||
return c.newInstance(conf2);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Failed construction of " +
|
||||
"Master: " + regionServerClass.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Do class main.
|
||||
* @param args
|
||||
@ -2438,9 +2464,8 @@ public class HRegionServer implements HConstants, HRegionInterface,
|
||||
if (runtime != null) {
|
||||
LOG.info("vmInputArguments=" + runtime.getInputArguments());
|
||||
}
|
||||
Constructor<? extends HRegionServer> c =
|
||||
regionServerClass.getConstructor(Configuration.class);
|
||||
startRegionServer(c.newInstance(conf));
|
||||
HRegionServer hrs = constructRegionServer(regionServerClass, conf);
|
||||
startRegionServer(hrs);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error( "Can not start region server because "+
|
||||
|
@ -109,13 +109,24 @@ public class MiniHBaseCluster implements HConstants {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Subclass so can get at protected methods (none at moment).
|
||||
*/
|
||||
public static class MiniHBaseClusterRegionServer extends HRegionServer {
|
||||
public MiniHBaseClusterRegionServer(HBaseConfiguration conf)
|
||||
throws IOException {
|
||||
super(conf);
|
||||
}
|
||||
}
|
||||
|
||||
private void init(final int nRegionNodes) throws IOException {
|
||||
try {
|
||||
// start up a LocalHBaseCluster
|
||||
while (true) {
|
||||
try {
|
||||
hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes,
|
||||
MiniHBaseCluster.MiniHBaseClusterMaster.class);
|
||||
MiniHBaseCluster.MiniHBaseClusterMaster.class,
|
||||
MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
|
||||
hbaseCluster.startup();
|
||||
} catch (BindException e) {
|
||||
//this port is already in use. try to use another (for multiple testing)
|
||||
@ -137,13 +148,13 @@ public class MiniHBaseCluster implements HConstants {
|
||||
* Starts a region server thread running
|
||||
*
|
||||
* @throws IOException
|
||||
* @return Name of regionserver started.
|
||||
* @return New RegionServerThread
|
||||
*/
|
||||
public String startRegionServer() throws IOException {
|
||||
public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException {
|
||||
JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer();
|
||||
t.start();
|
||||
t.waitForServerOnline();
|
||||
return t.getName();
|
||||
return t;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -296,7 +307,21 @@ public class MiniHBaseCluster implements HConstants {
|
||||
public void addMessageToSendRegionServer(final int serverNumber,
|
||||
final HMsg msg)
|
||||
throws IOException {
|
||||
HRegionServer hrs = getRegionServer(serverNumber);
|
||||
MiniHBaseClusterRegionServer hrs =
|
||||
(MiniHBaseClusterRegionServer)getRegionServer(serverNumber);
|
||||
addMessageToSendRegionServer(hrs, msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a message to include in the responses send a regionserver when it
|
||||
* checks back in.
|
||||
* @param hrs Which region server.
|
||||
* @param msg The MESSAGE
|
||||
* @throws IOException
|
||||
*/
|
||||
public void addMessageToSendRegionServer(final MiniHBaseClusterRegionServer hrs,
|
||||
final HMsg msg)
|
||||
throws IOException {
|
||||
((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg);
|
||||
}
|
||||
}
|
||||
|
@ -19,35 +19,45 @@
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HMsg;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Test transitions of state across the master.
|
||||
* Test transitions of state across the master. Sets up the cluster once and
|
||||
* then runs a couple of tests.
|
||||
*/
|
||||
public class TestMasterTransistions {
|
||||
private static final Log LOG = LogFactory.getLog(TestMasterTransistions.class);
|
||||
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static final String TABLENAME = "master_transitions";
|
||||
private static final byte [][] FAMILIES = new byte [][] {Bytes.toBytes("a"),
|
||||
@ -63,14 +73,159 @@ public class TestMasterTransistions {
|
||||
// Create a table of three families. This will assign a region.
|
||||
TEST_UTIL.createTable(Bytes.toBytes(TABLENAME), FAMILIES);
|
||||
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
|
||||
int countOfRegions = TEST_UTIL.createMultiRegions(t, FAMILIES[0]);
|
||||
int countOfRegions = TEST_UTIL.createMultiRegions(t, getTestFamily());
|
||||
waitUntilAllRegionsAssigned(countOfRegions);
|
||||
addToEachStartKey(countOfRegions);
|
||||
}
|
||||
|
||||
@AfterClass public static void afterAllTests() throws IOException {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* HBase2482 is about outstanding region openings. If any are outstanding
|
||||
* when a regionserver goes down, then they'll never deploy. They'll be
|
||||
* stuck in the regions-in-transition list for ever. This listener looks
|
||||
* for a region opening HMsg and if its from the server passed on construction,
|
||||
* then we kill it. It also looks out for a close message on the victim
|
||||
* server because that signifies start of the fireworks.
|
||||
*/
|
||||
static class HBase2482Listener implements RegionServerOperationListener {
|
||||
private final HRegionServer victim;
|
||||
private boolean abortSent = false;
|
||||
// We closed regions on new server.
|
||||
private volatile boolean closed = false;
|
||||
// Copy of regions on new server
|
||||
private final Collection<HRegion> copyOfOnlineRegions;
|
||||
// This is the region that was in transition on the server we aborted. Test
|
||||
// passes if this region comes back online successfully.
|
||||
private HRegionInfo regionToFind;
|
||||
|
||||
HBase2482Listener(final HRegionServer victim) {
|
||||
this.victim = victim;
|
||||
// Copy regions currently open on this server so I can notice when
|
||||
// there is a close.
|
||||
this.copyOfOnlineRegions =
|
||||
this.victim.getCopyOfOnlineRegionsSortedBySize().values();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
|
||||
if (!victim.getServerInfo().equals(serverInfo) ||
|
||||
this.abortSent || !this.closed) {
|
||||
return true;
|
||||
}
|
||||
if (!incomingMsg.isType(HMsg.Type.MSG_REPORT_PROCESS_OPEN)) return true;
|
||||
// Save the region that is in transition so can test later it came back.
|
||||
this.regionToFind = incomingMsg.getRegionInfo();
|
||||
LOG.info("ABORTING " + this.victim + " because got a " +
|
||||
HMsg.Type.MSG_REPORT_PROCESS_OPEN + " on this server for " +
|
||||
incomingMsg.getRegionInfo().getRegionNameAsString());
|
||||
this.victim.abort();
|
||||
this.abortSent = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(RegionServerOperation op) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processed(RegionServerOperation op) {
|
||||
if (this.closed || !(op instanceof ProcessRegionClose)) return;
|
||||
ProcessRegionClose close = (ProcessRegionClose)op;
|
||||
for (HRegion r: this.copyOfOnlineRegions) {
|
||||
if (r.getRegionInfo().equals(close.regionInfo)) {
|
||||
// We've closed one of the regions that was on the victim server.
|
||||
// Now can start testing for when all regions are back online again
|
||||
LOG.info("Found close of " +
|
||||
r.getRegionInfo().getRegionNameAsString() +
|
||||
"; setting close happened flag");
|
||||
this.closed = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* In 2482, a RS with an opening region on it dies. The said region is then
|
||||
* stuck in the master's regions-in-transition and never leaves it. This
|
||||
* test works by bringing up a new regionserver, waiting for the load
|
||||
* balancer to give it some regions. Then, we close all on the new server.
|
||||
* After sending all the close messages, we send the new regionserver the
|
||||
* special blocking message so it can not process any more messages.
|
||||
* Meantime reopening of the just-closed regions is backed up on the new
|
||||
* server. Soon as master gets an opening region from the new regionserver,
|
||||
* we kill it. We then wait on all regions to combe back on line. If bug
|
||||
* is fixed, this should happen soon as the processing of the killed server is
|
||||
* done.
|
||||
* @see <a href="https://issues.apache.org/jira/browse/HBASE-2482">HBASE-2482</a>
|
||||
*/
|
||||
@Test public void testKillRSWithOpeningRegion2482() throws Exception {
|
||||
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
|
||||
// Count how many regions are online. They need to be all back online for
|
||||
// this test to succeed.
|
||||
int countOfMetaRegions = countOfMetaRegions();
|
||||
// Add a listener on the server.
|
||||
HMaster m = cluster.getMaster();
|
||||
// Start new regionserver.
|
||||
MiniHBaseClusterRegionServer hrs =
|
||||
(MiniHBaseClusterRegionServer)cluster.startRegionServer().getRegionServer();
|
||||
LOG.info("Started new regionserver: " + hrs.toString());
|
||||
// Wait until has some regions before proceeding. Balancer will give it some.
|
||||
int minimumRegions =
|
||||
countOfMetaRegions/(cluster.getRegionServerThreads().size() * 2);
|
||||
while (hrs.getOnlineRegions().size() < minimumRegions) Threads.sleep(100);
|
||||
// Set the listener only after some regions have been opened on new server.
|
||||
HBase2482Listener listener = new HBase2482Listener(hrs);
|
||||
m.getRegionServerOperationQueue().
|
||||
registerRegionServerOperationListener(listener);
|
||||
try {
|
||||
// Go close all non-catalog regions on this new server
|
||||
closeAlltNonCatalogRegions(cluster, hrs);
|
||||
// After all closes, add blocking message before the region opens start to
|
||||
// come in.
|
||||
cluster.addMessageToSendRegionServer(hrs,
|
||||
new HMsg(HMsg.Type.TESTING_MSG_BLOCK_RS));
|
||||
// Wait till one of the above close messages has an effect before we start
|
||||
// wait on all regions back online.
|
||||
while (!listener.closed) Threads.sleep(100);
|
||||
LOG.info("Past close");
|
||||
// Make sure the abort server message was sent.
|
||||
while(!listener.abortSent) Threads.sleep(100);
|
||||
LOG.info("Past abort send; waiting on all regions to redeploy");
|
||||
// Now wait for regions to come back online.
|
||||
assertRegionIsBackOnline(listener.regionToFind);
|
||||
} finally {
|
||||
m.getRegionServerOperationQueue().
|
||||
unregisterRegionServerOperationListener(listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* @param cluster
|
||||
* @param hrs
|
||||
* @return Count of regions closed.
|
||||
* @throws IOException
|
||||
*/
|
||||
private int closeAlltNonCatalogRegions(final MiniHBaseCluster cluster,
|
||||
final MiniHBaseCluster.MiniHBaseClusterRegionServer hrs)
|
||||
throws IOException {
|
||||
int countOfRegions = 0;
|
||||
for (HRegion r: hrs.getOnlineRegions()) {
|
||||
if (r.getRegionInfo().isMetaRegion()) continue;
|
||||
cluster.addMessageToSendRegionServer(hrs,
|
||||
new HMsg(HMsg.Type.MSG_REGION_CLOSE, r.getRegionInfo()));
|
||||
LOG.info("Sent close of " + r.getRegionInfo().getRegionNameAsString() +
|
||||
" on " + hrs.toString());
|
||||
countOfRegions++;
|
||||
}
|
||||
return countOfRegions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Listener for regionserver events testing hbase-2428 (Infinite loop of
|
||||
* region closes if META region is offline). In particular, listen
|
||||
@ -167,6 +322,11 @@ public class TestMasterTransistions {
|
||||
int getCloseCount() {
|
||||
return this.closeCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean process(HServerInfo serverInfo, HMsg incomingMsg) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -211,24 +371,19 @@ public class TestMasterTransistions {
|
||||
assertTrue(listener.getCloseCount() <
|
||||
((HBase2428Listener.SERVER_DURATION/HBase2428Listener.CLOSE_DURATION) * 2));
|
||||
|
||||
assertClosedRegionIsBackOnline(hri);
|
||||
// Assert the closed region came back online
|
||||
assertRegionIsBackOnline(hri);
|
||||
} finally {
|
||||
master.getRegionServerOperationQueue().
|
||||
unregisterRegionServerOperationListener(listener);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertClosedRegionIsBackOnline(final HRegionInfo hri)
|
||||
private void assertRegionIsBackOnline(final HRegionInfo hri)
|
||||
throws IOException {
|
||||
// When we get here, region should be successfully deployed. Assert so.
|
||||
// 'aaa' is safe as first row if startkey is EMPTY_BYTE_ARRAY because we
|
||||
// loaded with HBaseTestingUtility#createMultiRegions.
|
||||
byte [] row = Bytes.equals(HConstants.EMPTY_BYTE_ARRAY, hri.getStartKey())?
|
||||
new byte [] {'a', 'a', 'a'}: hri.getStartKey();
|
||||
Put p = new Put(row);
|
||||
p.add(FAMILIES[0], FAMILIES[0], FAMILIES[0]);
|
||||
// Region should have an entry in its startkey because of addRowToEachRegion.
|
||||
byte [] row = getStartKey(hri);
|
||||
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
|
||||
t.put(p);
|
||||
Get g = new Get(row);
|
||||
assertTrue((t.get(g)).size() > 0);
|
||||
}
|
||||
@ -256,8 +411,81 @@ public class TestMasterTransistions {
|
||||
rows++;
|
||||
}
|
||||
s.close();
|
||||
// If I got to hear and all rows have a Server, then all have been assigned.
|
||||
// If I get to here and all rows have a Server, then all have been assigned.
|
||||
if (rows == countOfRegions) break;
|
||||
LOG.info("Found=" + rows);
|
||||
Threads.sleep(1000);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* @return Count of regions in meta table.
|
||||
* @throws IOException
|
||||
*/
|
||||
private static int countOfMetaRegions()
|
||||
throws IOException {
|
||||
HTable meta = new HTable(TEST_UTIL.getConfiguration(),
|
||||
HConstants.META_TABLE_NAME);
|
||||
int rows = 0;
|
||||
Scan scan = new Scan();
|
||||
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
|
||||
ResultScanner s = meta.getScanner(scan);
|
||||
for (Result r = null; (r = s.next()) != null;) {
|
||||
byte [] b =
|
||||
r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER);
|
||||
if (b == null || b.length <= 0) break;
|
||||
rows++;
|
||||
}
|
||||
s.close();
|
||||
return rows;
|
||||
}
|
||||
|
||||
/*
|
||||
* Add to each of the regions in .META. a value. Key is the startrow of the
|
||||
* region (except its 'aaa' for first region). Actual value is the row name.
|
||||
* @param expected
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
private static int addToEachStartKey(final int expected) throws IOException {
|
||||
HTable t = new HTable(TEST_UTIL.getConfiguration(), TABLENAME);
|
||||
HTable meta = new HTable(TEST_UTIL.getConfiguration(),
|
||||
HConstants.META_TABLE_NAME);
|
||||
int rows = 0;
|
||||
Scan scan = new Scan();
|
||||
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
|
||||
ResultScanner s = meta.getScanner(scan);
|
||||
for (Result r = null; (r = s.next()) != null;) {
|
||||
byte [] b =
|
||||
r.getValue(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
|
||||
if (b == null || b.length <= 0) break;
|
||||
HRegionInfo hri = Writables.getHRegionInfo(b);
|
||||
// If start key, add 'aaa'.
|
||||
byte [] row = getStartKey(hri);
|
||||
Put p = new Put(row);
|
||||
p.add(getTestFamily(), getTestQualifier(), row);
|
||||
t.put(p);
|
||||
rows++;
|
||||
}
|
||||
s.close();
|
||||
Assert.assertEquals(expected, rows);
|
||||
return rows;
|
||||
}
|
||||
|
||||
/*
|
||||
* @param hri
|
||||
* @return Start key for hri (If start key is '', then return 'aaa'.
|
||||
*/
|
||||
private static byte [] getStartKey(final HRegionInfo hri) {
|
||||
return Bytes.equals(HConstants.EMPTY_START_ROW, hri.getStartKey())?
|
||||
Bytes.toBytes("aaa"): hri.getStartKey();
|
||||
}
|
||||
|
||||
private static byte [] getTestFamily() {
|
||||
return FAMILIES[0];
|
||||
}
|
||||
|
||||
private static byte [] getTestQualifier() {
|
||||
return getTestFamily();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user