HBASE-5443 Convert admin protocol of HRegionInterface to PB

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1329358 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2012-04-23 18:12:16 +00:00
parent e2cd675020
commit f8c7f1b0fb
43 changed files with 2278 additions and 1400 deletions

View File

@ -212,12 +212,26 @@ public class SecureRpcEngine implements RpcEngine {
(VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[] { protocol },
new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout));
try {
long serverVersion = proxy.getProtocolVersion(protocol.getName(),
clientVersion);
if (serverVersion != clientVersion) {
throw new HBaseRPC.VersionMismatch(protocol.getName(), clientVersion,
serverVersion);
}
} catch (Throwable t) {
if (t instanceof UndeclaredThrowableException) {
t = t.getCause();
}
if (t instanceof ServiceException) {
throw ProtobufUtil.getRemoteException((ServiceException)t);
}
if (!(t instanceof IOException)) {
LOG.error("Unexpected throwable object ", t);
throw new IOException(t);
}
throw (IOException)t;
}
return proxy;
}

View File

@ -85,7 +85,7 @@ public abstract class SecureServer extends HBaseServer {
// 3 : Introduce the protocol into the RPC connection header
// 4 : Introduced SASL security layer
public static final byte CURRENT_VERSION = 4;
public static final Set<Byte> INSECURE_VERSIONS = ImmutableSet.of((byte) 3);
public static final Set<Byte> INSECURE_VERSIONS = ImmutableSet.of((byte) 5);
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.SecureServer");
private static final Log AUDITLOG =

View File

@ -34,11 +34,13 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
@ -340,7 +342,7 @@ public class CatalogTracker {
* @throws IOException
* @deprecated Use #getRootServerConnection(long)
*/
public HRegionInterface waitForRootServerConnection(long timeout)
public AdminProtocol waitForRootServerConnection(long timeout)
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
return getRootServerConnection(timeout);
}
@ -356,7 +358,7 @@ public class CatalogTracker {
* @throws NotAllMetaRegionsOnlineException if timed out waiting
* @throws IOException
*/
HRegionInterface getRootServerConnection(long timeout)
AdminProtocol getRootServerConnection(long timeout)
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
return getCachedConnection(waitForRoot(timeout));
}
@ -370,7 +372,7 @@ public class CatalogTracker {
* @throws IOException
* @deprecated Use #getRootServerConnection(long)
*/
public HRegionInterface waitForRootServerConnectionDefault()
public AdminProtocol waitForRootServerConnectionDefault()
throws NotAllMetaRegionsOnlineException, IOException {
try {
return getRootServerConnection(this.defaultTimeout);
@ -395,11 +397,11 @@ public class CatalogTracker {
* @throws IOException
* @throws InterruptedException
*/
private HRegionInterface getMetaServerConnection()
private AdminProtocol getMetaServerConnection()
throws IOException, InterruptedException {
synchronized (metaAvailable) {
if (metaAvailable.get()) {
HRegionInterface current = getCachedConnection(this.metaLocation);
AdminProtocol current = getCachedConnection(this.metaLocation);
// If we are to refresh, verify we have a good connection by making
// an invocation on it.
if (verifyRegionLocation(current, this.metaLocation, META_REGION_NAME)) {
@ -416,7 +418,7 @@ public class CatalogTracker {
ServerName newLocation = MetaReader.getMetaRegionLocation(this);
if (newLocation == null) return null;
HRegionInterface newConnection = getCachedConnection(newLocation);
AdminProtocol newConnection = getCachedConnection(newLocation);
if (verifyRegionLocation(newConnection, newLocation, META_REGION_NAME)) {
setMetaLocation(newLocation);
return newConnection;
@ -495,7 +497,7 @@ public class CatalogTracker {
* @throws IOException
* @deprecated Does not retry; use an HTable instance instead.
*/
public HRegionInterface waitForMetaServerConnection(long timeout)
public AdminProtocol waitForMetaServerConnection(long timeout)
throws InterruptedException, NotAllMetaRegionsOnlineException, IOException {
return getCachedConnection(waitForMeta(timeout));
}
@ -510,7 +512,7 @@ public class CatalogTracker {
* @throws IOException
* @deprecated Does not retry; use an HTable instance instead.
*/
public HRegionInterface waitForMetaServerConnectionDefault()
public AdminProtocol waitForMetaServerConnectionDefault()
throws NotAllMetaRegionsOnlineException, IOException {
try {
return getCachedConnection(waitForMeta(defaultTimeout));
@ -546,19 +548,19 @@ public class CatalogTracker {
/**
* @param sn ServerName to get a connection against.
* @return The HRegionInterface we got when we connected to <code>sn</code>
* @return The AdminProtocol we got when we connected to <code>sn</code>
* May have come from cache, may not be good, may have been setup by this
* invocation, or may be null.
* @throws IOException
*/
private HRegionInterface getCachedConnection(ServerName sn)
private AdminProtocol getCachedConnection(ServerName sn)
throws IOException {
if (sn == null) {
return null;
}
HRegionInterface protocol = null;
AdminProtocol protocol = null;
try {
protocol = connection.getHRegionConnection(sn.getHostname(), sn.getPort());
protocol = connection.getAdmin(sn.getHostname(), sn.getPort());
} catch (RetriesExhaustedException e) {
if (e.getCause() != null && e.getCause() instanceof ConnectException) {
// Catch this; presume it means the cached connection has gone bad.
@ -599,11 +601,11 @@ public class CatalogTracker {
* the Interface.
* @throws IOException
*/
// TODO: We should be able to get the ServerName from the HRegionInterface
// TODO: We should be able to get the ServerName from the AdminProtocol
// rather than have to pass it in. Its made awkward by the fact that the
// HRI is likely a proxy against remote server so the getServerName needs
// to be fixed to go to a local method or to a cache before we can do this.
private boolean verifyRegionLocation(HRegionInterface hostingServer,
private boolean verifyRegionLocation(AdminProtocol hostingServer,
final ServerName address, final byte [] regionName)
throws IOException {
if (hostingServer == null) {
@ -613,7 +615,7 @@ public class CatalogTracker {
Throwable t = null;
try {
// Try and get regioninfo from the hosting server.
return hostingServer.getRegionInfo(regionName) != null;
return ProtobufUtil.getRegionInfo(hostingServer, regionName) != null;
} catch (ConnectException e) {
t = e;
} catch (RetriesExhaustedException e) {
@ -647,7 +649,7 @@ public class CatalogTracker {
*/
public boolean verifyRootRegionLocation(final long timeout)
throws InterruptedException, IOException {
HRegionInterface connection = null;
AdminProtocol connection = null;
try {
connection = waitForRootServerConnection(timeout);
} catch (NotAllMetaRegionsOnlineException e) {
@ -672,7 +674,7 @@ public class CatalogTracker {
*/
public boolean verifyMetaRegionLocation(final long timeout)
throws InterruptedException, IOException {
HRegionInterface connection = null;
AdminProtocol connection = null;
try {
connection = waitForMetaServerConnection(timeout);
} catch (NotAllMetaRegionsOnlineException e) {

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.protobuf;
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;

View File

@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.hadoop.hbase.protobuf;
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -53,13 +53,21 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
@ -71,6 +79,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
/**
@ -1092,20 +1101,26 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
public boolean closeRegionWithEncodedRegionName(final String encodedRegionName,
final String serverName) throws IOException {
byte[] encodedRegionNameInBytes = Bytes.toBytes(encodedRegionName);
if (null == serverName || ("").equals(serverName.trim())) {
throw new IllegalArgumentException(
"The servername cannot be null or empty.");
}
ServerName sn = new ServerName(serverName);
HRegionInterface rs = this.connection.getHRegionConnection(
AdminProtocol admin = this.connection.getAdmin(
sn.getHostname(), sn.getPort());
// Close the region without updating zk state.
boolean isRegionClosed = rs.closeRegion(encodedRegionNameInBytes, false);
CloseRegionRequest request =
RequestConverter.buildCloseRegionRequest(encodedRegionName, false);
try {
CloseRegionResponse response = admin.closeRegion(null, request);
boolean isRegionClosed = response.getClosed();
if (false == isRegionClosed) {
LOG.error("Not able to close the region " + encodedRegionName + ".");
}
return isRegionClosed;
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
/**
@ -1117,10 +1132,10 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
public void closeRegion(final ServerName sn, final HRegionInfo hri)
throws IOException {
HRegionInterface rs =
this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
AdminProtocol admin =
this.connection.getAdmin(sn.getHostname(), sn.getPort());
// Close the region without updating zk state.
rs.closeRegion(hri, false);
ProtobufUtil.closeRegion(admin, hri.getRegionName(), false);
}
/**
@ -1183,9 +1198,15 @@ public class HBaseAdmin implements Abortable, Closeable {
private void flush(final ServerName sn, final HRegionInfo hri)
throws IOException {
HRegionInterface rs =
this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
rs.flushRegion(hri);
AdminProtocol admin =
this.connection.getAdmin(sn.getHostname(), sn.getPort());
FlushRegionRequest request =
RequestConverter.buildFlushRegionRequest(hri.getRegionName());
try {
admin.flushRegion(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
/**
@ -1289,9 +1310,15 @@ public class HBaseAdmin implements Abortable, Closeable {
private void compact(final ServerName sn, final HRegionInfo hri,
final boolean major)
throws IOException {
HRegionInterface rs =
this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
rs.compactRegion(hri, major);
AdminProtocol admin =
this.connection.getAdmin(sn.getHostname(), sn.getPort());
CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(hri.getRegionName(), major);
try {
admin.compactRegion(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
/**
@ -1471,9 +1498,15 @@ public class HBaseAdmin implements Abortable, Closeable {
private void split(final ServerName sn, final HRegionInfo hri,
byte[] splitPoint) throws IOException {
HRegionInterface rs =
this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
rs.splitRegion(hri, splitPoint);
AdminProtocol admin =
this.connection.getAdmin(sn.getHostname(), sn.getPort());
SplitRegionRequest request =
RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint);
try {
admin.splitRegion(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
/**
@ -1572,9 +1605,15 @@ public class HBaseAdmin implements Abortable, Closeable {
throws IOException {
String hostname = Addressing.parseHostname(hostnamePort);
int port = Addressing.parsePort(hostnamePort);
HRegionInterface rs =
this.connection.getHRegionConnection(hostname, port);
rs.stop("Called by admin client " + this.connection.toString());
AdminProtocol admin =
this.connection.getAdmin(hostname, port);
StopServerRequest request = RequestConverter.buildStopServerRequest(
"Called by admin client " + this.connection.toString());
try {
admin.stopServer(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
/**
@ -1715,9 +1754,21 @@ public class HBaseAdmin implements Abortable, Closeable {
public synchronized byte[][] rollHLogWriter(String serverName)
throws IOException, FailedLogCloseException {
ServerName sn = new ServerName(serverName);
HRegionInterface rs = this.connection.getHRegionConnection(
AdminProtocol admin = this.connection.getAdmin(
sn.getHostname(), sn.getPort());
return rs.rollHLogWriter();
RollWALWriterRequest request = RequestConverter.buildRollWALWriterRequest();;
try {
RollWALWriterResponse response = admin.rollWALWriter(null, request);
int regionCount = response.getRegionToFlushCount();
byte[][] regionsToFlush = new byte[regionCount][];
for (int i = 0; i < regionCount; i++) {
ByteString region = response.getRegionToFlush(i);
regionsToFlush[i] = region.toByteArray();
}
return regionsToFlush;
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
public String[] getMasterCoprocessors() {

View File

@ -36,11 +36,11 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@ -199,17 +199,6 @@ public interface HConnection extends Abortable, Closeable {
public List<HRegionLocation> locateRegions(byte[] tableName)
throws IOException;
/**
* Establishes a connection to the region server at the specified address.
* @param regionServer - the server to connect to
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
* @deprecated Use {@link #getHRegionConnection(String, int)}
*/
@Deprecated
public HRegionInterface getHRegionConnection(HServerAddress regionServer)
throws IOException;
/**
* Establishes a connection to the region server at the specified address.
* @param hostname RegionServer hostname
@ -218,7 +207,7 @@ public interface HConnection extends Abortable, Closeable {
* @throws IOException if a remote or network exception occurs
*
*/
public HRegionInterface getHRegionConnection(final String hostname, final int port)
public AdminProtocol getAdmin(final String hostname, final int port)
throws IOException;
/**
@ -234,19 +223,6 @@ public interface HConnection extends Abortable, Closeable {
public ClientProtocol getClient(final String hostname, final int port)
throws IOException;
/**
* Establishes a connection to the region server at the specified address.
* @param regionServer - the server to connect to
* @param getMaster - do we check if master is alive
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
* @deprecated Use {@link #getHRegionConnection(String, int)}
*/
@Deprecated
public HRegionInterface getHRegionConnection(HServerAddress regionServer,
boolean getMaster)
throws IOException;
/**
* Establishes a connection to the region server at the specified address.
* @param hostname RegionServer hostname
@ -255,7 +231,7 @@ public interface HConnection extends Abortable, Closeable {
* @return proxy for HRegionServer
* @throws IOException if a remote or network exception occurs
*/
public HRegionInterface getHRegionConnection(final String hostname,
public AdminProtocol getAdmin(final String hostname,
final int port, boolean getMaster)
throws IOException;

View File

@ -66,20 +66,16 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.ExecRPCInvoker;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
@ -161,6 +157,12 @@ public class HConnectionManager {
/** Default client protocol class name. */
public static final String DEFAULT_CLIENT_PROTOCOL_CLASS = ClientProtocol.class.getName();
/** Parameter name for what admin protocol to use. */
public static final String REGION_PROTOCOL_CLASS = "hbase.adminprotocol.class";
/** Default admin protocol class name. */
public static final String DEFAULT_ADMIN_PROTOCOL_CLASS = AdminProtocol.class.getName();
private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
static {
@ -507,7 +509,7 @@ public class HConnectionManager {
/* Encapsulates connection to zookeeper and regionservers.*/
static class HConnectionImplementation implements HConnection, Closeable {
static final Log LOG = LogFactory.getLog(HConnectionImplementation.class);
private final Class<? extends HRegionInterface> serverInterfaceClass;
private final Class<? extends AdminProtocol> adminClass;
private final Class<? extends ClientProtocol> clientClass;
private final long pause;
private final int numRetries;
@ -535,8 +537,8 @@ public class HConnectionManager {
private final Configuration conf;
// Known region HServerAddress.toString() -> HRegionInterface
// Known region ServerName.toString() -> RegionClient/Admin
private final ConcurrentHashMap<String, Map<String, VersionedProtocol>> servers =
new ConcurrentHashMap<String, Map<String, VersionedProtocol>>();
private final ConcurrentHashMap<String, String> connectionLock =
@ -576,15 +578,15 @@ public class HConnectionManager {
throws ZooKeeperConnectionException {
this.conf = conf;
this.managed = managed;
String serverClassName = conf.get(HConstants.REGION_SERVER_CLASS,
HConstants.DEFAULT_REGION_SERVER_CLASS);
String adminClassName = conf.get(REGION_PROTOCOL_CLASS,
DEFAULT_ADMIN_PROTOCOL_CLASS);
this.closed = false;
try {
this.serverInterfaceClass =
(Class<? extends HRegionInterface>) Class.forName(serverClassName);
this.adminClass =
(Class<? extends AdminProtocol>) Class.forName(adminClassName);
} catch (ClassNotFoundException e) {
throw new UnsupportedOperationException(
"Unable to find region server interface " + serverClassName, e);
"Unable to find region server interface " + adminClassName, e);
}
String clientClassName = conf.get(CLIENT_PROTOCOL_CLASS,
DEFAULT_CLIENT_PROTOCOL_CLASS);
@ -730,9 +732,6 @@ public class HConnectionManager {
return getKeepAliveMaster();
} catch (MasterNotRunningException e) {
throw e;
} catch (IOException e) {
throw new ZooKeeperConnectionException(
"Can't create a connection to master", e);
}
}
}
@ -1057,8 +1056,8 @@ public class HConnectionManager {
metaLocation = locateRegion(parentTable, metaKey);
// If null still, go around again.
if (metaLocation == null) continue;
HRegionInterface server =
getHRegionConnection(metaLocation.getHostname(), metaLocation.getPort());
ClientProtocol server =
getClient(metaLocation.getHostname(), metaLocation.getPort());
Result regionInfoRow = null;
// This block guards against two threads trying to load the meta
@ -1086,7 +1085,7 @@ public class HConnectionManager {
}
// Query the root or meta region for the location of the meta region
regionInfoRow = server.getClosestRowBefore(
regionInfoRow = ProtobufUtil.getRowOrBefore(server,
metaLocation.getRegionInfo().getRegionName(), metaKey,
HConstants.CATALOG_FAMILY);
}
@ -1340,17 +1339,9 @@ public class HConnectionManager {
}
@Override
@Deprecated
public HRegionInterface getHRegionConnection(HServerAddress hsa)
throws IOException {
return getHRegionConnection(hsa, false);
}
@Override
public HRegionInterface getHRegionConnection(final String hostname,
final int port)
throws IOException {
return getHRegionConnection(hostname, port, false);
public AdminProtocol getAdmin(final String hostname,
final int port) throws IOException {
return getAdmin(hostname, port, false);
}
@Override
@ -1361,21 +1352,10 @@ public class HConnectionManager {
}
@Override
@Deprecated
public HRegionInterface getHRegionConnection(HServerAddress hsa,
boolean master)
throws IOException {
String hostname = hsa.getInetSocketAddress().getHostName();
int port = hsa.getInetSocketAddress().getPort();
return getHRegionConnection(hostname, port, master);
}
@Override
public HRegionInterface getHRegionConnection(final String hostname,
final int port, final boolean master)
throws IOException {
return (HRegionInterface)getProtocol(hostname, port,
serverInterfaceClass, HRegionInterface.VERSION);
public AdminProtocol getAdmin(final String hostname,
final int port, final boolean master) throws IOException {
return (AdminProtocol)getProtocol(hostname, port,
adminClass, AdminProtocol.VERSION);
}
/**
@ -1591,11 +1571,19 @@ public class HConnectionManager {
}catch (InvocationTargetException e){
// We will have this for all the exception, checked on not, sent
// by any layer, including the functional exception
if (e.getCause () == null){
Throwable cause = e.getCause();
if (cause == null){
throw new RuntimeException(
"Proxy invocation failed and getCause is null", e);
}
throw e.getCause();
if (cause instanceof UndeclaredThrowableException) {
cause = cause.getCause();
}
if (cause instanceof ServiceException) {
ServiceException se = (ServiceException)cause;
cause = ProtobufUtil.getRemoteException(se);
}
throw cause;
}
}
}
@ -1715,38 +1703,7 @@ public class HConnectionManager {
ServerCallable<MultiResponse> callable =
new ServerCallable<MultiResponse>(connection, tableName, null) {
public MultiResponse call() throws IOException {
try {
MultiResponse response = new MultiResponse();
for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
byte[] regionName = e.getKey();
int rowMutations = 0;
List<Action<R>> actions = e.getValue();
for (Action<R> action: actions) {
Row row = action.getAction();
if (row instanceof RowMutations) {
MultiRequest request =
RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
server.multi(null, request);
response.add(regionName, action.getOriginalIndex(), new Result());
rowMutations++;
}
}
if (actions.size() > rowMutations) {
MultiRequest request =
RequestConverter.buildMultiRequest(regionName, actions);
ClientProtos.MultiResponse
proto = server.multi(null, request);
List<Object> results = ResponseConverter.getResults(proto);
for (int i = 0, n = results.size(); i < n; i++) {
int originalIndex = actions.get(i).getOriginalIndex();
response.add(regionName, originalIndex, results.get(i));
}
}
}
return response;
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return ProtobufUtil.multi(server, multi);
}
@Override
public void connect(boolean reload) throws IOException {

View File

@ -662,15 +662,8 @@ public class HTable implements HTableInterface {
throws IOException {
return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
public Result call() throws IOException {
try {
GetRequest request = RequestConverter.buildGetRequest(
location.getRegionInfo().getRegionName(), row, family, true);
GetResponse response = server.get(null, request);
if (!response.hasResult()) return null;
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
return ProtobufUtil.getRowOrBefore(server,
location.getRegionInfo().getRegionName(), row, family);
}
}.withRetries();
}
@ -715,14 +708,8 @@ public class HTable implements HTableInterface {
public Result get(final Get get) throws IOException {
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
try {
GetRequest request = RequestConverter.buildGetRequest(
return ProtobufUtil.get(server,
location.getRegionInfo().getRegionName(), get);
GetResponse response = server.get(null, request);
return ProtobufUtil.toResult(response.getResult());
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
}.withRetries();
}

View File

@ -34,8 +34,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;

View File

@ -31,9 +31,6 @@ import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -80,12 +77,7 @@ public class ExecRPCInvoker implements InvocationHandler {
new ServerCallable<ExecResult>(connection, table, row) {
public ExecResult call() throws Exception {
byte[] regionName = location.getRegionInfo().getRegionName();
ExecCoprocessorRequest request =
RequestConverter.buildExecCoprocessorRequest(regionName, exec);
ExecCoprocessorResponse response =
server.execCoprocessor(null, request);
Object value = ProtobufUtil.toObject(response.getValue());
return new ExecResult(regionName, value);
return ProtobufUtil.execCoprocessor(server, exec, regionName);
}
};
ExecResult result = callable.withRetries();

View File

@ -32,9 +32,10 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.protobuf.AdminProtocol;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.VersionedWritable;
@ -50,7 +51,6 @@ public class Invocation extends VersionedWritable implements Configurable {
private long clientVersion;
private int clientMethodsHash;
// For generated protocol classes which don't have VERSION field,
// such as protobuf interfaces.
private static final Map<Class<?>, Long>
@ -59,6 +59,8 @@ public class Invocation extends VersionedWritable implements Configurable {
static {
PROTOCOL_VERSION.put(ClientService.BlockingInterface.class,
Long.valueOf(ClientProtocol.VERSION));
PROTOCOL_VERSION.put(AdminService.BlockingInterface.class,
Long.valueOf(AdminProtocol.VERSION));
}
// For protobuf protocols, which use ServiceException, instead of IOException

View File

@ -28,6 +28,8 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import com.google.protobuf.ServiceException;
/** An RPC implementation. */
@InterfaceAudience.Private
interface RpcEngine {

View File

@ -28,14 +28,18 @@ import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.io.*;
import java.util.HashSet;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;
import javax.net.SocketFactory;
import org.apache.commons.logging.*;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Operation;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
@ -62,6 +66,15 @@ class WritableRpcEngine implements RpcEngine {
// DEBUG log level does NOT emit RPC-level logging.
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.ipc.RPCEngine");
// For protobuf protocols, which use ServiceException, instead of IOException
protected static final Set<Class<?>>
PROTOBUF_PROTOCOLS = new HashSet<Class<?>>();
static {
PROTOBUF_PROTOCOLS.add(ClientProtocol.class);
PROTOBUF_PROTOCOLS.add(AdminProtocol.class);
}
/* Cache a client using its socket factory as the hash key */
static private class ClientCache {
private Map<SocketFactory, HBaseClient> clients =

View File

@ -72,9 +72,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
@ -489,11 +487,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
LOG.debug("Going to connect to server " + location + " for row "
+ Bytes.toStringBinary(row));
byte[] regionName = location.getRegionInfo().getRegionName();
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName);
BulkLoadHFileResponse response =
server.bulkLoadHFile(null, request);
return response.getLoaded();
return ProtobufUtil.bulkLoadHFile(server, famPaths, regionName);
}
};

View File

@ -2051,16 +2051,14 @@ public class AssignmentManager extends ZooKeeperListener {
// This never happens. Currently regionserver close always return true.
LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
region.getRegionNameAsString());
} catch (NotServingRegionException nsre) {
LOG.info("Server " + server + " returned " + nsre + " for " +
region.getRegionNameAsString());
// Presume that master has stale data. Presume remote side just split.
// Presume that the split message when it comes in will fix up the master's
// in memory cluster state.
} catch (Throwable t) {
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
if (t instanceof NotServingRegionException) {
// Presume that master has stale data. Presume remote side just split.
// Presume that the split message when it comes in will fix up the master's
// in memory cluster state.
if (checkIfRegionBelongsToDisabling(region)) {
// Remove from the regionsinTransition map
LOG.info("While trying to recover the table "
@ -2076,15 +2074,13 @@ public class AssignmentManager extends ZooKeeperListener {
}
deleteClosingOrClosedNode(region);
}
}
} else if (t instanceof RegionAlreadyInTransitionException) {
// RS is already processing this region, only need to update the timestamp
if (t instanceof RegionAlreadyInTransitionException) {
LOG.debug("update " + state + " the timestamp.");
state.update(state.getState());
}
}
LOG.info("Server " + server + " returned " + t + " for " +
region.getEncodedName());
region.getRegionNameAsString());
// Presume retry or server will expire.
}
}

View File

@ -44,13 +44,14 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.handler.MetaServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
/**
@ -81,8 +82,8 @@ public class ServerManager {
/**
* Map from full server-instance name to the RPC connection for this server.
*/
private final Map<ServerName, HRegionInterface> serverConnections =
new HashMap<ServerName, HRegionInterface>();
private final Map<ServerName, AdminProtocol> serverConnections =
new HashMap<ServerName, AdminProtocol>();
/**
* List of region servers <ServerName> that should not get any more new
@ -476,14 +477,13 @@ public class ServerManager {
public RegionOpeningState sendRegionOpen(final ServerName server,
HRegionInfo region, int versionOfOfflineNode)
throws IOException {
HRegionInterface hri = getServerConnection(server);
if (hri == null) {
AdminProtocol admin = getServerConnection(server);
if (admin == null) {
LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
" failed because no RPC connection found to this server");
return RegionOpeningState.FAILED_OPENING;
}
return (versionOfOfflineNode == -1) ? hri.openRegion(region) : hri
.openRegion(region, versionOfOfflineNode);
return ProtobufUtil.openRegion(admin, region, versionOfOfflineNode);
}
/**
@ -496,13 +496,13 @@ public class ServerManager {
*/
public void sendRegionOpen(ServerName server, List<HRegionInfo> regions)
throws IOException {
HRegionInterface hri = getServerConnection(server);
if (hri == null) {
AdminProtocol admin = getServerConnection(server);
if (admin == null) {
LOG.warn("Attempting to send OPEN RPC to server " + server.toString() +
" failed because no RPC connection found to this server");
return;
}
hri.openRegions(regions);
ProtobufUtil.openRegion(admin, regions);
}
/**
@ -521,14 +521,15 @@ public class ServerManager {
public boolean sendRegionClose(ServerName server, HRegionInfo region,
int versionOfClosingNode) throws IOException {
if (server == null) throw new NullPointerException("Passed server is null");
HRegionInterface hri = getServerConnection(server);
if (hri == null) {
AdminProtocol admin = getServerConnection(server);
if (admin == null) {
throw new IOException("Attempting to send CLOSE RPC to server " +
server.toString() + " for region " +
region.getRegionNameAsString() +
" failed because no RPC connection found to this server");
}
return hri.closeRegion(region, versionOfClosingNode);
return ProtobufUtil.closeRegion(admin, region.getRegionName(),
versionOfClosingNode);
}
/**
@ -538,15 +539,15 @@ public class ServerManager {
* @throws RetriesExhaustedException wrapping a ConnectException if failed
* putting up proxy.
*/
private HRegionInterface getServerConnection(final ServerName sn)
private AdminProtocol getServerConnection(final ServerName sn)
throws IOException {
HRegionInterface hri = this.serverConnections.get(sn);
if (hri == null) {
AdminProtocol admin = this.serverConnections.get(sn.toString());
if (admin == null) {
LOG.debug("New connection to " + sn.toString());
hri = this.connection.getHRegionConnection(sn.getHostname(), sn.getPort());
this.serverConnections.put(sn, hri);
admin = this.connection.getAdmin(sn.getHostname(), sn.getPort());
this.serverConnections.put(sn, admin);
}
return hri;
return admin;
}
/**

View File

@ -39,24 +39,52 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UUID;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue.QualifierValue;
@ -66,10 +94,12 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
@ -217,6 +247,27 @@ public final class ProtobufUtil {
return builder.build();
}
/**
* Convert a protocol buffer ServerName to a ServerName
*
* @param proto the protocol buffer ServerName to convert
* @return the converted ServerName
*/
public static ServerName toServerName(
final HBaseProtos.ServerName proto) {
if (proto == null) return null;
String hostName = proto.getHostName();
long startCode = -1;
int port = -1;
if (proto.hasPort()) {
port = proto.getPort();
}
if (proto.hasStartCode()) {
startCode = proto.getStartCode();
}
return new ServerName(hostName, port, startCode);
}
/**
* Convert a RegionInfo to a HRegionInfo
*
@ -227,6 +278,11 @@ public final class ProtobufUtil {
toRegionInfo(final RegionInfo proto) {
if (proto == null) return null;
byte[] tableName = proto.getTableName().toByteArray();
if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
return HRegionInfo.ROOT_REGIONINFO;
} else if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
return HRegionInfo.FIRST_META_REGIONINFO;
}
long regionId = proto.getRegionId();
byte[] startKey = null;
byte[] endKey = null;
@ -236,9 +292,16 @@ public final class ProtobufUtil {
if (proto.hasEndKey()) {
endKey = proto.getEndKey().toByteArray();
}
return new HRegionInfo(tableName,
startKey, endKey, false, regionId);
boolean split = false;
if (proto.hasSplit()) {
split = proto.getSplit();
}
HRegionInfo hri = new HRegionInfo(tableName,
startKey, endKey, split, regionId);
if (proto.hasOffline()) {
hri.setOffline(proto.getOffline());
}
return hri;
}
/**
@ -259,6 +322,8 @@ public final class ProtobufUtil {
if (info.getEndKey() != null) {
builder.setEndKey(ByteString.copyFrom(info.getEndKey()));
}
builder.setOffline(info.isOffline());
builder.setSplit(info.isSplit());
return builder.build();
}
@ -596,7 +661,7 @@ public final class ProtobufUtil {
toHLogEntries(final List<WALEntry> protoList) {
List<HLog.Entry> entries = new ArrayList<HLog.Entry>();
for (WALEntry entry: protoList) {
WALKey walKey = entry.getWalKey();
WALKey walKey = entry.getKey();
java.util.UUID clusterId = HConstants.DEFAULT_CLUSTER_ID;
if (walKey.hasClusterId()) {
UUID protoUuid = walKey.getClusterId();
@ -608,7 +673,7 @@ public final class ProtobufUtil {
walKey.getWriteTime(), clusterId);
WALEntry.WALEdit walEdit = entry.getEdit();
WALEdit edit = new WALEdit();
for (ByteString keyValue: walEdit.getKeyValueList()) {
for (ByteString keyValue: walEdit.getKeyValueBytesList()) {
edit.add(new KeyValue(keyValue.toByteArray()));
}
if (walEdit.getFamilyScopeCount() > 0) {
@ -721,4 +786,333 @@ public final class ProtobufUtil {
}
return builder.build();
}
// Start helpers for Client
/**
* A helper to invoke a Get using client protocol.
*
* @param client
* @param regionName
* @param get
* @return the result of the Get
* @throws IOException
*/
public static Result get(final ClientProtocol client,
final byte[] regionName, final Get get) throws IOException {
GetRequest request =
RequestConverter.buildGetRequest(regionName, get);
try {
GetResponse response = client.get(null, request);
return toResult(response.getResult());
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to get a row of the closet one before using client protocol.
*
* @param client
* @param regionName
* @param row
* @param family
* @return the row or the closestRowBefore if it doesn't exist
* @throws IOException
*/
public static Result getRowOrBefore(final ClientProtocol client,
final byte[] regionName, final byte[] row,
final byte[] family) throws IOException {
GetRequest request =
RequestConverter.buildGetRowOrBeforeRequest(
regionName, row, family);
try {
GetResponse response = client.get(null, request);
if (!response.hasResult()) return null;
return toResult(response.getResult());
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to invoke a multi action using client protocol.
*
* @param client
* @param multi
* @return a multi response
* @throws IOException
*/
public static <R> MultiResponse multi(final ClientProtocol client,
final MultiAction<R> multi) throws IOException {
try {
MultiResponse response = new MultiResponse();
for (Map.Entry<byte[], List<Action<R>>> e: multi.actions.entrySet()) {
byte[] regionName = e.getKey();
int rowMutations = 0;
List<Action<R>> actions = e.getValue();
for (Action<R> action: actions) {
Row row = action.getAction();
if (row instanceof RowMutations) {
MultiRequest request =
RequestConverter.buildMultiRequest(regionName, (RowMutations)row);
client.multi(null, request);
response.add(regionName, action.getOriginalIndex(), new Result());
rowMutations++;
}
}
if (actions.size() > rowMutations) {
MultiRequest request =
RequestConverter.buildMultiRequest(regionName, actions);
ClientProtos.MultiResponse
proto = client.multi(null, request);
List<Object> results = ResponseConverter.getResults(proto);
for (int i = 0, n = results.size(); i < n; i++) {
int originalIndex = actions.get(i).getOriginalIndex();
response.add(regionName, originalIndex, results.get(i));
}
}
}
return response;
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to bulk load a list of HFiles using client protocol.
*
* @param client
* @param familyPaths
* @param regionName
* @return true if all are loaded
* @throws IOException
*/
public static boolean bulkLoadHFile(final ClientProtocol client,
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName);
try {
BulkLoadHFileResponse response =
client.bulkLoadHFile(null, request);
return response.getLoaded();
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to exec a coprocessor Exec using client protocol.
*
* @param client
* @param exec
* @param regionName
* @return the exec result
* @throws IOException
*/
public static ExecResult execCoprocessor(final ClientProtocol client,
final Exec exec, final byte[] regionName) throws IOException {
ExecCoprocessorRequest request =
RequestConverter.buildExecCoprocessorRequest(regionName, exec);
try {
ExecCoprocessorResponse response =
client.execCoprocessor(null, request);
Object value = ProtobufUtil.toObject(response.getValue());
return new ExecResult(regionName, value);
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
// End helpers for Client
// Start helpers for Admin
/**
* A helper to retrieve region info given a region name
* using admin protocol.
*
* @param admin
* @param regionName
* @return the retrieved region info
* @throws IOException
*/
public static HRegionInfo getRegionInfo(final AdminProtocol admin,
final byte[] regionName) throws IOException {
try {
GetRegionInfoRequest request =
RequestConverter.buildGetRegionInfoRequest(regionName);
GetRegionInfoResponse response =
admin.getRegionInfo(null, request);
return toRegionInfo(response.getRegionInfo());
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to close a region given a region name
* using admin protocol.
*
* @param admin
* @param regionName
* @param transitionInZK
* @throws IOException
*/
public static void closeRegion(final AdminProtocol admin,
final byte[] regionName, final boolean transitionInZK) throws IOException {
CloseRegionRequest closeRegionRequest =
RequestConverter.buildCloseRegionRequest(regionName, transitionInZK);
try {
admin.closeRegion(null, closeRegionRequest);
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to close a region given a region name
* using admin protocol.
*
* @param admin
* @param regionName
* @param versionOfClosingNode
* @return true if the region is closed
* @throws IOException
*/
public static boolean closeRegion(final AdminProtocol admin,
final byte[] regionName, final int versionOfClosingNode) throws IOException {
CloseRegionRequest closeRegionRequest =
RequestConverter.buildCloseRegionRequest(regionName, versionOfClosingNode);
try {
CloseRegionResponse response = admin.closeRegion(null, closeRegionRequest);
return ResponseConverter.isClosed(response);
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to open a region using admin protocol.
*
* @param admin
* @param region
* @param versionOfOfflineNode
* @return the region opening state
* @throws IOException
*/
public static RegionOpeningState openRegion(final AdminProtocol admin,
final HRegionInfo region, final int versionOfOfflineNode) throws IOException {
OpenRegionRequest request =
RequestConverter.buildOpenRegionRequest(region, versionOfOfflineNode);
try {
OpenRegionResponse response = admin.openRegion(null, request);
return ResponseConverter.getRegionOpeningState(response);
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to open a list of regions using admin protocol.
*
* @param admin
* @param regions
* @throws IOException
*/
public static void openRegion(final AdminProtocol admin,
final List<HRegionInfo> regions) throws IOException {
OpenRegionRequest request =
RequestConverter.buildOpenRegionRequest(regions);
try {
admin.openRegion(null, request);
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to get the all the online regions on a region
* server using admin protocol.
*
* @param admin
* @return a list of online region info
* @throws IOException
*/
public static List<HRegionInfo> getOnlineRegions(
final AdminProtocol admin) throws IOException {
GetOnlineRegionRequest request = RequestConverter.buildGetOnlineRegionRequest();
List<HRegionInfo> regions = null;
try {
GetOnlineRegionResponse response =
admin.getOnlineRegion(null, request);
regions = new ArrayList<HRegionInfo>();
for (RegionInfo regionInfo: response.getRegionInfoList()) {
regions.add(toRegionInfo(regionInfo));
}
return regions;
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to get the info of a region server using admin protocol.
*
* @param admin
* @return the server name
* @throws IOException
*/
public static ServerName getServerInfo(
final AdminProtocol admin) throws IOException {
GetServerInfoRequest request = RequestConverter.buildGetServerInfoRequest();
try {
GetServerInfoResponse response = admin.getServerInfo(null, request);
return toServerName(response.getServerName());
} catch (ServiceException se) {
throw getRemoteException(se);
}
}
/**
* A helper to replicate a list of HLog entries using admin protocol.
*
* @param admin
* @param entries
* @throws IOException
*/
public static void replicateWALEntry(final AdminProtocol admin,
final HLog.Entry[] entries) throws IOException {
ReplicateWALEntryRequest request =
RequestConverter.buildReplicateWALEntryRequest(entries);
try {
admin.replicateWALEntry(null, request);
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
/**
* A helper to get the list of files of a column family
* on a given region using admin protocol.
*
* @param admin
* @param regionName
* @param family
* @return the list of store files
* @throws IOException
*/
public static List<String> getStoreFiles(final AdminProtocol admin,
final byte[] regionName, final byte[] family) throws IOException {
GetStoreFileRequest request =
RequestConverter.buildGetStoreFileRequest(regionName, family);
try {
GetStoreFileResponse response = admin.getStoreFile(null, request);
return response.getStoreFileList();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
// End helpers for Admin
}

View File

@ -24,10 +24,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.Append;
@ -42,6 +44,23 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.ScopeType;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALKey;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.WALEdit.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@ -65,6 +84,9 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -90,16 +112,15 @@ public final class RequestConverter {
* @param regionName the name of the region to get
* @param row the row to get
* @param family the column family to get
* @param closestRowBefore if the requested row doesn't exist,
* should return the immediate row before
* @return a protocol buffer GetReuqest
*/
public static GetRequest buildGetRequest(final byte[] regionName,
final byte[] row, final byte[] family, boolean closestRowBefore) {
public static GetRequest buildGetRowOrBeforeRequest(
final byte[] regionName, final byte[] row, final byte[] family) {
GetRequest.Builder builder = GetRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setClosestRowBefore(closestRowBefore);
builder.setClosestRowBefore(true);
builder.setRegion(region);
Column.Builder columnBuilder = Column.newBuilder();
@ -542,6 +563,294 @@ public final class RequestConverter {
}
// End utilities for Client
//Start utilities for Admin
/**
* Create a protocol buffer GetRegionInfoRequest for a given region name
*
* @param regionName the name of the region to get info
* @return a protocol buffer GetRegionInfoRequest
*/
public static GetRegionInfoRequest
buildGetRegionInfoRequest(final byte[] regionName) {
GetRegionInfoRequest.Builder builder = GetRegionInfoRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
return builder.build();
}
/**
* Create a protocol buffer GetStoreFileRequest for a given region name
*
* @param regionName the name of the region to get info
* @param family the family to get store file list
* @return a protocol buffer GetStoreFileRequest
*/
public static GetStoreFileRequest
buildGetStoreFileRequest(final byte[] regionName, final byte[] family) {
GetStoreFileRequest.Builder builder = GetStoreFileRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.addFamily(ByteString.copyFrom(family));
return builder.build();
}
/**
* Create a protocol buffer GetOnlineRegionRequest
*
* @return a protocol buffer GetOnlineRegionRequest
*/
public static GetOnlineRegionRequest buildGetOnlineRegionRequest() {
return GetOnlineRegionRequest.newBuilder().build();
}
/**
* Create a protocol buffer FlushRegionRequest for a given region name
*
* @param regionName the name of the region to get info
* @return a protocol buffer FlushRegionRequest
*/
public static FlushRegionRequest
buildFlushRegionRequest(final byte[] regionName) {
FlushRegionRequest.Builder builder = FlushRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
return builder.build();
}
/**
* Create a protocol buffer OpenRegionRequest to open a list of regions
*
* @param regions the list of regions to open
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest
buildOpenRegionRequest(final List<HRegionInfo> regions) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
for (HRegionInfo region: regions) {
builder.addRegion(ProtobufUtil.toRegionInfo(region));
}
return builder.build();
}
/**
* Create a protocol buffer OpenRegionRequest for a given region
*
* @param region the region to open
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest
buildOpenRegionRequest(final HRegionInfo region) {
return buildOpenRegionRequest(region, -1);
}
/**
* Create a protocol buffer OpenRegionRequest for a given region
*
* @param region the region to open
* @param versionOfOfflineNode that needs to be present in the offline node
* @return a protocol buffer OpenRegionRequest
*/
public static OpenRegionRequest buildOpenRegionRequest(
final HRegionInfo region, final int versionOfOfflineNode) {
OpenRegionRequest.Builder builder = OpenRegionRequest.newBuilder();
builder.addRegion(ProtobufUtil.toRegionInfo(region));
if (versionOfOfflineNode >= 0) {
builder.setVersionOfOfflineNode(versionOfOfflineNode);
}
return builder.build();
}
/**
* Create a CloseRegionRequest for a given region name
*
* @param regionName the name of the region to close
* @param transitionInZK indicator if to transition in ZK
* @return a CloseRegionRequest
*/
public static CloseRegionRequest buildCloseRegionRequest(
final byte[] regionName, final boolean transitionInZK) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setTransitionInZK(transitionInZK);
return builder.build();
}
/**
* Create a CloseRegionRequest for a given region name
*
* @param regionName the name of the region to close
* @param versionOfClosingNode
* the version of znode to compare when RS transitions the znode from
* CLOSING state.
* @return a CloseRegionRequest
*/
public static CloseRegionRequest buildCloseRegionRequest(
final byte[] regionName, final int versionOfClosingNode) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setVersionOfClosingNode(versionOfClosingNode);
return builder.build();
}
/**
* Create a CloseRegionRequest for a given encoded region name
*
* @param encodedRegionName the name of the region to close
* @param transitionInZK indicator if to transition in ZK
* @return a CloseRegionRequest
*/
public static CloseRegionRequest
buildCloseRegionRequest(final String encodedRegionName,
final boolean transitionInZK) {
CloseRegionRequest.Builder builder = CloseRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.ENCODED_REGION_NAME,
Bytes.toBytes(encodedRegionName));
builder.setRegion(region);
builder.setTransitionInZK(transitionInZK);
return builder.build();
}
/**
* Create a SplitRegionRequest for a given region name
*
* @param regionName the name of the region to split
* @param splitPoint the split point
* @return a SplitRegionRequest
*/
public static SplitRegionRequest buildSplitRegionRequest(
final byte[] regionName, final byte[] splitPoint) {
SplitRegionRequest.Builder builder = SplitRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
if (splitPoint != null) {
builder.setSplitPoint(ByteString.copyFrom(splitPoint));
}
return builder.build();
}
/**
* Create a CompactRegionRequest for a given region name
*
* @param regionName the name of the region to get info
* @param major indicator if it is a major compaction
* @return a CompactRegionRequest
*/
public static CompactRegionRequest buildCompactRegionRequest(
final byte[] regionName, final boolean major) {
CompactRegionRequest.Builder builder = CompactRegionRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
builder.setRegion(region);
builder.setMajor(major);
return builder.build();
}
/**
* Create a new ReplicateWALEntryRequest from a list of HLog entries
*
* @param entries the HLog entries to be replicated
* @return a ReplicateWALEntryRequest
*/
public static ReplicateWALEntryRequest
buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
FamilyScope.Builder scopeBuilder = FamilyScope.newBuilder();
WALEntry.Builder entryBuilder = WALEntry.newBuilder();
ReplicateWALEntryRequest.Builder builder =
ReplicateWALEntryRequest.newBuilder();
for (HLog.Entry entry: entries) {
entryBuilder.clear();
WALKey.Builder keyBuilder = entryBuilder.getKeyBuilder();
HLogKey key = entry.getKey();
keyBuilder.setEncodedRegionName(
ByteString.copyFrom(key.getEncodedRegionName()));
keyBuilder.setTableName(ByteString.copyFrom(key.getTablename()));
keyBuilder.setLogSequenceNumber(key.getLogSeqNum());
keyBuilder.setWriteTime(key.getWriteTime());
UUID clusterId = key.getClusterId();
if (clusterId != null) {
AdminProtos.UUID.Builder uuidBuilder = keyBuilder.getClusterIdBuilder();
uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
}
WALEdit edit = entry.getEdit();
WALEntry.WALEdit.Builder editBuilder = entryBuilder.getEditBuilder();
NavigableMap<byte[], Integer> scopes = edit.getScopes();
if (scopes != null && !scopes.isEmpty()) {
for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
scopeBuilder.setFamily(ByteString.copyFrom(scope.getKey()));
ScopeType scopeType = ScopeType.valueOf(scope.getValue().intValue());
scopeBuilder.setScopeType(scopeType);
editBuilder.addFamilyScope(scopeBuilder.build());
}
}
List<KeyValue> keyValues = edit.getKeyValues();
for (KeyValue value: keyValues) {
editBuilder.addKeyValueBytes(ByteString.copyFrom(
value.getBuffer(), value.getOffset(), value.getLength()));
}
builder.addEntry(entryBuilder.build());
}
return builder.build();
}
/**
* Create a new RollWALWriterRequest
*
* @return a ReplicateWALEntryRequest
*/
public static RollWALWriterRequest buildRollWALWriterRequest() {
RollWALWriterRequest.Builder builder = RollWALWriterRequest.newBuilder();
return builder.build();
}
/**
* Create a new GetServerInfoRequest
*
* @return a GetServerInfoRequest
*/
public static GetServerInfoRequest buildGetServerInfoRequest() {
GetServerInfoRequest.Builder builder = GetServerInfoRequest.newBuilder();
return builder.build();
}
/**
* Create a new StopServerRequest
*
* @param reason the reason to stop the server
* @return a StopServerRequest
*/
public static StopServerRequest buildStopServerRequest(final String reason) {
StopServerRequest.Builder builder = StopServerRequest.newBuilder();
builder.setReason(reason);
return builder.build();
}
//End utilities for Admin
/**
* Convert a byte array to a protocol buffer RegionSpecifier
*
* @param type the region specifier type
* @param value the region specifier byte array value
* @return a protocol buffer RegionSpecifier
*/
public static RegionSpecifier buildRegionSpecifier(
final RegionSpecifierType type, final byte[] value) {
RegionSpecifier.Builder regionBuilder = RegionSpecifier.newBuilder();
regionBuilder.setValue(ByteString.copyFrom(value));
regionBuilder.setType(type);
return regionBuilder.build();
}
/**
* Create a protocol buffer Condition
@ -744,21 +1053,6 @@ public final class RequestConverter {
return mutateBuilder.build();
}
/**
* Convert a byte array to a protocol buffer RegionSpecifier
*
* @param type the region specifier type
* @param value the region specifier byte array value
* @return a protocol buffer RegionSpecifier
*/
private static RegionSpecifier buildRegionSpecifier(
final RegionSpecifierType type, final byte[] value) {
RegionSpecifier.Builder regionBuilder = RegionSpecifier.newBuilder();
regionBuilder.setValue(ByteString.copyFrom(value));
regionBuilder.setType(type);
return regionBuilder.build();
}
/**
* Convert a delete KeyValue type to protocol buffer DeleteType.
*

View File

@ -24,16 +24,15 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.util.StringUtils;
@ -146,18 +145,6 @@ public final class ResponseConverter {
return regionInfos;
}
/**
* Get the region info from a GetRegionInfoResponse
*
* @param proto the GetRegionInfoResponse
* @return the region info
*/
public static HRegionInfo getRegionInfo
(final GetRegionInfoResponse proto) {
if (proto == null || proto.getRegionInfo() == null) return null;
return ProtobufUtil.toRegionInfo(proto.getRegionInfo());
}
/**
* Get the region opening state from a OpenRegionResponse
*

View File

@ -74,7 +74,6 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
@ -82,7 +81,9 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Action;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnectionManager;
@ -117,11 +118,7 @@ import org.apache.hadoop.hbase.ipc.Invocation;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
@ -185,12 +182,6 @@ public class HRegionServer extends RegionServer
private boolean useHBaseChecksum; // verify hbase checksums?
private Path rootDir;
//RegionName vs current action in progress
//true - if open region action in progress
//false - if close region action in progress
private final ConcurrentSkipListMap<byte[], Boolean> regionsInTransitionInRS =
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final int numRetries;
@ -228,9 +219,6 @@ public class HRegionServer extends RegionServer
@SuppressWarnings("unused")
private RegionServerDynamicMetrics dynamicMetrics;
// Compactions
public CompactSplitThread compactSplitThread;
/*
* Check for compactions requests.
*/
@ -250,9 +238,6 @@ public class HRegionServer extends RegionServer
// master address manager and watcher
private MasterAddressTracker masterAddressManager;
// catalog tracker
private CatalogTracker catalogTracker;
// Cluster Status Tracker
private ClusterStatusTracker clusterStatusTracker;
@ -264,14 +249,6 @@ public class HRegionServer extends RegionServer
private final int rpcTimeout;
// Instance of the hbase executor service.
private ExecutorService service;
@SuppressWarnings("unused")
// Replication services. If no replication, this handler will be null.
private ReplicationSourceService replicationSourceHandler;
private ReplicationSinkService replicationSinkHandler;
private final RegionServerAccounting regionServerAccounting;
// Cache configuration and block cache reference
@ -296,18 +273,6 @@ public class HRegionServer extends RegionServer
*/
private final long startcode;
/**
* Go here to get table descriptors.
*/
private TableDescriptors tableDescriptors;
/*
* Strings to be used in forming the exception message for
* RegionsAlreadyInTransitionException.
*/
private static final String OPEN = "OPEN";
private static final String CLOSE = "CLOSE";
/**
* MX Bean for RegionServerInfo
*/
@ -370,7 +335,7 @@ public class HRegionServer extends RegionServer
this.rpcServer = HBaseRPC.getServer(this,
new Class<?>[]{HRegionInterface.class, ClientProtocol.class,
HBaseRPCErrorHandler.class,
AdminProtocol.class, HBaseRPCErrorHandler.class,
OnlineRegions.class},
initialIsa.getHostName(), // BindAddress is IP we got for this server.
initialIsa.getPort(),
@ -2490,19 +2455,6 @@ public class HRegionServer extends RegionServer
return RegionOpeningState.OPENED;
}
private void checkIfRegionInTransition(HRegionInfo region,
String currentAction) throws RegionAlreadyInTransitionException {
byte[] encodedName = region.getEncodedNameAsBytes();
if (this.regionsInTransitionInRS.containsKey(encodedName)) {
boolean openAction = this.regionsInTransitionInRS.get(encodedName);
// The below exception message will be used in master.
throw new RegionAlreadyInTransitionException("Received:" + currentAction +
" for the region:" + region.getRegionNameAsString() +
" ,which we are already trying to " +
(openAction ? OPEN : CLOSE)+ ".");
}
}
@Override
@QosPriority(priority=HIGH_QOS)
public void openRegions(List<HRegionInfo> regions)
@ -2559,54 +2511,6 @@ public class HRegionServer extends RegionServer
return closeRegion(encodedRegionName, false, zk);
}
/**
* @param region Region to close
* @param abort True if we are aborting
* @param zk True if we are to update zk about the region close; if the close
* was orchestrated by master, then update zk. If the close is being run by
* the regionserver because its going down, don't update zk.
* @return True if closed a region.
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk) {
return closeRegion(region, abort, zk, -1);
}
/**
* @param region Region to close
* @param abort True if we are aborting
* @param zk True if we are to update zk about the region close; if the close
* was orchestrated by master, then update zk. If the close is being run by
* the regionserver because its going down, don't update zk.
* @param versionOfClosingNode
* the version of znode to compare when RS transitions the znode from
* CLOSING state.
* @return True if closed a region.
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk, final int versionOfClosingNode) {
if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
LOG.warn("Received close for region we are already opening or closing; " +
region.getEncodedName());
return false;
}
this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false);
CloseRegionHandler crh = null;
if (region.isRootRegion()) {
crh = new CloseRootHandler(this, this, region, abort, zk,
versionOfClosingNode);
} else if (region.isMetaRegion()) {
crh = new CloseMetaHandler(this, this, region, abort, zk,
versionOfClosingNode);
} else {
crh = new CloseRegionHandler(this, this, region, abort, zk,
versionOfClosingNode);
}
this.service.submit(crh);
return true;
}
/**
* @param encodedRegionName
* encodedregionName to close
@ -2804,13 +2708,6 @@ public class HRegionServer extends RegionServer
return sortedRegions;
}
@Override
public HRegion getFromOnlineRegions(final String encodedRegionName) {
HRegion r = null;
r = this.onlineRegions.get(encodedRegionName);
return r;
}
/** @return the request count */
public AtomicInteger getRequestCount() {
return this.requestCount;
@ -2858,6 +2755,8 @@ public class HRegionServer extends RegionServer
return new ProtocolSignature(HRegionInterface.VERSION, null);
} else if (protocol.equals(ClientProtocol.class.getName())) {
return new ProtocolSignature(ClientProtocol.VERSION, null);
} else if (protocol.equals(AdminProtocol.class.getName())) {
return new ProtocolSignature(AdminProtocol.VERSION, null);
}
throw new IOException("Unknown protocol: " + protocol);
}
@ -2870,6 +2769,8 @@ public class HRegionServer extends RegionServer
return HRegionInterface.VERSION;
} else if (protocol.equals(ClientProtocol.class.getName())) {
return ClientProtocol.VERSION;
} else if (protocol.equals(AdminProtocol.class.getName())) {
return AdminProtocol.VERSION;
}
throw new IOException("Unknown protocol: " + protocol);
}

View File

@ -37,16 +37,11 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.thrift.ThriftServerRunner;
import org.apache.hadoop.hbase.thrift.ThriftUtilities;
import org.apache.hadoop.hbase.thrift.generated.IOError;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import com.google.protobuf.ServiceException;
/**
* HRegionThriftServer - this class starts up a Thrift server in the same
* JVM where the RegionServer is running. It inherits most of the
@ -136,10 +131,7 @@ public class HRegionThriftServer extends Thread {
if (columns == null) {
Get get = new Get(row);
get.setTimeRange(Long.MIN_VALUE, timestamp);
GetRequest request =
RequestConverter.buildGetRequest(regionName, get);
GetResponse response = rs.get(null, request);
Result result = ProtobufUtil.toResult(response.getResult());
Result result = ProtobufUtil.get(rs, regionName, get);
return ThriftUtilities.rowResultFromHBase(result);
}
Get get = new Get(row);
@ -152,10 +144,7 @@ public class HRegionThriftServer extends Thread {
}
}
get.setTimeRange(Long.MIN_VALUE, timestamp);
GetRequest request =
RequestConverter.buildGetRequest(regionName, get);
GetResponse response = rs.get(null, request);
Result result = ProtobufUtil.toResult(response.getResult());
Result result = ProtobufUtil.get(rs, regionName, get);
return ThriftUtilities.rowResultFromHBase(result);
} catch (NotServingRegionException e) {
if (!redirect) {
@ -165,10 +154,6 @@ public class HRegionThriftServer extends Thread {
LOG.debug("ThriftServer redirecting getRowWithColumnsTs");
return super.getRowWithColumnsTs(tableName, rowb, columns, timestamp,
attributes);
} catch (ServiceException se) {
IOException e = ProtobufUtil.getRemoteException(se);
LOG.warn(e.getMessage(), e);
throw new IOError(e.getMessage());
} catch (IOException e) {
LOG.warn(e.getMessage(), e);
throw new IOError(e.getMessage());

View File

@ -21,10 +21,14 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -33,12 +37,19 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@ -48,13 +59,38 @@ import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ActionResult;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
@ -78,15 +114,24 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -101,17 +146,45 @@ import com.google.protobuf.ServiceException;
*/
@InterfaceAudience.Private
public abstract class RegionServer implements
ClientProtocol, Runnable, RegionServerServices {
ClientProtocol, AdminProtocol, Runnable, RegionServerServices {
private static final Log LOG = LogFactory.getLog(RegionServer.class);
private final Random rand = new Random();
/*
* Strings to be used in forming the exception message for
* RegionsAlreadyInTransitionException.
*/
protected static final String OPEN = "OPEN";
protected static final String CLOSE = "CLOSE";
//RegionName vs current action in progress
//true - if open region action in progress
//false - if close region action in progress
protected final ConcurrentSkipListMap<byte[], Boolean> regionsInTransitionInRS =
new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
protected long maxScannerResultSize;
// Cache flushing
protected MemStoreFlusher cacheFlusher;
// catalog tracker
protected CatalogTracker catalogTracker;
/**
* Go here to get table descriptors.
*/
protected TableDescriptors tableDescriptors;
// Replication services. If no replication, this handler will be null.
protected ReplicationSourceService replicationSourceHandler;
protected ReplicationSinkService replicationSinkHandler;
// Compactions
public CompactSplitThread compactSplitThread;
final Map<String, RegionScanner> scanners =
new ConcurrentHashMap<String, RegionScanner>();
@ -125,6 +198,9 @@ public abstract class RegionServer implements
// Leases
protected Leases leases;
// Instance of the hbase executor service.
protected ExecutorService service;
// Request counter.
// Do we need this? Can't we just sum region counters? St.Ack 20110412
protected AtomicInteger requestCount = new AtomicInteger();
@ -244,6 +320,67 @@ public abstract class RegionServer implements
}
}
protected void checkIfRegionInTransition(HRegionInfo region,
String currentAction) throws RegionAlreadyInTransitionException {
byte[] encodedName = region.getEncodedNameAsBytes();
if (this.regionsInTransitionInRS.containsKey(encodedName)) {
boolean openAction = this.regionsInTransitionInRS.get(encodedName);
// The below exception message will be used in master.
throw new RegionAlreadyInTransitionException("Received:" + currentAction +
" for the region:" + region.getRegionNameAsString() +
" ,which we are already trying to " +
(openAction ? OPEN : CLOSE)+ ".");
}
}
/**
* @param region Region to close
* @param abort True if we are aborting
* @param zk True if we are to update zk about the region close; if the close
* was orchestrated by master, then update zk. If the close is being run by
* the regionserver because its going down, don't update zk.
* @return True if closed a region.
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk) {
return closeRegion(region, abort, zk, -1);
}
/**
* @param region Region to close
* @param abort True if we are aborting
* @param zk True if we are to update zk about the region close; if the close
* was orchestrated by master, then update zk. If the close is being run by
* the regionserver because its going down, don't update zk.
* @param versionOfClosingNode
* the version of znode to compare when RS transitions the znode from
* CLOSING state.
* @return True if closed a region.
*/
protected boolean closeRegion(HRegionInfo region, final boolean abort,
final boolean zk, final int versionOfClosingNode) {
if (this.regionsInTransitionInRS.containsKey(region.getEncodedNameAsBytes())) {
LOG.warn("Received close for region we are already opening or closing; " +
region.getEncodedName());
return false;
}
this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), false);
CloseRegionHandler crh = null;
if (region.isRootRegion()) {
crh = new CloseRootHandler(this, this, region, abort, zk,
versionOfClosingNode);
} else if (region.isMetaRegion()) {
crh = new CloseMetaHandler(this, this, region, abort, zk,
versionOfClosingNode);
} else {
crh = new CloseRegionHandler(this, this, region, abort, zk,
versionOfClosingNode);
}
this.service.submit(crh);
return true;
}
/**
* @param regionName
* @return HRegion for the passed binary <code>regionName</code> or null if
@ -254,6 +391,11 @@ public abstract class RegionServer implements
return this.onlineRegions.get(encodedRegionName);
}
@Override
public HRegion getFromOnlineRegions(final String encodedRegionName) {
return this.onlineRegions.get(encodedRegionName);
}
/**
* Protected utility method for safely obtaining an HRegion handle.
*
@ -1002,6 +1144,352 @@ public abstract class RegionServer implements
}
// End Client methods
// Start Admin methods
@Override
@QosPriority(priority=HIGH_QOS)
public GetRegionInfoResponse getRegionInfo(final RpcController controller,
final GetRegionInfoRequest request) throws ServiceException {
try {
checkOpen();
requestCount.incrementAndGet();
HRegion region = getRegion(request.getRegion());
HRegionInfo info = region.getRegionInfo();
GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
builder.setRegionInfo(ProtobufUtil.toRegionInfo(info));
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
@Override
public GetStoreFileResponse getStoreFile(final RpcController controller,
final GetStoreFileRequest request) throws ServiceException {
try {
HRegion region = getRegion(request.getRegion());
requestCount.incrementAndGet();
Set<byte[]> columnFamilies = null;
if (request.getFamilyCount() == 0) {
columnFamilies = region.getStores().keySet();
} else {
columnFamilies = new HashSet<byte[]>();
for (ByteString cf: request.getFamilyList()) {
columnFamilies.add(cf.toByteArray());
}
}
int nCF = columnFamilies.size();
List<String> fileList = region.getStoreFileList(
columnFamilies.toArray(new byte[nCF][]));
GetStoreFileResponse.Builder builder = GetStoreFileResponse.newBuilder();
builder.addAllStoreFile(fileList);
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
@Override
@QosPriority(priority=HIGH_QOS)
public GetOnlineRegionResponse getOnlineRegion(final RpcController controller,
final GetOnlineRegionRequest request) throws ServiceException {
try {
checkOpen();
requestCount.incrementAndGet();
List<HRegionInfo> list = new ArrayList<HRegionInfo>(onlineRegions.size());
for (Map.Entry<String,HRegion> e: this.onlineRegions.entrySet()) {
list.add(e.getValue().getRegionInfo());
}
Collections.sort(list);
GetOnlineRegionResponse.Builder builder = GetOnlineRegionResponse.newBuilder();
for (HRegionInfo region: list) {
builder.addRegionInfo(ProtobufUtil.toRegionInfo(region));
}
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
// Region open/close direct RPCs
/**
* Open a region on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HIGH_QOS)
public OpenRegionResponse openRegion(final RpcController controller,
final OpenRegionRequest request) throws ServiceException {
int versionOfOfflineNode = -1;
if (request.hasVersionOfOfflineNode()) {
versionOfOfflineNode = request.getVersionOfOfflineNode();
}
try {
checkOpen();
requestCount.incrementAndGet();
OpenRegionResponse.Builder
builder = OpenRegionResponse.newBuilder();
for (RegionInfo regionInfo: request.getRegionList()) {
HRegionInfo region = ProtobufUtil.toRegionInfo(regionInfo);
checkIfRegionInTransition(region, OPEN);
HRegion onlineRegion = getFromOnlineRegions(region.getEncodedName());
if (null != onlineRegion) {
// See HBASE-5094. Cross check with META if still this RS is owning the
// region.
Pair<HRegionInfo, ServerName> p = MetaReader.getRegion(
this.catalogTracker, region.getRegionName());
if (this.getServerName().equals(p.getSecond())) {
LOG.warn("Attempted open of " + region.getEncodedName()
+ " but already online on this server");
builder.addOpeningState(RegionOpeningState.ALREADY_OPENED);
continue;
} else {
LOG.warn("The region " + region.getEncodedName()
+ " is online on this server but META does not have this server.");
removeFromOnlineRegions(region.getEncodedName());
}
}
LOG.info("Received request to open region: " + region.getEncodedName());
this.regionsInTransitionInRS.putIfAbsent(region.getEncodedNameAsBytes(), true);
HTableDescriptor htd = this.tableDescriptors.get(region.getTableName());
// Need to pass the expected version in the constructor.
if (region.isRootRegion()) {
this.service.submit(new OpenRootHandler(this, this, region, htd,
versionOfOfflineNode));
} else if (region.isMetaRegion()) {
this.service.submit(new OpenMetaHandler(this, this, region, htd,
versionOfOfflineNode));
} else {
this.service.submit(new OpenRegionHandler(this, this, region, htd,
versionOfOfflineNode));
}
builder.addOpeningState(RegionOpeningState.OPENED);
}
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Close a region on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HIGH_QOS)
public CloseRegionResponse closeRegion(final RpcController controller,
final CloseRegionRequest request) throws ServiceException {
int versionOfClosingNode = -1;
if (request.hasVersionOfClosingNode()) {
versionOfClosingNode = request.getVersionOfClosingNode();
}
boolean zk = request.getTransitionInZK();
try {
checkOpen();
requestCount.incrementAndGet();
HRegion region = getRegion(request.getRegion());
CloseRegionResponse.Builder
builder = CloseRegionResponse.newBuilder();
LOG.info("Received close region: " + region.getRegionNameAsString() +
". Version of ZK closing node:" + versionOfClosingNode);
HRegionInfo regionInfo = region.getRegionInfo();
checkIfRegionInTransition(regionInfo, CLOSE);
boolean closed = closeRegion(
regionInfo, false, zk, versionOfClosingNode);
builder.setClosed(closed);
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Flush a region on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HIGH_QOS)
public FlushRegionResponse flushRegion(final RpcController controller,
final FlushRegionRequest request) throws ServiceException {
try {
checkOpen();
requestCount.incrementAndGet();
HRegion region = getRegion(request.getRegion());
LOG.info("Flushing " + region.getRegionNameAsString());
boolean shouldFlush = true;
if (request.hasIfOlderThanTs()) {
shouldFlush = region.getLastFlushTime() < request.getIfOlderThanTs();
}
FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
if (shouldFlush) {
builder.setFlushed(region.flushcache());
}
builder.setLastFlushTime(region.getLastFlushTime());
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Split a region on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HIGH_QOS)
public SplitRegionResponse splitRegion(final RpcController controller,
final SplitRegionRequest request) throws ServiceException {
try {
checkOpen();
requestCount.incrementAndGet();
HRegion region = getRegion(request.getRegion());
LOG.info("Splitting " + region.getRegionNameAsString());
region.flushcache();
byte[] splitPoint = null;
if (request.hasSplitPoint()) {
splitPoint = request.getSplitPoint().toByteArray();
}
region.forceSplit(splitPoint);
compactSplitThread.requestSplit(region, region.checkSplit());
return SplitRegionResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Compact a region on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HIGH_QOS)
public CompactRegionResponse compactRegion(final RpcController controller,
final CompactRegionRequest request) throws ServiceException {
try {
checkOpen();
requestCount.incrementAndGet();
HRegion region = getRegion(request.getRegion());
LOG.info("Compacting " + region.getRegionNameAsString());
boolean major = false;
if (request.hasMajor()) {
major = request.getMajor();
}
if (major) {
region.triggerMajorCompaction();
}
compactSplitThread.requestCompaction(region,
"User-triggered " + (major ? "major " : "") + "compaction",
CompactSplitThread.PRIORITY_USER);
return CompactRegionResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Replicate WAL entries on the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
@QosPriority(priority=HIGH_QOS)
public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
try {
if (replicationSinkHandler != null) {
checkOpen();
requestCount.incrementAndGet();
HLog.Entry[] entries = ProtobufUtil.toHLogEntries(request.getEntryList());
if (entries != null && entries.length > 0) {
replicationSinkHandler.replicateLogEntries(entries);
}
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Roll the WAL writer of the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
public RollWALWriterResponse rollWALWriter(final RpcController controller,
final RollWALWriterRequest request) throws ServiceException {
try {
requestCount.incrementAndGet();
HLog wal = this.getWAL();
byte[][] regionsToFlush = wal.rollWriter(true);
RollWALWriterResponse.Builder builder = RollWALWriterResponse.newBuilder();
if (regionsToFlush != null) {
for (byte[] region: regionsToFlush) {
builder.addRegionToFlush(ByteString.copyFrom(region));
}
}
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
/**
* Stop the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
public StopServerResponse stopServer(final RpcController controller,
final StopServerRequest request) throws ServiceException {
requestCount.incrementAndGet();
String reason = request.getReason();
stop(reason);
return StopServerResponse.newBuilder().build();
}
/**
* Get some information of the region server.
*
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
*/
@Override
public GetServerInfoResponse getServerInfo(final RpcController controller,
final GetServerInfoRequest request) throws ServiceException {
ServerName serverName = getServerName();
requestCount.incrementAndGet();
GetServerInfoResponse.Builder builder = GetServerInfoResponse.newBuilder();
builder.setServerName(ProtobufUtil.toServerName(serverName));
return builder.build();
}
// End Admin methods
/**
* Find the HRegion based on a region specifier

View File

@ -48,9 +48,10 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -605,9 +606,10 @@ public class ReplicationSource extends Thread
continue;
}
try {
HRegionInterface rrs = getRS();
AdminProtocol rrs = getRS();
LOG.debug("Replicating " + currentNbEntries);
rrs.replicateLogEntries(Arrays.copyOf(this.entriesArray, currentNbEntries));
ProtobufUtil.replicateWALEntry(rrs,
Arrays.copyOf(this.entriesArray, currentNbEntries));
if (this.lastLoggedPosition != this.position) {
this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered);
@ -727,13 +729,13 @@ public class ReplicationSource extends Thread
* @return
* @throws IOException
*/
private HRegionInterface getRS() throws IOException {
private AdminProtocol getRS() throws IOException {
if (this.currentPeers.size() == 0) {
throw new IOException(this.peerClusterZnode + " has 0 region servers");
}
ServerName address =
currentPeers.get(random.nextInt(this.currentPeers.size()));
return this.conn.getHRegionConnection(address.getHostname(), address.getPort());
return this.conn.getAdmin(address.getHostname(), address.getPort());
}
/**
@ -746,9 +748,9 @@ public class ReplicationSource extends Thread
Thread pingThread = new Thread() {
public void run() {
try {
HRegionInterface rrs = getRS();
AdminProtocol rrs = getRS();
// Dummy call which should fail
rrs.getHServerInfo();
ProtobufUtil.getServerInfo(rrs);
latch.countDown();
} catch (IOException ex) {
if (ex instanceof RemoteException) {

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@ -70,8 +71,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
@ -2620,11 +2621,11 @@ public class HBaseFsck {
public synchronized void run() {
errors.progress();
try {
HRegionInterface server =
connection.getHRegionConnection(rsinfo.getHostname(), rsinfo.getPort());
AdminProtocol server =
connection.getAdmin(rsinfo.getHostname(), rsinfo.getPort());
// list all online regions from this region server
List<HRegionInfo> regions = server.getOnlineRegions();
List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(server);
if (hbck.checkMetaOnly) {
regions = filterOnlyMetaRegions(regions);
}

View File

@ -34,12 +34,13 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.zookeeper.KeeperException;
@ -149,17 +150,16 @@ public class HBaseFsckRepair {
public static void closeRegionSilentlyAndWait(HBaseAdmin admin,
ServerName server, HRegionInfo region) throws IOException, InterruptedException {
HConnection connection = admin.getConnection();
HRegionInterface rs = connection.getHRegionConnection(server.getHostname(),
server.getPort());
rs.closeRegion(region, false);
AdminProtocol rs = connection.getAdmin(server.getHostname(), server.getPort());
ProtobufUtil.closeRegion(rs, region.getRegionName(), false);
long timeout = admin.getConfiguration()
.getLong("hbase.hbck.close.timeout", 120000);
long expiration = timeout + System.currentTimeMillis();
while (System.currentTimeMillis() < expiration) {
try {
HRegionInfo rsRegion = rs.getRegionInfo(region.getRegionName());
if (rsRegion == null)
return;
HRegionInfo rsRegion =
ProtobufUtil.getRegionInfo(rs, region.getRegionName());
if (rsRegion == null) return;
} catch (IOException ioe) {
return;
}

View File

@ -49,7 +49,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SortedCopyOnWriteSet<E> implements SortedSet<E> {
private SortedSet<E> internalSet;
private volatile SortedSet<E> internalSet;
public SortedCopyOnWriteSet() {
this.internalSet = new TreeSet<E>();

View File

@ -38,12 +38,12 @@ message GetRegionInfoResponse {
* Get a list of store files for a set of column families in a particular region.
* If no column family is specified, get the store files for all column families.
*/
message GetStoreFileListRequest {
message GetStoreFileRequest {
required RegionSpecifier region = 1;
repeated bytes columnFamily = 2;
repeated bytes family = 2;
}
message GetStoreFileListResponse {
message GetStoreFileResponse {
repeated string storeFile = 1;
}
@ -55,7 +55,7 @@ message GetOnlineRegionResponse {
}
message OpenRegionRequest {
repeated RegionSpecifier region = 1;
repeated RegionInfo region = 1;
optional uint32 versionOfOfflineNode = 2;
}
@ -133,7 +133,7 @@ message UUID {
// Protocol buffer version of HLog
message WALEntry {
required WALKey walKey = 1;
required WALKey key = 1;
required WALEdit edit = 2;
// Protocol buffer version of HLogKey
@ -146,7 +146,7 @@ message WALEntry {
}
message WALEdit {
repeated bytes keyValue = 1;
repeated bytes keyValueBytes = 1;
repeated FamilyScope familyScope = 2;
enum ScopeType {
@ -168,7 +168,7 @@ message WALEntry {
* hbase.replication has to be set to true for this to work.
*/
message ReplicateWALEntryRequest {
repeated WALEntry walEntry = 1;
repeated WALEntry entry = 1;
}
message ReplicateWALEntryResponse {
@ -201,8 +201,8 @@ service AdminService {
rpc getRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
rpc getStoreFileList(GetStoreFileListRequest)
returns(GetStoreFileListResponse);
rpc getStoreFile(GetStoreFileRequest)
returns(GetStoreFileResponse);
rpc getOnlineRegion(GetOnlineRegionRequest)
returns(GetOnlineRegionResponse);

View File

@ -32,8 +32,18 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
@ -41,7 +51,9 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.util.Bytes;
@ -186,16 +198,19 @@ public class TestCatalogTracker {
@Test
public void testServerNotRunningIOException()
throws IOException, InterruptedException, KeeperException, ServiceException {
// Mock an HRegionInterface.
final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
// Mock an Admin and a Client.
final AdminProtocol admin = Mockito.mock(AdminProtocol.class);
final ClientProtocol client = Mockito.mock(ClientProtocol.class);
HConnection connection = mockConnection(implementation, client);
HConnection connection = mockConnection(admin, client);
try {
// If a 'getRegionInfo' is called on mocked HRegionInterface, throw IOE
// If a 'getRegionInfo' is called on mocked AdminProtocol, throw IOE
// the first time. 'Succeed' the second time we are called.
Mockito.when(implementation.getRegionInfo((byte[]) Mockito.any())).
thenThrow(new IOException("Server not running, aborting")).
thenReturn(new HRegionInfo());
GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
builder.setRegionInfo(ProtobufUtil.toRegionInfo(new HRegionInfo(Bytes.toBytes("test"))));
Mockito.when(admin.getRegionInfo((RpcController)Mockito.any(),
(GetRegionInfoRequest)Mockito.any())).thenThrow(
new ServiceException(new IOException("Server not running, aborting"))).
thenReturn(builder.build());
// After we encounter the above 'Server not running', we should catch the
// IOE and go into retrying for the meta mode. We'll do gets on -ROOT- to
@ -292,18 +307,19 @@ public class TestCatalogTracker {
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
* @throws ServiceException
*/
@Test
public void testVerifyRootRegionLocationFails()
throws IOException, InterruptedException, KeeperException {
throws IOException, InterruptedException, KeeperException, ServiceException {
HConnection connection = Mockito.mock(HConnection.class);
ConnectException connectException =
new ConnectException("Connection refused");
final HRegionInterface implementation =
Mockito.mock(HRegionInterface.class);
Mockito.when(implementation.getRegionInfo((byte [])Mockito.any())).
thenThrow(connectException);
Mockito.when(connection.getHRegionConnection(Mockito.anyString(),
ServiceException connectException =
new ServiceException(new ConnectException("Connection refused"));
final AdminProtocol implementation =
Mockito.mock(AdminProtocol.class);
Mockito.when(implementation.getRegionInfo((RpcController)Mockito.any(),
(GetRegionInfoRequest)Mockito.any())).thenThrow(connectException);
Mockito.when(connection.getAdmin(Mockito.anyString(),
Mockito.anyInt(), Mockito.anyBoolean())).
thenReturn(implementation);
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
@ -379,11 +395,11 @@ public class TestCatalogTracker {
// that ... and so one.
@Test public void testNoTimeoutWaitForMeta()
throws Exception {
// Mock an HConnection and a HRegionInterface implementation. Have the
// Mock an HConnection and a AdminProtocol implementation. Have the
// HConnection return the HRI. Have the HRI return a few mocked up responses
// to make our test work.
// Mock an HRegionInterface.
final HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
// Mock an AdminProtocol.
final AdminProtocol implementation = Mockito.mock(AdminProtocol.class);
HConnection connection = mockConnection(implementation, null);
try {
// Now the ct is up... set into the mocks some answers that make it look
@ -396,8 +412,10 @@ public class TestCatalogTracker {
// It works for now but has been deprecated.
Mockito.when(connection.getRegionServerWithRetries((ServerCallable<Result>)Mockito.any())).
thenReturn(result);
Mockito.when(implementation.getRegionInfo((byte[]) Mockito.any())).
thenReturn(HRegionInfo.FIRST_META_REGIONINFO);
GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
builder.setRegionInfo(ProtobufUtil.toRegionInfo(HRegionInfo.FIRST_META_REGIONINFO));
Mockito.when(implementation.getRegionInfo((RpcController)Mockito.any(),
(GetRegionInfoRequest)Mockito.any())).thenReturn(builder.build());
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
ServerName hsa = ct.getMetaLocation();
Assert.assertNull(hsa);
@ -430,7 +448,7 @@ public class TestCatalogTracker {
}
/**
* @param implementation An {@link HRegionInterface} instance; you'll likely
* @param admin An {@link AdminProtocol} instance; you'll likely
* want to pass a mocked HRS; can be null.
* @param client A mocked ClientProtocol instance, can be null
* @return Mock up a connection that returns a {@link Configuration} when
@ -443,9 +461,8 @@ public class TestCatalogTracker {
* when done with this mocked Connection.
* @throws IOException
*/
private HConnection mockConnection(
final HRegionInterface implementation, final ClientProtocol client)
throws IOException {
private HConnection mockConnection(final AdminProtocol admin,
final ClientProtocol client) throws IOException {
HConnection connection =
HConnectionTestingUtility.getMockedConnection(UTIL.getConfiguration());
Mockito.doNothing().when(connection).close();
@ -459,10 +476,10 @@ public class TestCatalogTracker {
Mockito.when(connection.locateRegion((byte[]) Mockito.any(),
(byte[]) Mockito.any())).
thenReturn(anyLocation);
if (implementation != null) {
if (admin != null) {
// If a call to getHRegionConnection, return this implementation.
Mockito.when(connection.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
thenReturn(implementation);
Mockito.when(connection.getAdmin(Mockito.anyString(), Mockito.anyInt())).
thenReturn(admin);
}
if (client != null) {
// If a call to getClient, return this implementation.

View File

@ -27,12 +27,12 @@ import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.util.Bytes;

View File

@ -24,10 +24,11 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.mockito.Mockito;
/**
@ -72,14 +73,14 @@ public class HConnectionTestingUtility {
* connection when done by calling
* {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
* will stick around; this is probably not what you want.
* @param implementation An {@link HRegionInterface} instance; you'll likely
* want to pass a mocked HRS; can be null.
*
* @param conf Configuration to use
* @param implementation An HRegionInterface; can be null but is usually
* @param admin An AdminProtocol; can be null but is usually
* itself a mock.
* @param client A ClientProtocol; can be null but is usually
* itself a mock.
* @param sn ServerName to include in the region location returned by this
* <code>implementation</code>
* <code>connection</code>
* @param hri HRegionInfo to include in the location returned when
* getRegionLocation is called on the mocked connection
* @return Mock up a connection that returns a {@link Configuration} when
@ -93,7 +94,7 @@ public class HConnectionTestingUtility {
* @throws IOException
*/
public static HConnection getMockedConnectionAndDecorate(final Configuration conf,
final HRegionInterface implementation, final ClientProtocol client,
final AdminProtocol admin, final ClientProtocol client,
final ServerName sn, final HRegionInfo hri)
throws IOException {
HConnection c = HConnectionTestingUtility.getMockedConnection(conf);
@ -105,10 +106,10 @@ public class HConnectionTestingUtility {
thenReturn(loc);
Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())).
thenReturn(loc);
if (implementation != null) {
// If a call to getHRegionConnection, return this implementation.
Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
thenReturn(implementation);
if (admin != null) {
// If a call to getAdmin, return this implementation.
Mockito.when(c.getAdmin(Mockito.anyString(), Mockito.anyInt())).
thenReturn(admin);
}
if (client != null) {
// If a call to getClient, return this client.

View File

@ -1220,8 +1220,12 @@ public class TestAdmin {
if (!regionInfo.isMetaTable()) {
if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion1")) {
info = regionInfo;
try {
admin.closeRegionWithEncodedRegionName("sample", rs.getServerName()
.getServerName());
} catch (NotServingRegionException nsre) {
// expected, ignore it
}
}
}
}
@ -1320,8 +1324,12 @@ public class TestAdmin {
if (!regionInfo.isMetaTable()) {
if (regionInfo.getRegionNameAsString().contains("TestHBACloseRegion4")) {
info = regionInfo;
try {
admin.closeRegionWithEncodedRegionName(regionInfo
.getRegionNameAsString(), rs.getServerName().getServerName());
} catch (NotServingRegionException nsre) {
// expected, ignore it.
}
}
}
}

View File

@ -22,7 +22,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -32,7 +32,8 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
@ -42,17 +43,12 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import com.google.common.collect.Lists;
@Category(LargeTests.class)
public class TestFromClientSide3 {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL
= new HBaseTestingUtility();
private static byte[] ROW = Bytes.toBytes("testRow");
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte[] VALUE = Bytes.toBytes("testValue");
private static Random random = new Random();
private static int SLAVES = 3;
@ -108,19 +104,21 @@ public class TestFromClientSide3 {
HConnection conn = HConnectionManager.getConnection(TEST_UTIL
.getConfiguration());
HRegionLocation loc = table.getRegionLocation(row, true);
HRegionInterface server = conn.getHRegionConnection(loc.getHostname(), loc
AdminProtocol server = conn.getAdmin(loc.getHostname(), loc
.getPort());
byte[] regName = loc.getRegionInfo().getRegionName();
for (int i = 0; i < nFlushes; i++) {
randomCFPuts(table, row, family, nPuts);
int sfCount = server.getStoreFileList(regName, FAMILY).size();
List<String> sf = ProtobufUtil.getStoreFiles(server, regName, FAMILY);
int sfCount = sf.size();
// TODO: replace this api with a synchronous flush after HBASE-2949
admin.flush(table.getTableName());
// synchronously poll wait for a new storefile to appear (flush happened)
while (server.getStoreFileList(regName, FAMILY).size() == sfCount) {
while (ProtobufUtil.getStoreFiles(
server, regName, FAMILY).size() == sfCount) {
Thread.sleep(40);
}
}
@ -154,9 +152,10 @@ public class TestFromClientSide3 {
// Verify we have multiple store files.
HRegionLocation loc = hTable.getRegionLocation(row, true);
byte[] regionName = loc.getRegionInfo().getRegionName();
HRegionInterface server = connection.getHRegionConnection(
AdminProtocol server = connection.getAdmin(
loc.getHostname(), loc.getPort());
assertTrue(server.getStoreFileList(regionName, FAMILY).size() > 1);
assertTrue(ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size() > 1);
// Issue a compaction request
admin.compact(TABLE);
@ -167,16 +166,17 @@ public class TestFromClientSide3 {
loc = hTable.getRegionLocation(row, true);
if (!loc.getRegionInfo().isOffline()) {
regionName = loc.getRegionInfo().getRegionName();
server = connection.getHRegionConnection(loc.getHostname(), loc
.getPort());
if (server.getStoreFileList(regionName, FAMILY).size() <= 1) {
server = connection.getAdmin(loc.getHostname(), loc.getPort());
if (ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size() <= 1) {
break;
}
}
Thread.sleep(40);
}
// verify the compactions took place and that we didn't just time out
assertTrue(server.getStoreFileList(regionName, FAMILY).size() <= 1);
assertTrue(ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size() <= 1);
// change the compaction.min config option for this table to 5
LOG.info("hbase.hstore.compaction.min should now be 5");
@ -198,11 +198,11 @@ public class TestFromClientSide3 {
// This time, the compaction request should not happen
Thread.sleep(10 * 1000);
int sfCount = 0;
loc = hTable.getRegionLocation(row, true);
regionName = loc.getRegionInfo().getRegionName();
server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
sfCount = server.getStoreFileList(regionName, FAMILY).size();
server = connection.getAdmin(loc.getHostname(), loc.getPort());
int sfCount = ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size();
assertTrue(sfCount > 1);
// change an individual CF's config option to 2 & online schema update
@ -225,9 +225,10 @@ public class TestFromClientSide3 {
loc = hTable.getRegionLocation(row, true);
regionName = loc.getRegionInfo().getRegionName();
try {
server = connection.getHRegionConnection(loc.getHostname(), loc
server = connection.getAdmin(loc.getHostname(), loc
.getPort());
if (server.getStoreFileList(regionName, FAMILY).size() < sfCount) {
if (ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size() < sfCount) {
break;
}
} catch (Exception e) {
@ -236,7 +237,8 @@ public class TestFromClientSide3 {
Thread.sleep(40);
}
// verify the compaction took place and that we didn't just time out
assertTrue(server.getStoreFileList(regionName, FAMILY).size() < sfCount);
assertTrue(ProtobufUtil.getStoreFiles(
server, regionName, FAMILY).size() < sfCount);
// Finally, ensure that we can remove a custom config value after we made it
LOG.info("Removing CF config value");

View File

@ -42,7 +42,6 @@ import org.junit.experimental.categories.Category;
public class TestHTableUtil {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");

View File

@ -42,13 +42,13 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.util.Bytes;
@ -60,6 +60,8 @@ import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import com.google.common.collect.Multimap;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* Test cases for the atomic load error handling of the bulk load functionality.
@ -259,7 +261,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
}
private HConnection getMockedConnection(final Configuration conf)
throws IOException {
throws IOException, ServiceException {
HConnection c = Mockito.mock(HConnection.class);
Mockito.when(c.getConfiguration()).thenReturn(conf);
Mockito.doNothing().when(c).close();
@ -271,10 +273,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
thenReturn(loc);
Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())).
thenReturn(loc);
HRegionInterface hri = Mockito.mock(HRegionInterface.class);
Mockito.when(hri.bulkLoadHFiles(Mockito.anyList(), (byte [])Mockito.any())).
thenThrow(new IOException("injecting bulk load error"));
Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
ClientProtocol hri = Mockito.mock(ClientProtocol.class);
Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())).
thenThrow(new ServiceException(new IOException("injecting bulk load error")));
Mockito.when(c.getClient(Mockito.anyString(), Mockito.anyInt())).
thenReturn(hri);
return c;
}

View File

@ -27,31 +27,41 @@ import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.Exec;
import org.apache.hadoop.hbase.client.coprocessor.ExecResult;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.hfile.BlockCacheColumnFamilySummary;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetServerInfoResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ExecCoprocessorRequest;
@ -70,14 +80,10 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@ -92,7 +98,7 @@ import com.google.protobuf.ServiceException;
* {@link #setGetResult(byte[], byte[], Result)} for how to fill the backing data
* store that the get pulls from.
*/
class MockRegionServer implements HRegionInterface, ClientProtocol, RegionServerServices {
class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerServices {
private final ServerName sn;
private final ZooKeeperWatcher zkw;
private final Configuration conf;
@ -209,138 +215,12 @@ class MockRegionServer implements HRegionInterface, ClientProtocol, RegionServer
return false;
}
@Override
public HRegionInfo getRegionInfo(byte[] regionName) {
// Just return this. Calls to getRegionInfo are usually to test connection
// to regionserver does reasonable things so should be safe to return
// anything.
return HRegionInfo.ROOT_REGIONINFO;
}
@Override
public void flushRegion(byte[] regionName) throws IllegalArgumentException,
IOException {
// TODO Auto-generated method stub
}
@Override
public void flushRegion(byte[] regionName, long ifOlderThanTS)
throws IllegalArgumentException, IOException {
// TODO Auto-generated method stub
}
@Override
public long getLastFlushTime(byte[] regionName) {
// TODO Auto-generated method stub
return 0;
}
@Override
public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
throws IllegalArgumentException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<String> getStoreFileList(byte[] regionName,
byte[][] columnFamilies) throws IllegalArgumentException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<String> getStoreFileList(byte[] regionName)
throws IllegalArgumentException {
// TODO Auto-generated method stub
return null;
}
@Override
public Result getClosestRowBefore(byte[] regionName, byte[] row,
byte[] family) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public Result get(byte[] regionName, Get get) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean exists(byte[] regionName, Get get) throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public void put(byte[] regionName, Put put) throws IOException {
// TODO Auto-generated method stub
}
@Override
public int put(byte[] regionName, List<Put> puts) throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public void delete(byte[] regionName, Delete delete) throws IOException {
// TODO Auto-generated method stub
}
@Override
public int delete(byte[] regionName, List<Delete> deletes)
throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family,
byte[] qualifier, byte[] value, Put put) throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family,
byte[] qualifier, byte[] value, Delete delete) throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public long incrementColumnValue(byte[] regionName, byte[] row,
byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public Result append(byte[] regionName, Append append) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public Result increment(byte[] regionName, Increment increment)
throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public long openScanner(byte[] regionName, Scan scan) throws IOException {
long scannerId = this.random.nextLong();
this.scannersAndOffsets.put(scannerId, new RegionNameAndIndex(regionName));
return scannerId;
}
@Override
public Result next(long scannerId) throws IOException {
RegionNameAndIndex rnai = this.scannersAndOffsets.get(scannerId);
int index = rnai.getThenIncrement();
@ -349,173 +229,16 @@ class MockRegionServer implements HRegionInterface, ClientProtocol, RegionServer
return index < results.length? results[index]: null;
}
@Override
public Result [] next(long scannerId, int numberOfRows) throws IOException {
// Just return one result whatever they ask for.
Result r = next(scannerId);
return r == null? null: new Result [] {r};
}
@Override
public void close(final long scannerId) throws IOException {
this.scannersAndOffsets.remove(scannerId);
}
@Override
public long lockRow(byte[] regionName, byte[] row) throws IOException {
// TODO Auto-generated method stub
return 0;
}
@Override
public void unlockRow(byte[] regionName, long lockId) throws IOException {
// TODO Auto-generated method stub
}
@Override
public List<HRegionInfo> getOnlineRegions() throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public HServerInfo getHServerInfo() throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public <R> MultiResponse multi(MultiAction<R> multi) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean bulkLoadHFiles(List<Pair<byte[], String>> familyPaths,
byte[] regionName) throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public RegionOpeningState openRegion(HRegionInfo region) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public RegionOpeningState openRegion(HRegionInfo region,
int versionOfOfflineNode) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public void openRegions(List<HRegionInfo> regions) throws IOException {
// TODO Auto-generated method stub
}
@Override
public boolean closeRegion(HRegionInfo region) throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean closeRegion(HRegionInfo region, int versionOfClosingNode)
throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean closeRegion(HRegionInfo region, boolean zk)
throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean closeRegion(byte[] encodedRegionName, boolean zk)
throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public void flushRegion(HRegionInfo regionInfo)
throws NotServingRegionException, IOException {
// TODO Auto-generated method stub
}
@Override
public void splitRegion(HRegionInfo regionInfo)
throws NotServingRegionException, IOException {
// TODO Auto-generated method stub
}
@Override
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
throws NotServingRegionException, IOException {
// TODO Auto-generated method stub
}
@Override
public void compactRegion(HRegionInfo regionInfo, boolean major)
throws NotServingRegionException, IOException {
// TODO Auto-generated method stub
}
@Override
public void replicateLogEntries(Entry[] entries) throws IOException {
// TODO Auto-generated method stub
}
@Override
public ExecResult execCoprocessor(byte[] regionName, Exec call)
throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean checkAndPut(byte[] regionName, byte[] row, byte[] family,
byte[] qualifier, CompareOp compareOp,
WritableByteArrayComparable comparator, Put put) throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean checkAndDelete(byte[] regionName, byte[] row, byte[] family,
byte[] qualifier, CompareOp compareOp,
WritableByteArrayComparable comparator, Delete delete)
throws IOException {
// TODO Auto-generated method stub
return false;
}
@Override
public List<BlockCacheColumnFamilySummary> getBlockCacheColumnFamilySummaries()
throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public byte[][] rollHLogWriter() throws IOException,
FailedLogCloseException {
// TODO Auto-generated method stub
return null;
}
@Override
public void stop(String why) {
this.zkw.close();
@ -612,11 +335,6 @@ class MockRegionServer implements HRegionInterface, ClientProtocol, RegionServer
return null;
}
@Override
public void mutateRow(byte[] regionName, RowMutations rm) throws IOException {
// TODO Auto-generated method stub
}
@Override
public GetResponse get(RpcController controller, GetRequest request)
throws ServiceException {
@ -699,4 +417,95 @@ class MockRegionServer implements HRegionInterface, ClientProtocol, RegionServer
// TODO Auto-generated method stub
return null;
}
@Override
public GetRegionInfoResponse getRegionInfo(RpcController controller,
GetRegionInfoRequest request) throws ServiceException {
GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
builder.setRegionInfo(ProtobufUtil.toRegionInfo(HRegionInfo.ROOT_REGIONINFO));
return builder.build();
}
@Override
public GetStoreFileResponse getStoreFile(RpcController controller,
GetStoreFileRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
GetOnlineRegionRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public OpenRegionResponse openRegion(RpcController controller,
OpenRegionRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public CloseRegionResponse closeRegion(RpcController controller,
CloseRegionRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public FlushRegionResponse flushRegion(RpcController controller,
FlushRegionRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public SplitRegionResponse splitRegion(RpcController controller,
SplitRegionRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public CompactRegionResponse compactRegion(RpcController controller,
CompactRegionRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
ReplicateWALEntryRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public RollWALWriterResponse rollWALWriter(RpcController controller,
RollWALWriterRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public GetServerInfoResponse getServerInfo(RpcController controller,
GetServerInfoRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public StopServerResponse stopServer(RpcController controller,
StopServerRequest request) throws ServiceException {
// TODO Auto-generated method stub
return null;
}
@Override
public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException {
// TODO Auto-generated method stub
return null;
}
}

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
@ -48,7 +49,6 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;

View File

@ -47,15 +47,15 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ClientProtocol;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.regionserver.Store;
@ -93,12 +93,12 @@ public class TestCatalogJanitor {
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
// Mock an HConnection and a HRegionInterface implementation. Have the
// Mock an HConnection and a AdminProtocol implementation. Have the
// HConnection return the HRI. Have the HRI return a few mocked up responses
// to make our test work.
this.connection =
HConnectionTestingUtility.getMockedConnectionAndDecorate(this.c,
Mockito.mock(HRegionInterface.class), ri,
Mockito.mock(AdminProtocol.class), ri,
new ServerName("example.org,12345,6789"),
HRegionInfo.FIRST_META_REGIONINFO);
// Set hbase.rootdir into test dir.
@ -106,7 +106,7 @@ public class TestCatalogJanitor {
Path rootdir = fs.makeQualified(new Path(this.c.get(HConstants.HBASE_DIR)));
this.c.set(HConstants.HBASE_DIR, rootdir.toString());
this.ct = Mockito.mock(CatalogTracker.class);
HRegionInterface hri = Mockito.mock(HRegionInterface.class);
AdminProtocol hri = Mockito.mock(AdminProtocol.class);
Mockito.when(this.ct.getConnection()).thenReturn(this.connection);
Mockito.when(ct.waitForMetaServerConnectionDefault()).thenReturn(hri);
}

View File

@ -23,7 +23,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventHandler.EventHandlerListener;
@ -31,7 +30,6 @@ import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

View File

@ -68,7 +68,6 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@ -39,8 +40,8 @@ import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -164,9 +165,12 @@ public class TestHRegionServerBulkLoad {
public Void call() throws Exception {
LOG.debug("compacting " + location + " for row "
+ Bytes.toStringBinary(row));
HRegionInterface server = connection.getHRegionConnection(
AdminProtocol server = connection.getAdmin(
location.getHostname(), location.getPort());
server.compactRegion(location.getRegionInfo(), true);
CompactRegionRequest request =
RequestConverter.buildCompactRegionRequest(
location.getRegionInfo().getRegionName(), true);
server.compactRegion(null, request);
numCompactions.incrementAndGet();
return null;
}

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
@ -59,7 +60,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
@ -400,19 +401,19 @@ public class TestHBaseFsck {
/**
* Get region info from local cluster.
*/
Map<ServerName, List<String>> getDeployedHRIs(HBaseAdmin admin)
throws IOException {
Map<ServerName, List<String>> getDeployedHRIs(
final HBaseAdmin admin) throws IOException {
ClusterStatus status = admin.getMaster().getClusterStatus();
Collection<ServerName> regionServers = status.getServers();
Map<ServerName, List<String>> mm =
new HashMap<ServerName, List<String>>();
HConnection connection = admin.getConnection();
for (ServerName hsi : regionServers) {
HRegionInterface server =
connection.getHRegionConnection(hsi.getHostname(), hsi.getPort());
AdminProtocol server =
connection.getAdmin(hsi.getHostname(), hsi.getPort());
// list all online regions from this region server
List<HRegionInfo> regions = server.getOnlineRegions();
List<HRegionInfo> regions = ProtobufUtil.getOnlineRegions(server);
List<String> regionNames = new ArrayList<String>();
for (HRegionInfo hri : regions) {
regionNames.add(hri.getRegionNameAsString());