HBASE-7186 Split Classes for Client/Server module split.

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1425198 13f79535-47bb-0310-9956-ffa450edef68
eclark 2012-12-21 23:39:30 +00:00
parent dae051d664
commit 08671a4129
53 changed files with 979 additions and 839 deletions

View File

@@ -35,8 +35,7 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ColumnFamilySchema;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
@@ -164,7 +163,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
/**
* Default setting for whether or not to use bloomfilters.
*/
public static final String DEFAULT_BLOOMFILTER = StoreFile.BloomType.NONE.toString();
public static final String DEFAULT_BLOOMFILTER = BloomType.NONE.toString();
/**
* Default setting for whether to cache bloom filter blocks on write if block
@@ -408,7 +407,7 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
setEncodeOnDisk(encodeOnDisk);
setDataBlockEncoding(DataBlockEncoding.
valueOf(dataBlockEncoding.toUpperCase()));
setBloomFilterType(StoreFile.BloomType.
setBloomFilterType(BloomType.
valueOf(bloomFilter.toUpperCase()));
setBlocksize(blocksize);
setScope(scope);
@@ -760,19 +759,19 @@ public class HColumnDescriptor implements WritableComparable<HColumnDescriptor>
/**
* @return bloom filter type used for new StoreFiles in ColumnFamily
*/
public StoreFile.BloomType getBloomFilterType() {
public BloomType getBloomFilterType() {
String n = getValue(BLOOMFILTER);
if (n == null) {
n = DEFAULT_BLOOMFILTER;
}
return StoreFile.BloomType.valueOf(n.toUpperCase());
return BloomType.valueOf(n.toUpperCase());
}
/**
* @param bt bloom filter type
* @return this (for chained invocation)
*/
public HColumnDescriptor setBloomFilterType(final StoreFile.BloomType bt) {
public HColumnDescriptor setBloomFilterType(final BloomType bt) {
return setValue(BLOOMFILTER, bt.toString());
}
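The upshot for client code: configuring a bloom filter no longer pulls in the server-side StoreFile class. A minimal sketch of the API after this change (assuming the standard BloomType values NONE, ROW, and ROWCOL):

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;

// Set and read back a column family's bloom filter type without touching StoreFile.
HColumnDescriptor family = new HColumnDescriptor("cf");
family.setBloomFilterType(BloomType.ROW);  // returns this, so calls can be chained
assert family.getBloomFilterType() == BloomType.ROW;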

View File

@@ -33,16 +33,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.JenkinsHash;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
@@ -560,54 +556,6 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY));
}
/**
* @return the tableDesc
* @deprecated Do not use; expensive call
* use HRegionInfo.getTableNameAsString() in place of
* HRegionInfo.getTableDesc().getNameAsString()
*/
@Deprecated
public HTableDescriptor getTableDesc() {
Configuration c = HBaseConfiguration.create();
c.set("fs.defaultFS", c.get(HConstants.HBASE_DIR));
c.set("fs.default.name", c.get(HConstants.HBASE_DIR));
FileSystem fs;
try {
fs = FileSystem.get(c);
} catch (IOException e) {
throw new RuntimeException(e);
}
FSTableDescriptors fstd =
new FSTableDescriptors(fs, new Path(c.get(HConstants.HBASE_DIR)));
try {
return fstd.get(this.tableName);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* @param newDesc new table descriptor to use
* @deprecated Do not use; expensive call
*/
@Deprecated
public void setTableDesc(HTableDescriptor newDesc) {
Configuration c = HBaseConfiguration.create();
FileSystem fs;
try {
fs = FileSystem.get(c);
} catch (IOException e) {
throw new RuntimeException(e);
}
FSTableDescriptors fstd =
new FSTableDescriptors(fs, new Path(c.get(HConstants.HBASE_DIR)));
try {
fstd.add(newDesc);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/** @return true if this is the root region */
public boolean isRootRegion() {
return Bytes.equals(tableName, HRegionInfo.ROOT_REGIONINFO.getTableName());
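With the deprecated getTableDesc()/setTableDesc() removed, HRegionInfo no longer instantiates a FileSystem or FSTableDescriptors. A hedged sketch of the replacement path, going through the client admin API instead of reading descriptors off the filesystem (HBaseAdmin#getTableDescriptor is the assumed substitute):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

static HTableDescriptor descriptorFor(Configuration conf, HRegionInfo info)
    throws IOException {
  HBaseAdmin admin = new HBaseAdmin(conf);
  try {
    // getTableNameAsString()/getTableName() replace getTableDesc().getNameAsString()
    return admin.getTableDescriptor(info.getTableName());
  } finally {
    admin.close();
  }
}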

View File

@@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -242,7 +242,7 @@ public class HConnectionManager {
* @param stopProxy
* Shuts down all the proxies put up to cluster members including to
* cluster HMaster. Calls
* {@link HBaseRPC#stopProxy(org.apache.hadoop.hbase.ipc.VersionedProtocol)}
* {@link HBaseClientRPC#stopProxy(org.apache.hadoop.hbase.ipc.VersionedProtocol)}
* .
*/
public static void deleteConnection(Configuration conf, boolean stopProxy) {
@@ -709,16 +709,16 @@ public class HConnectionManager {
InetSocketAddress isa =
new InetSocketAddress(sn.getHostname(), sn.getPort());
MasterProtocol tryMaster = (MasterProtocol) HBaseRPC.getProxy(
masterProtocolState.protocolClass,
masterProtocolState.version,
isa, this.conf,this.rpcTimeout);
MasterProtocol tryMaster = (MasterProtocol) HBaseClientRPC.getProxy(
masterProtocolState.protocolClass,
masterProtocolState.version,
isa, this.conf, this.rpcTimeout);
if (tryMaster.isMasterRunning(
null, RequestConverter.buildIsMasterRunningRequest()).getIsMasterRunning()) {
return tryMaster;
} else {
HBaseRPC.stopProxy(tryMaster);
HBaseClientRPC.stopProxy(tryMaster);
String msg = "Can create a proxy to master, but it is not running";
LOG.info(msg);
throw new MasterNotRunningException(msg);
@@ -1380,7 +1380,7 @@ public class HConnectionManager {
// Only create isa when we need to.
InetSocketAddress address = new InetSocketAddress(hostname, port);
// definitely a cache miss. establish an RPC for this RS
server = HBaseRPC.waitForProxy(
server = HBaseClientRPC.waitForProxy(
protocolClass, version, address, this.conf,
this.maxRPCAttempts, this.rpcTimeout, this.rpcTimeout);
protocols.put(protocol, server);
@@ -1595,7 +1595,7 @@ public class HConnectionManager {
synchronized (masterAndZKLock) {
if (!isKeepAliveMasterConnectedAndRunning(protocolState)) {
if (protocolState.protocol != null) {
HBaseRPC.stopProxy(protocolState.protocol);
HBaseClientRPC.stopProxy(protocolState.protocol);
}
protocolState.protocol = null;
protocolState.protocol = createMasterWithRetries(protocolState);
@@ -1671,7 +1671,7 @@ public class HConnectionManager {
private void closeMasterProtocol(MasterProtocolState protocolState) {
if (protocolState.protocol != null){
LOG.info("Closing master protocol: " + protocolState.protocolClass.getName());
HBaseRPC.stopProxy(protocolState.protocol);
HBaseClientRPC.stopProxy(protocolState.protocol);
protocolState.protocol = null;
}
protocolState.userCount = 0;
@@ -2252,7 +2252,7 @@ public class HConnectionManager {
closeMaster();
for (Map<String, VersionedProtocol> i : servers.values()) {
for (VersionedProtocol server: i.values()) {
HBaseRPC.stopProxy(server);
HBaseClientRPC.stopProxy(server);
}
}
}
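Every call site above follows the same substitution: HBaseClientRPC is now the single client-side entry point for creating and tearing down proxies. A minimal sketch of the pattern (names are illustrative):

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;

static VersionedProtocol openProxy(Class<? extends VersionedProtocol> protocol,
    long version, String host, int port, Configuration conf, int rpcTimeout)
    throws IOException {
  // mirrors the HConnectionManager call sites above
  return HBaseClientRPC.getProxy(protocol, version,
      new InetSocketAddress(host, port), conf, rpcTimeout);
}
// ...and symmetrically, HBaseClientRPC.stopProxy(proxy) when done with it.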

View File

@@ -33,8 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
@@ -113,12 +112,12 @@ public abstract class ServerCallable<T> implements Callable<T> {
}
public void beforeCall() {
HBaseRPC.setRpcTimeout(this.callTimeout);
HBaseClientRPC.setRpcTimeout(this.callTimeout);
this.startTime = System.currentTimeMillis();
}
public void afterCall() {
HBaseRPC.resetRpcTimeout();
HBaseClientRPC.resetRpcTimeout();
this.endTime = System.currentTimeMillis();
}
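beforeCall()/afterCall() bracket each invocation so the thread-local RPC timeout applies only for the duration of one call. A sketch of that contract, with the try/finally the surrounding retry loop is expected to supply (callWithTimeout is a hypothetical helper):

import java.util.concurrent.Callable;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;

static <T> T callWithTimeout(Callable<T> rpc, int callTimeout) throws Exception {
  HBaseClientRPC.setRpcTimeout(callTimeout);  // what beforeCall() does
  try {
    return rpc.call();                        // the RPC reads the thread-local timeout
  } finally {
    HBaseClientRPC.resetRpcTimeout();         // what afterCall() does
  }
}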

View File

@@ -18,20 +18,17 @@
package org.apache.hadoop.hbase.filter;
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.mapreduce.RowCounter;
import org.apache.hadoop.hbase.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Set;
import java.util.TreeSet;
/**
* The filter looks for the given columns in KeyValue. Once there is a match for

View File

@@ -178,8 +178,10 @@ public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable
addToMap(Integer.class, code++);
addToMap(Integer[].class, code++);
addToMap(HRegion.class, code++);
addToMap(HRegion[].class, code++);
//HRegion shouldn't be pushed across the wire.
code++; //addToMap(HRegion.class, code++);
code++; //addToMap(HRegion[].class, code++);
addToMap(HRegionInfo.class, code++);
addToMap(HRegionInfo[].class, code++);
code++; // Removed
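The bare code++ statements are deliberate: each class's wire code is drawn from the running counter, so deleting the two addToMap lines outright would shift every later code by two and break compatibility with existing peers. A sketch of the invariant, with hypothetical code values for illustration:

// If HRegion.class had been code 40 and HRegion[].class 41, then after this change:
code++;                               // 40: slot burned, formerly HRegion.class
code++;                               // 41: slot burned, formerly HRegion[].class
addToMap(HRegionInfo.class, code++);  // still 42, exactly as before the removal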

View File

@@ -28,6 +28,7 @@ import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -41,6 +42,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -106,6 +108,8 @@ public class HBaseClient {
public static final Log LOG = LogFactory
.getLog("org.apache.hadoop.ipc.HBaseClient");
protected final PoolMap<ConnectionId, Connection> connections;
private static final Map<String, Method> methodInstances =
new ConcurrentHashMap<String, Method>();
protected int counter; // counter for call ids
protected final AtomicBoolean running = new AtomicBoolean(true); // if client runs
@@ -959,6 +963,24 @@ public class HBaseClient {
}
}
private Method getMethod(Class<? extends VersionedProtocol> protocol,
String methodName) {
Method method = methodInstances.get(methodName);
if (method != null) {
return method;
}
Method[] methods = protocol.getMethods();
for (Method m : methods) {
if (m.getName().equals(methodName)) {
m.setAccessible(true);
methodInstances.put(methodName, m);
return m;
}
}
return null;
}
/* Receive a response.
* Because there is only one receiver, no synchronization on "in" is needed.
*/
@@ -990,9 +1012,9 @@ public class HBaseClient {
if (status == Status.SUCCESS) {
Message rpcResponseType;
try {
rpcResponseType = ProtobufRpcEngine.Invoker.getReturnProtoType(
ProtobufRpcEngine.Server.getMethod(remoteId.getProtocol(),
call.param.getMethodName()));
rpcResponseType = ProtobufRpcClientEngine.Invoker.getReturnProtoType(
getMethod(remoteId.getProtocol(),
call.param.getMethodName()));
} catch (Exception e) {
throw new RuntimeException(e); //local exception
}
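The response path now resolves the protobuf return type without reaching into the server engine: look the method up by name (cached in methodInstances), then use the generated message's static getDefaultInstance() as a parse prototype. A hedged, self-contained illustration of that reflection trick:

import java.lang.reflect.Method;
import com.google.protobuf.Message;

static Message prototypeFor(Method rpcMethod) throws Exception {
  Class<?> returnType = rpcMethod.getReturnType();  // a generated Message subclass
  Method getDefault = returnType.getMethod("getDefaultInstance");
  return (Message) getDefault.invoke(null);         // static method, no receiver
}

static Message parseResponse(Method rpcMethod, byte[] wire) throws Exception {
  // the prototype's builder knows how to parse wire bytes into the right type
  return prototypeFor(rpcMethod).newBuilderForType().mergeFrom(wire).build();
}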

View File

@@ -0,0 +1,294 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
/**
* An RPC implementation. This class provides the client-side implementation.
*/
@InterfaceAudience.Private
public class HBaseClientRPC {
protected static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseClientRPC");
// cache of RpcEngines by protocol
private static final Map<Class, RpcClientEngine> PROTOCOL_ENGINES
= new HashMap<Class, RpcClientEngine>();
/**
* Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcClientEngine} implementation
* to load to handle connection protocols. Handlers for individual protocols can be
* configured using {@code "hbase.rpc.client.engine." + protocol.class.name}.
*/
public static final String RPC_ENGINE_PROP = "hbase.rpc.client.engine";
// track what RpcEngine is used by a proxy class, for stopProxy()
private static final Map<Class, RpcClientEngine> PROXY_ENGINES
= new HashMap<Class, RpcClientEngine>();
// thread-specific RPC timeout, which may override that of RpcEngine
private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
}
};
static long getProtocolVersion(Class<? extends VersionedProtocol> protocol)
throws NoSuchFieldException, IllegalAccessException {
Field versionField = protocol.getField("VERSION");
versionField.setAccessible(true);
return versionField.getLong(protocol);
}
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine, RpcClientEngine.class);
}
// return the RpcEngine configured to handle a protocol
static synchronized RpcClientEngine getProtocolEngine(Class protocol,
Configuration conf) {
RpcClientEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
// check for a configured default engine
Class<?> defaultEngine =
conf.getClass(RPC_ENGINE_PROP, ProtobufRpcClientEngine.class);
// check for a per interface override
Class<?> impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(),
defaultEngine);
LOG.debug("Using " + impl.getName() + " for " + protocol.getName());
engine = (RpcClientEngine) ReflectionUtils.newInstance(impl, conf);
if (protocol.isInterface())
PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
protocol),
engine);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}
// return the RpcEngine that handles a proxy object
private static synchronized RpcClientEngine getProxyEngine(Object proxy) {
return PROXY_ENGINES.get(proxy.getClass());
}
/**
* @param protocol protocol interface
* @param clientVersion which client version we expect
* @param addr address of remote service
* @param conf configuration
* @param maxAttempts max attempts
* @param rpcTimeout timeout for each RPC
* @param timeout timeout in milliseconds
* @return proxy
* @throws java.io.IOException e
*/
@SuppressWarnings("unchecked")
public static VersionedProtocol waitForProxy(Class protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf,
int maxAttempts,
int rpcTimeout,
long timeout
) throws IOException {
// HBase does a limited number of reconnects, which is different from Hadoop.
long startTime = System.currentTimeMillis();
IOException ioe;
int reconnectAttempts = 0;
while (true) {
try {
return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
} catch (SocketTimeoutException te) { // namenode is busy
LOG.info("Problem connecting to server: " + addr);
ioe = te;
} catch (IOException ioex) {
// We only handle the ConnectException.
ConnectException ce = null;
if (ioex instanceof ConnectException) {
ce = (ConnectException) ioex;
ioe = ce;
} else if (ioex.getCause() != null
&& ioex.getCause() instanceof ConnectException) {
ce = (ConnectException) ioex.getCause();
ioe = ce;
} else if (ioex.getMessage().toLowerCase()
.contains("connection refused")) {
ce = new ConnectException(ioex.getMessage());
ioe = ce;
} else {
// This is the exception we can't handle.
ioe = ioex;
}
if (ce != null) {
handleConnectionException(++reconnectAttempts, maxAttempts, protocol,
addr, ce);
}
}
// check if timed out
if (System.currentTimeMillis() - timeout >= startTime) {
throw ioe;
}
// wait for retry
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
// IGNORE
}
}
}
/**
* @param retries current retried times.
* @param maxAttmpts max attempts
* @param protocol protocol interface
* @param addr address of remote service
* @param ce ConnectException
* @throws org.apache.hadoop.hbase.client.RetriesExhaustedException
*
*/
private static void handleConnectionException(int retries,
int maxAttmpts,
Class<?> protocol,
InetSocketAddress addr,
ConnectException ce)
throws RetriesExhaustedException {
if (maxAttmpts >= 0 && retries >= maxAttmpts) {
LOG.info("Server at " + addr + " could not be reached after "
+ maxAttmpts + " tries, giving up.");
throw new RetriesExhaustedException("Failed setting up proxy " + protocol
+ " to " + addr.toString() + " after attempts=" + maxAttmpts, ce);
}
}
/**
* Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout timeout for each RPC
* @return proxy
* @throws java.io.IOException e
*/
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf,
SocketFactory factory,
int rpcTimeout) throws IOException {
return getProxy(protocol, clientVersion, addr,
User.getCurrent(), conf, factory, rpcTimeout);
}
/**
* Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param ticket ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout timeout for each RPC
* @return proxy
* @throws java.io.IOException e
*/
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, User ticket,
Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
RpcClientEngine engine = getProtocolEngine(protocol, conf);
VersionedProtocol proxy = engine
.getProxy(protocol, clientVersion, addr, ticket, conf, factory,
Math.min(rpcTimeout, getRpcTimeout()));
return proxy;
}
/**
* Construct a client-side proxy object with the default SocketFactory
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
* @param rpcTimeout timeout for each RPC
* @return a proxy instance
* @throws java.io.IOException e
*/
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
int rpcTimeout)
throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils
.getDefaultSocketFactory(conf), rpcTimeout);
}
/**
* Stop this proxy and release its invoker's resource
*
* @param proxy the proxy to be stopped
*/
public static void stopProxy(VersionedProtocol proxy) {
if (proxy != null) {
getProxyEngine(proxy).stopProxy(proxy);
}
}
public static void setRpcTimeout(int t) {
rpcTimeout.set(t);
}
public static int getRpcTimeout() {
return rpcTimeout.get();
}
public static void resetRpcTimeout() {
rpcTimeout.remove();
}
}
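As the javadoc above notes, the engine is now configured per side and, optionally, per protocol. A sketch of both knobs; substitute your own RpcClientEngine implementation in the per-protocol override:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine;
import org.apache.hadoop.hbase.ipc.RpcClientEngine;

static Configuration configureClientEngine() {
  Configuration conf = HBaseConfiguration.create();
  // default engine for all client-side protocols
  conf.set("hbase.rpc.client.engine",
      "org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine");
  // per-protocol override (shown with the default engine; replace with a custom one)
  conf.setClass("hbase.rpc.client.engine." + ClientProtocol.class.getName(),
      ProtobufRpcClientEngine.class, RpcClientEngine.class);
  return conf;
}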

View File

@@ -1,414 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
/** A simple RPC mechanism.
*
* This is a local hbase copy of the hadoop RPC so we can do things like
* address HADOOP-414 for hbase-only and try other hbase-specific
* optimizations. Class has been renamed to avoid confusing it w/ hadoop
* versions.
* <p>
*
*
* A <i>protocol</i> is a Java interface. All parameters and return types must
* be Protobuf objects.
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*/
@InterfaceAudience.Private
public class HBaseRPC {
// Leave this out in the hadoop ipc package but keep class name. Do this
// so that we don't get the logging of this class's invocations by doing our
// blanket enabling DEBUG on the o.a.h.h. package.
protected static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseRPC");
private HBaseRPC() {
super();
} // no public ctor
/**
* Configuration key for the {@link RpcEngine} implementation to load to
* handle connection protocols. Handlers for individual protocols can be
* configured using {@code "hbase.rpc.engine." + protocol.class.name}.
*/
public static final String RPC_ENGINE_PROP = "hbase.rpc.engine";
// cache of RpcEngines by protocol
private static final Map<Class,RpcEngine> PROTOCOL_ENGINES
= new HashMap<Class,RpcEngine>();
// track what RpcEngine is used by a proxy class, for stopProxy()
private static final Map<Class,RpcEngine> PROXY_ENGINES
= new HashMap<Class,RpcEngine>();
// thread-specific RPC timeout, which may override that of RpcEngine
private static ThreadLocal<Integer> rpcTimeout = new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
}
};
static long getProtocolVersion(Class<? extends VersionedProtocol> protocol)
throws NoSuchFieldException, IllegalAccessException {
Field versionField = protocol.getField("VERSION");
versionField.setAccessible(true);
return versionField.getLong(protocol);
}
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
conf.setClass(RPC_ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
}
// return the RpcEngine configured to handle a protocol
private static synchronized RpcEngine getProtocolEngine(Class protocol,
Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
// check for a configured default engine
Class<?> defaultEngine =
conf.getClass(RPC_ENGINE_PROP, ProtobufRpcEngine.class);
// check for a per interface override
Class<?> impl = conf.getClass(RPC_ENGINE_PROP+"."+protocol.getName(),
defaultEngine);
LOG.debug("Using "+impl.getName()+" for "+protocol.getName());
engine = (RpcEngine) ReflectionUtils.newInstance(impl, conf);
if (protocol.isInterface())
PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
protocol),
engine);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}
// return the RpcEngine that handles a proxy object
private static synchronized RpcEngine getProxyEngine(Object proxy) {
return PROXY_ENGINES.get(proxy.getClass());
}
/**
* A version mismatch for the RPC protocol.
*/
public static class VersionMismatch extends IOException {
private static final long serialVersionUID = 0;
private String interfaceName;
private long clientVersion;
private long serverVersion;
/**
* Create a version mismatch exception
* @param interfaceName the name of the protocol mismatch
* @param clientVersion the client's version of the protocol
* @param serverVersion the server's version of the protocol
*/
public VersionMismatch(String interfaceName, long clientVersion,
long serverVersion) {
super("Protocol " + interfaceName + " version mismatch. (client = " +
clientVersion + ", server = " + serverVersion + ")");
this.interfaceName = interfaceName;
this.clientVersion = clientVersion;
this.serverVersion = serverVersion;
}
/**
* Get the interface name
* @return the java class name
* (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
*/
public String getInterfaceName() {
return interfaceName;
}
/**
* @return the client's preferred version
*/
public long getClientVersion() {
return clientVersion;
}
/**
* @return the server's agreed to version.
*/
public long getServerVersion() {
return serverVersion;
}
}
/**
* An error requesting an RPC protocol that the server is not serving.
*/
public static class UnknownProtocolException extends DoNotRetryIOException {
private Class<?> protocol;
public UnknownProtocolException(String mesg) {
// required for unwrapping from a RemoteException
super(mesg);
}
public UnknownProtocolException(Class<?> protocol) {
this(protocol, "Server is not handling protocol "+protocol.getName());
}
public UnknownProtocolException(Class<?> protocol, String mesg) {
super(mesg);
this.protocol = protocol;
}
public Class getProtocol() {
return protocol;
}
}
/**
* @param protocol protocol interface
* @param clientVersion which client version we expect
* @param addr address of remote service
* @param conf configuration
* @param maxAttempts max attempts
* @param rpcTimeout timeout for each RPC
* @param timeout timeout in milliseconds
* @return proxy
* @throws IOException e
*/
@SuppressWarnings("unchecked")
public static VersionedProtocol waitForProxy(Class protocol,
long clientVersion,
InetSocketAddress addr,
Configuration conf,
int maxAttempts,
int rpcTimeout,
long timeout
) throws IOException {
// HBase does a limited number of reconnects, which is different from Hadoop.
long startTime = System.currentTimeMillis();
IOException ioe;
int reconnectAttempts = 0;
while (true) {
try {
return getProxy(protocol, clientVersion, addr, conf, rpcTimeout);
} catch(SocketTimeoutException te) { // namenode is busy
LOG.info("Problem connecting to server: " + addr);
ioe = te;
} catch (IOException ioex) {
// We only handle the ConnectException.
ConnectException ce = null;
if (ioex instanceof ConnectException) {
ce = (ConnectException) ioex;
ioe = ce;
} else if (ioex.getCause() != null
&& ioex.getCause() instanceof ConnectException) {
ce = (ConnectException) ioex.getCause();
ioe = ce;
} else if (ioex.getMessage().toLowerCase()
.contains("connection refused")) {
ce = new ConnectException(ioex.getMessage());
ioe = ce;
} else {
// This is the exception we can't handle.
ioe = ioex;
}
if (ce != null) {
handleConnectionException(++reconnectAttempts, maxAttempts, protocol,
addr, ce);
}
}
// check if timed out
if (System.currentTimeMillis() - timeout >= startTime) {
throw ioe;
}
// wait for retry
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
// IGNORE
}
}
}
/**
* @param retries current retried times.
* @param maxAttmpts max attempts
* @param protocol protocol interface
* @param addr address of remote service
* @param ce ConnectException
* @throws RetriesExhaustedException
*/
private static void handleConnectionException(int retries, int maxAttmpts,
Class<?> protocol, InetSocketAddress addr, ConnectException ce)
throws RetriesExhaustedException {
if (maxAttmpts >= 0 && retries >= maxAttmpts) {
LOG.info("Server at " + addr + " could not be reached after "
+ maxAttmpts + " tries, giving up.");
throw new RetriesExhaustedException("Failed setting up proxy " + protocol
+ " to " + addr.toString() + " after attempts=" + maxAttmpts, ce);
}
}
/**
* Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout timeout for each RPC
* @return proxy
* @throws IOException e
*/
public static VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
return getProxy(protocol, clientVersion, addr,
User.getCurrent(), conf, factory, rpcTimeout);
}
/**
* Construct a client-side proxy object that implements the named protocol,
* talking to a server at the named address.
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param ticket ticket
* @param conf configuration
* @param factory socket factory
* @param rpcTimeout timeout for each RPC
* @return proxy
* @throws IOException e
*/
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, User ticket,
Configuration conf, SocketFactory factory, int rpcTimeout)
throws IOException {
RpcEngine engine = getProtocolEngine(protocol,conf);
VersionedProtocol proxy = engine
.getProxy(protocol, clientVersion, addr, ticket, conf, factory,
Math.min(rpcTimeout, HBaseRPC.getRpcTimeout()));
return proxy;
}
/**
* Construct a client-side proxy object with the default SocketFactory
*
* @param protocol interface
* @param clientVersion version we are expecting
* @param addr remote address
* @param conf configuration
* @param rpcTimeout timeout for each RPC
* @return a proxy instance
* @throws IOException e
*/
public static VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr, Configuration conf,
int rpcTimeout)
throws IOException {
return getProxy(protocol, clientVersion, addr, conf, NetUtils
.getDefaultSocketFactory(conf), rpcTimeout);
}
/**
* Stop this proxy and release its invoker's resource
* @param proxy the proxy to be stopped
*/
public static void stopProxy(VersionedProtocol proxy) {
if (proxy!=null) {
getProxyEngine(proxy).stopProxy(proxy);
}
}
/**
* Construct a server for a protocol implementation instance listening on a
* port and address.
*
* @param instance instance
* @param bindAddress bind address
* @param port port to bind to
* @param numHandlers number of handlers to start
* @param verbose verbose flag
* @param conf configuration
* @return Server
* @throws IOException e
*/
public static RpcServer getServer(final Object instance,
final Class<?>[] ifaces,
final String bindAddress, final int port,
final int numHandlers,
int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException {
return getServer(instance.getClass(), instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
}
/** Construct a server for a protocol implementation instance. */
public static RpcServer getServer(Class protocol,
final Object instance,
final Class<?>[] ifaces, String bindAddress,
int port,
final int numHandlers,
int metaHandlerCount, final boolean verbose, Configuration conf, int highPriorityLevel)
throws IOException {
return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, ifaces, bindAddress, port, numHandlers, metaHandlerCount, verbose, conf, highPriorityLevel);
}
public static void setRpcTimeout(int rpcTimeout) {
HBaseRPC.rpcTimeout.set(rpcTimeout);
}
public static int getRpcTimeout() {
return HBaseRPC.rpcTimeout.get();
}
public static void resetRpcTimeout() {
HBaseRPC.rpcTimeout.remove();
}
}

View File

@@ -0,0 +1,173 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
/**
* A simple RPC mechanism.
*
* This is a local hbase copy of the hadoop RPC so we can do things like
* address HADOOP-414 for hbase-only and try other hbase-specific
* optimizations. Class has been renamed to avoid confusing it w/ hadoop
* versions.
* <p>
*
*
* A <i>protocol</i> is a Java interface. All parameters and return types must
* be Protobuf objects.
* All methods in the protocol should throw only IOException. No field data of
* the protocol instance is transmitted.
*
* This class provides the server side implementation.
*/
@InterfaceAudience.Private
public class HBaseServerRPC {
// Leave this out in the hadoop ipc package but keep class name. Do this
// so that we don't get the logging of this class's invocations by doing our
// blanket enabling DEBUG on the o.a.h.h. package.
protected static final Log LOG =
LogFactory.getLog("org.apache.hadoop.ipc.HBaseServerRPC");
// cache of RpcEngines by protocol
private static final Map<Class, RpcServerEngine> PROTOCOL_ENGINES
= new HashMap<Class, RpcServerEngine>();
/**
* Configuration key for the {@link org.apache.hadoop.hbase.ipc.RpcServerEngine} implementation to
* load to handle connection protocols. Handlers for individual protocols can be
* configured using {@code "hbase.rpc.server.engine." + protocol.class.name}.
*/
public static final String RPC_ENGINE_PROP = "hbase.rpc.server.engine";
// track what RpcEngine is used by a proxy class, for stopProxy()
private static final Map<Class, RpcServerEngine> PROXY_ENGINES
= new HashMap<Class, RpcServerEngine>();
private HBaseServerRPC() {
super();
} // no public ctor
// set a protocol to use a non-default RpcEngine
static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) {
conf.setClass(RPC_ENGINE_PROP + "." + protocol.getName(), engine, RpcServerEngine.class);
}
// return the RpcEngine configured to handle a protocol
static synchronized RpcServerEngine getProtocolEngine(Class protocol,
Configuration conf) {
RpcServerEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) {
// check for a configured default engine
Class<?> defaultEngine =
conf.getClass(RPC_ENGINE_PROP, ProtobufRpcServerEngine.class);
// check for a per interface override
Class<?> impl = conf.getClass(RPC_ENGINE_PROP + "." + protocol.getName(),
defaultEngine);
LOG.debug("Using " + impl.getName() + " for " + protocol.getName());
engine = (RpcServerEngine) ReflectionUtils.newInstance(impl, conf);
if (protocol.isInterface())
PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
protocol),
engine);
PROTOCOL_ENGINES.put(protocol, engine);
}
return engine;
}
// return the RpcEngine that handles a proxy object
private static synchronized RpcServerEngine getProxyEngine(Object proxy) {
return PROXY_ENGINES.get(proxy.getClass());
}
/**
* Construct a server for a protocol implementation instance listening on a
* port and address.
*
* @param instance instance
* @param bindAddress bind address
* @param port port to bind to
* @param numHandlers number of handlers to start
* @param verbose verbose flag
* @param conf configuration
* @return Server
* @throws IOException e
*/
public static RpcServer getServer(final Object instance,
final Class<?>[] ifaces,
final String bindAddress, final int port,
final int numHandlers,
int metaHandlerCount,
final boolean verbose,
Configuration conf,
int highPriorityLevel)
throws IOException {
return getServer(instance.getClass(),
instance,
ifaces,
bindAddress,
port,
numHandlers,
metaHandlerCount,
verbose,
conf,
highPriorityLevel);
}
/**
* Construct a server for a protocol implementation instance.
*/
public static RpcServer getServer(Class protocol,
final Object instance,
final Class<?>[] ifaces,
String bindAddress,
int port,
final int numHandlers,
int metaHandlerCount,
final boolean verbose,
Configuration conf,
int highPriorityLevel)
throws IOException {
return getProtocolEngine(protocol, conf)
.getServer(protocol,
instance,
ifaces,
bindAddress,
port,
numHandlers,
metaHandlerCount,
verbose,
conf,
highPriorityLevel);
}
}
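A sketch of the server-side counterpart in use, mirroring how a region server stands up its RpcServer (bind address, port, and handler counts are illustrative; start() is assumed from the RpcServer interface):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.RpcServer;

static RpcServer startServer(Object impl, Class<?>[] protocols, Configuration conf)
    throws IOException {
  RpcServer server = HBaseServerRPC.getServer(
      impl,              // the protocol implementation, e.g. the region server itself
      protocols,         // interfaces the server handles
      "0.0.0.0", 60020,  // bind address and port (illustrative)
      10,                // handler count
      1,                 // meta handler count
      false,             // verbose
      conf,
      10);               // high-priority QoS level (illustrative)
  server.start();        // assumed RpcServer lifecycle method
  return server;
}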

View File

@@ -0,0 +1,194 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.ipc.RemoteException;
import javax.net.SocketFactory;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class ProtobufRpcClientEngine implements RpcClientEngine {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine");
ProtobufRpcClientEngine() {
super();
}
protected final static ClientCache CLIENTS = new ClientCache();
@Override
public VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol, long clientVersion,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout);
return (VersionedProtocol) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker);
}
@Override
public void stopProxy(VersionedProtocol proxy) {
if (proxy!=null) {
((Invoker)Proxy.getInvocationHandler(proxy)).close();
}
}
static class Invoker implements InvocationHandler {
private static final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private Class<? extends VersionedProtocol> protocol;
private InetSocketAddress address;
private User ticket;
private HBaseClient client;
private boolean isClosed = false;
final private int rpcTimeout;
private final long clientProtocolVersion;
public Invoker(Class<? extends VersionedProtocol> protocol,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
this.protocol = protocol;
this.address = addr;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
this.rpcTimeout = rpcTimeout;
Long version = Invocation.PROTOCOL_VERSION.get(protocol);
if (version != null) {
this.clientProtocolVersion = version;
} else {
try {
this.clientProtocolVersion = HBaseClientRPC.getProtocolVersion(protocol);
} catch (NoSuchFieldException e) {
throw new RuntimeException("Exception encountered during " +
protocol, e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Exception encountered during " +
protocol, e);
}
}
}
private RpcRequestBody constructRpcRequest(Method method,
Object[] params) throws ServiceException {
RpcRequestBody rpcRequest;
RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
builder.setMethodName(method.getName());
Message param;
int length = params.length;
if (length == 2) {
// RpcController + Message in the method args
// (generated code from RPC bits in .proto files have RpcController)
param = (Message)params[1];
} else if (length == 1) { // Message
param = (Message)params[0];
} else {
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
builder.setRequestClassName(param.getClass().getName());
builder.setRequest(param.toByteString());
builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
return rpcRequest;
}
/**
* This is the client-side invoker of an RPC method. It only throws
* ServiceException, since the invocation proxy expects only
* ServiceException to be thrown by the method in the case of a protobuf service.
*
* ServiceException has the following causes:
* <ol>
* <li>Exceptions encountered on the client side in this method are
* set as cause in ServiceException as is.</li>
* <li>Exceptions from the server are wrapped in RemoteException and are
* set as cause in ServiceException</li>
* </ol>
*
* Note that the client calling protobuf RPC methods must handle
* ServiceException by getting the cause from the ServiceException. If the
* cause is RemoteException, then unwrap it to get the exception thrown by
* the server.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
}
RpcRequestBody rpcRequest = constructRpcRequest(method, args);
Message val = null;
try {
val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
}
return val;
} catch (Throwable e) {
if (e instanceof RemoteException) {
Throwable cause = ((RemoteException)e).unwrapRemoteException();
throw new ServiceException(cause);
}
throw new ServiceException(e);
}
}
synchronized protected void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
static Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
}
Class<?> returnType = method.getReturnType();
Method newInstMethod = returnType.getMethod("getDefaultInstance");
newInstMethod.setAccessible(true);
Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
returnTypes.put(method.getName(), protoType);
return protoType;
}
}
}
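Per the Invoker javadoc, callers of protobuf stubs see every failure as a ServiceException and must unwrap it themselves. A sketch of that contract at a call site:

import java.io.IOException;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.ipc.RemoteException;

static IOException unwrap(ServiceException se) {
  Throwable cause = se.getCause();
  if (cause instanceof RemoteException) {
    // server-side failure: recover the original exception type
    return ((RemoteException) cause).unwrapRemoteException();
  }
  if (cause instanceof IOException) {
    return (IOException) cause;   // client-side I/O failure, pass through
  }
  return new IOException(se);     // anything else, wrapped
}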

View File

@@ -19,17 +19,12 @@
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -43,7 +38,6 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.token.AuthenticationTokenSecretManager;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.codehaus.jackson.map.ObjectMapper;
@@ -51,165 +45,26 @@ import org.codehaus.jackson.map.ObjectMapper;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
/**
* The {@link RpcEngine} implementation for ProtoBuf-based RPCs.
* The {@link RpcServerEngine} implementation for ProtoBuf-based RPCs.
*/
@InterfaceAudience.Private
class ProtobufRpcEngine implements RpcEngine {
class ProtobufRpcServerEngine implements RpcServerEngine {
private static final Log LOG =
LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcEngine");
protected final static ClientCache CLIENTS = new ClientCache();
@Override
public VersionedProtocol getProxy(
Class<? extends VersionedProtocol> protocol, long clientVersion,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout);
return (VersionedProtocol)Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker);
}
LogFactory.getLog("org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine");
@Override
public void stopProxy(VersionedProtocol proxy) {
if (proxy!=null) {
((Invoker)Proxy.getInvocationHandler(proxy)).close();
}
ProtobufRpcServerEngine() {
super();
}
@Override
public Server getServer(Class<? extends VersionedProtocol> protocol,
Object instance, Class<?>[] ifaces, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
Configuration conf, int highPriorityLevel) throws IOException {
Object instance, Class<?>[] ifaces, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
Configuration conf, int highPriorityLevel) throws IOException {
return new Server(instance, ifaces, conf, bindAddress, port, numHandlers,
metaHandlerCount, verbose, highPriorityLevel);
}
static class Invoker implements InvocationHandler {
private static final Map<String, Message> returnTypes =
new ConcurrentHashMap<String, Message>();
private Class<? extends VersionedProtocol> protocol;
private InetSocketAddress address;
private User ticket;
private HBaseClient client;
private boolean isClosed = false;
final private int rpcTimeout;
private final long clientProtocolVersion;
public Invoker(Class<? extends VersionedProtocol> protocol,
InetSocketAddress addr, User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
this.protocol = protocol;
this.address = addr;
this.ticket = ticket;
this.client = CLIENTS.getClient(conf, factory);
this.rpcTimeout = rpcTimeout;
Long version = Invocation.PROTOCOL_VERSION.get(protocol);
if (version != null) {
this.clientProtocolVersion = version;
} else {
try {
this.clientProtocolVersion = HBaseRPC.getProtocolVersion(protocol);
} catch (NoSuchFieldException e) {
throw new RuntimeException("Exception encountered during " +
protocol, e);
} catch (IllegalAccessException e) {
throw new RuntimeException("Exception encountered during " +
protocol, e);
}
}
}
private RpcRequestBody constructRpcRequest(Method method,
Object[] params) throws ServiceException {
RpcRequestBody rpcRequest;
RpcRequestBody.Builder builder = RpcRequestBody.newBuilder();
builder.setMethodName(method.getName());
Message param;
int length = params.length;
if (length == 2) {
// RpcController + Message in the method args
// (generated code from RPC bits in .proto files have RpcController)
param = (Message)params[1];
} else if (length == 1) { // Message
param = (Message)params[0];
} else {
throw new ServiceException("Too many parameters for request. Method: ["
+ method.getName() + "]" + ", Expected: 2, Actual: "
+ params.length);
}
builder.setRequestClassName(param.getClass().getName());
builder.setRequest(param.toByteString());
builder.setClientProtocolVersion(clientProtocolVersion);
rpcRequest = builder.build();
return rpcRequest;
}
/**
* This is the client-side invoker of an RPC method. It only throws
* ServiceException, since the invocation proxy expects only
* ServiceException to be thrown by the method in the case of a protobuf service.
*
* ServiceException has the following causes:
* <ol>
* <li>Exceptions encountered on the client side in this method are
* set as cause in ServiceException as is.</li>
* <li>Exceptions from the server are wrapped in RemoteException and are
* set as cause in ServiceException</li>
* </ol>
*
* Note that the client calling protobuf RPC methods must handle
* ServiceException by getting the cause from the ServiceException. If the
* cause is RemoteException, then unwrap it to get the exception thrown by
* the server.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
startTime = System.currentTimeMillis();
}
RpcRequestBody rpcRequest = constructRpcRequest(method, args);
Message val = null;
try {
val = client.call(rpcRequest, address, protocol, ticket, rpcTimeout);
if (LOG.isDebugEnabled()) {
long callTime = System.currentTimeMillis() - startTime;
if (LOG.isTraceEnabled()) LOG.trace("Call: " + method.getName() + " " + callTime);
}
return val;
} catch (Throwable e) {
if (e instanceof RemoteException) {
Throwable cause = ((RemoteException)e).unwrapRemoteException();
throw new ServiceException(cause);
}
throw new ServiceException(e);
}
}
synchronized protected void close() {
if (!isClosed) {
isClosed = true;
CLIENTS.stopClient(client);
}
}
static Message getReturnProtoType(Method method) throws Exception {
if (returnTypes.containsKey(method.getName())) {
return returnTypes.get(method.getName());
}
Class<?> returnType = method.getReturnType();
Method newInstMethod = returnType.getMethod("getDefaultInstance");
newInstMethod.setAccessible(true);
Message protoType = (Message) newInstMethod.invoke(null, (Object[]) null);
returnTypes.put(method.getName(), protoType);
return protoType;
}
}
public static class Server extends HBaseServer {
boolean verbose;
@@ -239,9 +94,9 @@ class ProtobufRpcEngine implements RpcEngine {
}
public Server(Object instance, final Class<?>[] ifaces,
Configuration conf, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
int highPriorityLevel)
Configuration conf, String bindAddress, int port,
int numHandlers, int metaHandlerCount, boolean verbose,
int highPriorityLevel)
throws IOException {
super(bindAddress, port, numHandlers, metaHandlerCount,
conf, classNameBase(instance.getClass().getName()),
@@ -300,13 +155,13 @@ class ProtobufRpcEngine implements RpcEngine {
* exception name and the stack trace are returned in the protobuf response.
*/
public Message call(Class<? extends VersionedProtocol> protocol,
RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
RpcRequestBody rpcRequest, long receiveTime, MonitoredRPCHandler status)
throws IOException {
try {
String methodName = rpcRequest.getMethodName();
Method method = getMethod(protocol, methodName);
if (method == null) {
throw new HBaseRPC.UnknownProtocolException("Method " + methodName +
throw new UnknownProtocolException("Method " + methodName +
" doesn't exist in protocol " + protocol.getName());
}
@@ -342,7 +197,7 @@ class ProtobufRpcEngine implements RpcEngine {
if (protocol.isAssignableFrom(this.implementation)) {
impl = this.instance;
} else {
throw new HBaseRPC.UnknownProtocolException(protocol);
throw new UnknownProtocolException(protocol);
}
long startTime = System.currentTimeMillis();
@@ -418,7 +273,7 @@ class ProtobufRpcEngine implements RpcEngine {
}
static Method getMethod(Class<? extends VersionedProtocol> protocol,
String methodName) {
String methodName) {
Method method = methodInstances.get(methodName);
if (method != null) {
return method;
@@ -473,10 +328,10 @@ class ProtobufRpcEngine implements RpcEngine {
* prior to being initiated, in ms.
* @param responseSize The size in bytes of the response buffer.
*/
void logResponse(Object[] params, String methodName, String call, String tag,
String clientAddress, long startTime, int processingTime, int qTime,
long responseSize)
throws IOException {
void logResponse(Object[] params, String methodName, String call, String tag,
String clientAddress, long startTime, int processingTime, int qTime,
long responseSize)
throws IOException {
// for JSON encoding
ObjectMapper mapper = new ObjectMapper();
// base information that is reported regardless of type of call
@@ -494,7 +349,7 @@ class ProtobufRpcEngine implements RpcEngine {
// if the slow process is a query, we want to log its table as well
// as its own fingerprint
byte [] tableName =
HRegionInfo.parseRegionName((byte[]) params[0])[0];
HRegionInfo.parseRegionName((byte[]) params[0])[0];
responseInfo.put("table", Bytes.toStringBinary(tableName));
// annotate the response map with operation details
responseInfo.putAll(((Operation) params[1]).toMap());

View File

@@ -0,0 +1,42 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import javax.net.SocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
/** An RPC implementation for the client */
@InterfaceAudience.Private
public interface RpcClientEngine {
/** Construct a client-side proxy object. */
VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr,
User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException;
/** Stop this proxy. */
void stopProxy(VersionedProtocol proxy);
}
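The interface is intentionally small, so alternate engines are easy to stub. A minimal, hypothetical implementation shape (not part of this commit; useful for tests):

import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ipc.RpcClientEngine;
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.security.User;

public class NoopClientEngine implements RpcClientEngine {
  @Override
  public VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
      long clientVersion, InetSocketAddress addr, User ticket, Configuration conf,
      SocketFactory factory, int rpcTimeout) throws IOException {
    throw new IOException("illustration only: no transport wired up");
  }

  @Override
  public void stopProxy(VersionedProtocol proxy) {
    // nothing to release in this illustration
  }
}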

View File

@@ -18,27 +18,14 @@
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
/** An RPC implementation. */
import java.io.IOException;
/** An RPC implementation for the server. */
@InterfaceAudience.Private
interface RpcEngine {
/** Construct a client-side proxy object. */
VersionedProtocol getProxy(Class<? extends VersionedProtocol> protocol,
long clientVersion, InetSocketAddress addr,
User ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException;
/** Stop this proxy. */
void stopProxy(VersionedProtocol proxy);
interface RpcServerEngine {
/** Construct a server for a protocol implementation instance. */
RpcServer getServer(Class<? extends VersionedProtocol> protocol, Object instance,


@@ -0,0 +1,47 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import org.apache.hadoop.hbase.DoNotRetryIOException;
/**
* An error requesting an RPC protocol that the server is not serving.
*/
public class UnknownProtocolException extends DoNotRetryIOException {
private Class<?> protocol;
public UnknownProtocolException(String mesg) {
// required for unwrapping from a RemoteException
super(mesg);
}
public UnknownProtocolException(Class<?> protocol) {
this(protocol, "Server is not handling protocol "+protocol.getName());
}
public UnknownProtocolException(Class<?> protocol, String mesg) {
super(mesg);
this.protocol = protocol;
}
public Class getProtocol() {
return protocol;
}
}
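
Since the exception now extends DoNotRetryIOException directly instead of living as a nested class on HBaseRPC, a client receiving it fails fast rather than retrying. A small sketch of the lookup-and-throw pattern that the HMaster hunks above and the HRegion hunks below adopt; the registry map and message text here are illustrative:

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.hbase.ipc.UnknownProtocolException;

public class ServiceLookupSketch {
  private final Map<String, Object> handlers = new HashMap<String, Object>();

  Object lookup(String serviceName) throws UnknownProtocolException {
    Object service = handlers.get(serviceName);
    if (service == null) {
      // Passing null for the protocol class mirrors the call sites in this
      // commit; the message alone identifies the missing service.
      throw new UnknownProtocolException(null,
          "No registered service found for name " + serviceName);
    }
    return service;
  }
}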


@@ -71,9 +71,9 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;


@@ -83,11 +83,13 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.ipc.UnknownProtocolException;
import org.apache.hadoop.hbase.master.balancer.BalancerChore;
import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
@@ -167,7 +169,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
@@ -351,9 +352,9 @@ Server {
}
int numHandlers = conf.getInt("hbase.master.handler.count",
conf.getInt("hbase.regionserver.handler.count", 25));
this.rpcServer = HBaseRPC.getServer(MasterMonitorProtocol.class, this,
new Class<?>[]{MasterMonitorProtocol.class,
MasterAdminProtocol.class, RegionServerStatusProtocol.class},
this.rpcServer = HBaseServerRPC.getServer(MasterMonitorProtocol.class, this,
new Class<?>[]{MasterMonitorProtocol.class,
MasterAdminProtocol.class, RegionServerStatusProtocol.class},
initialIsa.getHostName(), // BindAddress is IP we got for this server.
initialIsa.getPort(),
numHandlers,
@@ -2347,7 +2348,7 @@ Server {
String serviceName = call.getServiceName();
String methodName = call.getMethodName();
if (!coprocessorServiceHandlers.containsKey(serviceName)) {
throw new HBaseRPC.UnknownProtocolException(null,
throw new UnknownProtocolException(null,
"No registered master coprocessor service found for name "+serviceName);
}
@@ -2355,7 +2356,7 @@ Server {
Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
if (methodDesc == null) {
throw new HBaseRPC.UnknownProtocolException(service.getClass(),
throw new UnknownProtocolException(service.getClass(),
"Unknown method "+methodName+" called on master service "+serviceName);
}


@@ -0,0 +1,35 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
public enum BloomType {
/**
* Bloomfilters disabled
*/
NONE,
/**
* Bloom enabled with Table row as Key
*/
ROW,
/**
* Bloom enabled with Table row & column (family+qualifier) as Key
*/
ROWCOL
}
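
With the enum promoted to a top-level type (the copy embedded in StoreFile is deleted further down), call sites shrink from StoreFile.BloomType to plain BloomType. A quick sketch of selecting a bloom type on a column family, in the same style as the updated TestHColumnDescriptor below; the family name is a placeholder:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;

public class BloomTypeSketch {
  public static void main(String[] args) {
    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("cf"));
    // ROW keys the filter on the row alone; ROWCOL also folds in the
    // column, trading a larger filter for fewer false seeks on point gets.
    hcd.setBloomFilterType(BloomType.ROW);
    System.out.println(hcd.getBloomFilterType());
  }
}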


@@ -106,9 +106,9 @@ import org.apache.hadoop.hbase.io.HeapSize;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.UnknownProtocolException;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -4978,7 +4978,7 @@ public class HRegion implements HeapSize { // , Writable{
String serviceName = call.getServiceName();
String methodName = call.getMethodName();
if (!coprocessorServiceHandlers.containsKey(serviceName)) {
throw new HBaseRPC.UnknownProtocolException(null,
throw new UnknownProtocolException(null,
"No registered coprocessor service found for name "+serviceName+
" in region "+Bytes.toStringBinary(getRegionName()));
}
@@ -4987,7 +4987,7 @@ public class HRegion implements HeapSize { // , Writable{
Descriptors.ServiceDescriptor serviceDesc = service.getDescriptorForType();
Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
if (methodDesc == null) {
throw new HBaseRPC.UnknownProtocolException(service.getClass(),
throw new UnknownProtocolException(service.getClass(),
"Unknown method "+methodName+" called on service "+serviceName+
" in region "+Bytes.toStringBinary(getRegionName()));
}


@@ -101,8 +101,11 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.MetricsHBaseServer;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
@@ -489,10 +492,10 @@ public class HRegionServer implements ClientProtocol,
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
this.rpcServer = HBaseRPC.getServer(AdminProtocol.class, this,
new Class<?>[]{ClientProtocol.class,
AdminProtocol.class, HBaseRPCErrorHandler.class,
OnlineRegions.class},
this.rpcServer = HBaseServerRPC.getServer(AdminProtocol.class, this,
new Class<?>[]{ClientProtocol.class,
AdminProtocol.class, HBaseRPCErrorHandler.class,
OnlineRegions.class},
initialIsa.getHostName(), // BindAddress is IP we got for this server.
initialIsa.getPort(),
conf.getInt("hbase.regionserver.handler.count", 10),
@@ -965,7 +968,7 @@ public class HRegionServer implements ClientProtocol,
// Make sure the proxy is down.
if (this.hbaseMaster != null) {
HBaseRPC.stopProxy(this.hbaseMaster);
HBaseClientRPC.stopProxy(this.hbaseMaster);
this.hbaseMaster = null;
}
this.leases.close();
@@ -1773,7 +1776,7 @@ public class HRegionServer implements ClientProtocol,
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
master = (RegionServerStatusProtocol) HBaseRPC.waitForProxy(
master = (RegionServerStatusProtocol) HBaseClientRPC.waitForProxy(
RegionServerStatusProtocol.class, RegionServerStatusProtocol.VERSION,
isa, this.conf, -1,
this.rpcTimeout, this.rpcTimeout);


@@ -92,21 +92,6 @@ import com.google.common.collect.Ordering;
public class StoreFile {
static final Log LOG = LogFactory.getLog(StoreFile.class.getName());
public static enum BloomType {
/**
* Bloomfilters disabled
*/
NONE,
/**
* Bloom enabled with Table row as Key
*/
ROW,
/**
* Bloom enabled with Table row & column (family+qualifier) as Key
*/
ROWCOL
}
// Keys for fileinfo values in HFile
/** Max Sequence ID in FileInfo */


@@ -279,7 +279,7 @@ public class StoreFileScanner implements KeyValueScanner {
boolean haveToSeek = true;
if (useBloom) {
// check ROWCOL Bloom filter first.
if (reader.getBloomFilterType() == StoreFile.BloomType.ROWCOL) {
if (reader.getBloomFilterType() == BloomType.ROWCOL) {
haveToSeek = reader.passesGeneralBloomFilter(kv.getBuffer(),
kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
kv.getQualifierOffset(), kv.getQualifierLength());
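
The lookup above hands the ROWCOL filter both the row and the qualifier of the sought KeyValue, because a ROWCOL bloom is keyed on a composite of the two; a ROW-only query cannot use it. A toy sketch of that key shape, with placeholder row and qualifier values (the real reader composes the key internally):

import org.apache.hadoop.hbase.util.Bytes;

public class RowColBloomKeySketch {
  public static void main(String[] args) {
    // A ROW bloom would hash only rowKey; a ROWCOL bloom hashes the
    // row and column together, as the passesGeneralBloomFilter call shows.
    byte[] rowKey = Bytes.toBytes("row1");
    byte[] qualifier = Bytes.toBytes("qual1");
    byte[] rowColKey = Bytes.add(rowKey, qualifier);
    System.out.println(Bytes.toStringBinary(rowColKey));
  }
}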


@@ -40,9 +40,9 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HbaseObjectWritable;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.QualifierFilter;
@@ -98,7 +98,7 @@ public class AccessControlLists {
new HColumnDescriptor(ACL_LIST_FAMILY,
10, // Ten is arbitrary number. Keep versions to help debugging.
Compression.Algorithm.NONE.getName(), true, true, 8 * 1024,
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
HConstants.FOREVER, BloomType.NONE.toString(),
HConstants.REPLICATION_SCOPE_LOCAL));
}


@@ -29,8 +29,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
import org.apache.hadoop.hbase.thrift.generated.TCell;
@@ -54,7 +53,7 @@ public class ThriftUtilities {
throws IllegalArgument {
Compression.Algorithm comp =
Compression.getCompressionAlgorithmByName(in.compression.toLowerCase());
StoreFile.BloomType bt =
BloomType bt =
BloomType.valueOf(in.bloomFilterType);
if (in.name == null || !in.name.hasRemaining()) {


@@ -30,7 +30,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.BloomType;
/**
* Handles Bloom filter initialization based on configuration and serialized


@@ -521,11 +521,19 @@
block is finished.
</description>
</property>
<property>
<name>hbase.rpc.engine</name>
<value>org.apache.hadoop.hbase.ipc.ProtobufRpcEngine</value>
<description>Implementation of org.apache.hadoop.hbase.ipc.RpcEngine to be
used for client / server RPC call marshalling.
<name>hbase.rpc.client.engine</name>
<value>org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine</value>
<description>Implementation of org.apache.hadoop.hbase.ipc.RpcClientEngine to be
used for client RPC call marshalling.
</description>
</property>
<property>
<name>hbase.rpc.server.engine</name>
<value>org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine</value>
<description>Implementation of org.apache.hadoop.hbase.ipc.RpcServerEngine to be
used for server RPC call marshalling.
</description>
</property>
<property>
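
The old single hbase.rpc.engine knob is thus split into independent client and server knobs. A sketch of overriding them programmatically rather than in hbase-site.xml; this is illustrative only, since the defaults above already point at the protobuf engines:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RpcEngineConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Key names match the hbase-default.xml entries above; a deployment
    // could substitute any RpcClientEngine/RpcServerEngine implementation.
    conf.set("hbase.rpc.client.engine",
        "org.apache.hadoop.hbase.ipc.ProtobufRpcClientEngine");
    conf.set("hbase.rpc.server.engine",
        "org.apache.hadoop.hbase.ipc.ProtobufRpcServerEngine");
  }
}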


@@ -70,12 +70,12 @@ import org.apache.hadoop.hbase.io.hfile.ChecksumUtil;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -188,7 +188,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
List<Object[]> configurations = new ArrayList<Object[]>();
for (Compression.Algorithm comprAlgo :
HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
for (StoreFile.BloomType bloomType : StoreFile.BloomType.values()) {
for (BloomType bloomType : BloomType.values()) {
configurations.add(new Object[] { comprAlgo, bloomType });
}
}


@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.junit.experimental.categories.Category;
@@ -48,7 +49,7 @@ public class TestHColumnDescriptor {
boolean inmemory = hcd.isInMemory();
hcd.setScope(v);
hcd.setDataBlockEncoding(DataBlockEncoding.FAST_DIFF);
hcd.setBloomFilterType(StoreFile.BloomType.ROW);
hcd.setBloomFilterType(BloomType.ROW);
hcd.setCompressionType(Algorithm.SNAPPY);
@@ -65,7 +66,7 @@ public class TestHColumnDescriptor {
assertEquals(hcd.getScope(), deserializedHcd.getScope());
assertTrue(deserializedHcd.getCompressionType().equals(Compression.Algorithm.SNAPPY));
assertTrue(deserializedHcd.getDataBlockEncoding().equals(DataBlockEncoding.FAST_DIFF));
assertTrue(deserializedHcd.getBloomFilterType().equals(StoreFile.BloomType.ROW));
assertTrue(deserializedHcd.getBloomFilterType().equals(BloomType.ROW));
}
@Test


@@ -479,8 +479,6 @@ public class TestHbaseObjectWritable extends TestCase {
assertEquals(18,HbaseObjectWritable.getClassCode(HColumnDescriptor.class).intValue());
assertEquals(19,HbaseObjectWritable.getClassCode(HConstants.Modify.class).intValue());
// 20 and 21 are place holders for HMsg
assertEquals(22,HbaseObjectWritable.getClassCode(HRegion.class).intValue());
assertEquals(23,HbaseObjectWritable.getClassCode(HRegion[].class).intValue());
assertEquals(24,HbaseObjectWritable.getClassCode(HRegionInfo.class).intValue());
assertEquals(25,HbaseObjectWritable.getClassCode(HRegionInfo[].class).intValue());
// Intentional hole... these objects have been removed.


@@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -88,7 +88,7 @@ public class TestCacheOnWrite {
private static final int NUM_KV = 25000;
private static final int INDEX_BLOCK_SIZE = 512;
private static final int BLOOM_BLOCK_SIZE = 4096;
private static final BloomType BLOOM_TYPE = StoreFile.BloomType.ROWCOL;
private static final BloomType BLOOM_TYPE = BloomType.ROWCOL;
private static final ChecksumType CKTYPE = ChecksumType.CRC32;
private static final int CKBYTES = 512;


@@ -34,8 +34,8 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.BlockType.BlockCategory;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -43,7 +43,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**W
/**
* Make sure we always cache important block types, such as index blocks, as
* long as we have a block cache, even though block caching might be disabled
* for the column family.


@@ -41,7 +41,7 @@ import com.google.protobuf.ServiceException;
* Make sure to call setProtocolEngine to have the client actually use the RpcEngine
* for a specific protocol
*/
public class RandomTimeoutRpcEngine extends ProtobufRpcEngine {
public class RandomTimeoutRpcEngine extends ProtobufRpcClientEngine {
private static final Random RANDOM = new Random(System.currentTimeMillis());
public static double chanceOfTimeout = 0.3;
@@ -67,7 +67,7 @@ public class RandomTimeoutRpcEngine extends ProtobufRpcEngine {
* Call this in order to set this class to run as the RpcEngine for the given protocol
*/
public static void setProtocolEngine(Configuration conf, Class protocol) {
HBaseRPC.setProtocolEngine(conf, protocol, RandomTimeoutRpcEngine.class);
HBaseClientRPC.setProtocolEngine(conf, protocol, RandomTimeoutRpcEngine.class);
}
/**


@@ -70,12 +70,12 @@ public class TestDelayedRpc {
Configuration conf = HBaseConfiguration.create();
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseRPC.getServer(new TestRpcImpl(delayReturnValue),
rpcServer = HBaseServerRPC.getServer(new TestRpcImpl(delayReturnValue),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0,
rpcServer.getListenerAddress(), conf, 1000);
List<Integer> results = new ArrayList<Integer>();
@@ -133,11 +133,11 @@ public class TestDelayedRpc {
log.setLevel(Level.WARN);
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseRPC.getServer(new TestRpcImpl(true),
rpcServer = HBaseServerRPC.getServer(new TestRpcImpl(true),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0,
rpcServer.getListenerAddress(), conf, 1000);
Thread threads[] = new Thread[MAX_DELAYED_RPC + 1];
@@ -264,12 +264,12 @@ public class TestDelayedRpc {
Configuration conf = HBaseConfiguration.create();
InetSocketAddress isa = new InetSocketAddress("localhost", 0);
rpcServer = HBaseRPC.getServer(new FaultyTestRpc(),
rpcServer = HBaseServerRPC.getServer(new FaultyTestRpc(),
new Class<?>[]{ TestRpcImpl.class },
isa.getHostName(), isa.getPort(), 1, 0, true, conf, 0);
rpcServer.start();
TestRpc client = (TestRpc) HBaseRPC.getProxy(TestRpc.class, 0,
TestRpc client = (TestRpc) HBaseClientRPC.getProxy(TestRpc.class, 0,
rpcServer.getListenerAddress(), conf, 1000);
int result = 0xDEADBEEF;


@@ -96,12 +96,13 @@ public class TestProtoBufRpc {
public void setUp() throws IOException { // Setup server for both protocols
conf = new Configuration();
// Set RPC engine to protobuf RPC engine
HBaseRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class);
HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
// Create server side implementation
PBServerImpl serverImpl = new PBServerImpl();
// Get RPC server for server side implementation
server = HBaseRPC.getServer(TestRpcService.class,serverImpl,
server = HBaseServerRPC.getServer(TestRpcService.class,serverImpl,
new Class[]{TestRpcService.class},
ADDRESS, PORT, 10, 10, true, conf, 0);
addr = server.getListenerAddress();
@@ -116,9 +117,10 @@ public class TestProtoBufRpc {
private static TestRpcService getClient() throws IOException {
// Set RPC engine to protobuf RPC engine
HBaseRPC.setProtocolEngine(conf, TestRpcService.class,
ProtobufRpcEngine.class);
return (TestRpcService) HBaseRPC.getProxy(TestRpcService.class, 0,
HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcClientEngine.class);
HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcServerEngine.class);
return (TestRpcService) HBaseClientRPC.getProxy(TestRpcService.class, 0,
addr, conf, 10000);
}
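
The paired setProtocolEngine calls above are the pattern every caller follows after the split: the client and server engines are registered separately for a protocol. Condensed into one helper for clarity (a sketch using the test's own fixtures, TestRpcService and PBServerImpl; port 0 asks for any free port):

import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;

public class SplitEnginePairingSketch {
  static TestRpcService startAndConnect(Configuration conf) throws Exception {
    HBaseClientRPC.setProtocolEngine(conf, TestRpcService.class,
        ProtobufRpcClientEngine.class);
    HBaseServerRPC.setProtocolEngine(conf, TestRpcService.class,
        ProtobufRpcServerEngine.class);
    RpcServer server = HBaseServerRPC.getServer(TestRpcService.class,
        new PBServerImpl(), new Class[]{TestRpcService.class},
        "localhost", 0, 10, 10, true, conf, 0);
    server.start();
    InetSocketAddress addr = server.getListenerAddress();
    return (TestRpcService) HBaseClientRPC.getProxy(
        TestRpcService.class, 0, addr, conf, 10000);
  }
}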


@@ -37,8 +37,8 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;
import org.junit.experimental.categories.Category;


@@ -27,7 +27,7 @@ import java.net.SocketTimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.MasterMonitorProtocol;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
@@ -55,8 +55,8 @@ public class TestHMasterRPCException {
//try to connect too soon. Retry on SocketTimeoutException.
while (i < 20) {
try {
MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseRPC.getProxy(
MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION, isa, conf, 100 * 10);
MasterMonitorProtocol inf = (MasterMonitorProtocol) HBaseClientRPC.getProxy(
MasterMonitorProtocol.class, MasterMonitorProtocol.VERSION, isa, conf, 100 * 10);
inf.isMasterRunning(null, IsMasterRunningRequest.getDefaultInstance());
fail();
} catch (ServiceException ex) {


@@ -101,7 +101,7 @@ public class CreateRandomStoreFile {
+ Arrays.toString(Compression.Algorithm.values()));
options.addOption(BLOOM_FILTER_OPTION, "bloom_filter", true,
"Bloom filter type, one of "
+ Arrays.toString(StoreFile.BloomType.values()));
+ Arrays.toString(BloomType.values()));
options.addOption(BLOCK_SIZE_OPTION, "block_size", true,
"HFile block size");
options.addOption(BLOOM_BLOCK_SIZE_OPTION, "bloom_block_size", true,
@@ -162,9 +162,9 @@ public class CreateRandomStoreFile {
cmdLine.getOptionValue(COMPRESSION_OPTION));
}
StoreFile.BloomType bloomType = StoreFile.BloomType.NONE;
BloomType bloomType = BloomType.NONE;
if (cmdLine.hasOption(BLOOM_FILTER_OPTION)) {
bloomType = StoreFile.BloomType.valueOf(cmdLine.getOptionValue(
bloomType = BloomType.valueOf(cmdLine.getOptionValue(
BLOOM_FILTER_OPTION));
}


@@ -367,8 +367,6 @@ public class DataBlockEncodingTool {
/**
* Check decompress performance of a given algorithm and print it.
* @param algorithm Compression algorithm.
* @param compressorCodec Compressor to be tested.
* @param decompressorCodec Decompressor of the same algorithm.
* @param name Name of algorithm.
* @param buffer Buffer to be compressed.
* @param offset Position of the beginning of the data.
@@ -584,7 +582,7 @@ public class DataBlockEncodingTool {
CacheConfig cacheConf = new CacheConfig(conf);
FileSystem fs = FileSystem.get(conf);
StoreFile hsf = new StoreFile(fs, path, conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
StoreFile.Reader reader = hsf.createReader();
reader.loadFileInfo();


@@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
/**
* Test seek performance for encoded data blocks. Read an HFile and do several


@@ -146,7 +146,7 @@ public class HFileReadWriteTest {
private HFileDataBlockEncoder dataBlockEncoder =
NoOpDataBlockEncoder.INSTANCE;
private StoreFile.BloomType bloomType = StoreFile.BloomType.NONE;
private BloomType bloomType = BloomType.NONE;
private int blockSize;
private Compression.Algorithm compression = Compression.Algorithm.NONE;
@@ -178,7 +178,7 @@ public class HFileReadWriteTest {
+ Arrays.toString(Compression.Algorithm.values()) +
Workload.MERGE.onlyUsedFor());
options.addOption(BLOOM_FILTER_OPTION, true, "Bloom filter type, one of "
+ Arrays.toString(StoreFile.BloomType.values()) +
+ Arrays.toString(BloomType.values()) +
Workload.MERGE.onlyUsedFor());
options.addOption(BLOCK_SIZE_OPTION, true, "HFile block size" +
Workload.MERGE.onlyUsedFor());
@@ -239,7 +239,7 @@ public class HFileReadWriteTest {
}
if (cmdLine.hasOption(BLOOM_FILTER_OPTION)) {
bloomType = StoreFile.BloomType.valueOf(cmdLine.getOptionValue(
bloomType = BloomType.valueOf(cmdLine.getOptionValue(
BLOOM_FILTER_OPTION));
}
@@ -468,7 +468,7 @@ public class HFileReadWriteTest {
// We are passing the ROWCOL Bloom filter type, but StoreFile will still
// use the Bloom filter type specified in the HFile.
return new StoreFile(fs, filePath, conf, cacheConf,
StoreFile.BloomType.ROWCOL, dataBlockEncoder);
BloomType.ROWCOL, dataBlockEncoder);
}
public static int charToHex(int c) {


@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.junit.Test;
@@ -89,7 +88,7 @@ public class TestBlocksRead extends HBaseTestCase {
* @param tableName
* @param callingMethod
* @param conf
* @param families
* @param family
* @throws IOException
* @return created and initialized region.
*/


@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
@@ -49,7 +48,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileBlock;
import org.apache.hadoop.hbase.io.hfile.HFileReaderV2;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes;


@@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.TestHFileWriterV2;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.ByteBloomFilter;
import org.apache.hadoop.hbase.util.Bytes;


@@ -44,7 +44,6 @@ import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Assume;
import org.junit.Test;
@@ -81,7 +80,7 @@ public class TestFSErrorsExposed {
writer, Bytes.toBytes("cf"), Bytes.toBytes("qual"));
StoreFile sf = new StoreFile(fs, writer.getPath(),
util.getConfiguration(), cacheConf, StoreFile.BloomType.NONE,
util.getConfiguration(), cacheConf, BloomType.NONE,
NoOpDataBlockEncoder.INSTANCE);
StoreFile.Reader reader = sf.createReader();


@@ -75,7 +75,6 @@ import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;


@@ -103,31 +103,6 @@ public class TestHRegionInfo {
+ id + "." + md5HashInHex + ".",
nameStr);
}
@Test
public void testGetSetOfHTD() throws IOException {
HBaseTestingUtility HTU = new HBaseTestingUtility();
final String tablename = "testGetSetOfHTD";
// Delete the temporary table directory that might still be there from the
// previous test run.
FSTableDescriptors.deleteTableDescriptorIfExists(tablename,
HTU.getConfiguration());
HTableDescriptor htd = new HTableDescriptor(tablename);
FSTableDescriptors.createTableDescriptor(htd, HTU.getConfiguration());
HRegionInfo hri = new HRegionInfo(Bytes.toBytes("testGetSetOfHTD"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
HTableDescriptor htd2 = hri.getTableDesc();
assertTrue(htd.equals(htd2));
final String key = "SOME_KEY";
assertNull(htd.getValue(key));
final String value = "VALUE";
htd.setValue(key, value);
hri.setTableDesc(htd);
HTableDescriptor htd3 = hri.getTableDesc();
assertTrue(htd.equals(htd3));
}
@Test
public void testContainsRange() {


@@ -107,7 +107,7 @@ public class TestMultiColumnScanner {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final Compression.Algorithm comprAlgo;
private final StoreFile.BloomType bloomType;
private final BloomType bloomType;
private final DataBlockEncoding dataBlockEncoding;
// Some static sanity-checking.
@@ -133,7 +133,7 @@ public class TestMultiColumnScanner {
}
public TestMultiColumnScanner(Compression.Algorithm comprAlgo,
StoreFile.BloomType bloomType, boolean useDataBlockEncoding) {
BloomType bloomType, boolean useDataBlockEncoding) {
this.comprAlgo = comprAlgo;
this.bloomType = bloomType;
this.dataBlockEncoding = useDataBlockEncoding ? DataBlockEncoding.PREFIX :


@@ -41,8 +41,6 @@ import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.util.Bytes;
@@ -74,7 +72,7 @@ public class TestScanWithBloomError {
private static final byte[] ROW_BYTES = Bytes.toBytes(ROW);
private static NavigableSet<Integer> allColIds = new TreeSet<Integer>();
private HRegion region;
private StoreFile.BloomType bloomType;
private BloomType bloomType;
private FileSystem fs;
private Configuration conf;
@@ -84,13 +82,13 @@
@Parameters
public static final Collection<Object[]> parameters() {
List<Object[]> configurations = new ArrayList<Object[]>();
for (StoreFile.BloomType bloomType : StoreFile.BloomType.values()) {
for (BloomType bloomType : BloomType.values()) {
configurations.add(new Object[] { bloomType });
}
return configurations;
}
public TestScanWithBloomError(StoreFile.BloomType bloomType) {
public TestScanWithBloomError(BloomType bloomType) {
this.bloomType = bloomType;
}


@@ -44,8 +44,6 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
@@ -115,7 +113,7 @@ public class TestSeekOptimizations {
private List<KeyValue> expectedKVs = new ArrayList<KeyValue>();
private Compression.Algorithm comprAlgo;
private StoreFile.BloomType bloomType;
private BloomType bloomType;
private long totalSeekDiligent, totalSeekLazy;
@@ -128,7 +126,7 @@
}
public TestSeekOptimizations(Compression.Algorithm comprAlgo,
StoreFile.BloomType bloomType) {
BloomType bloomType) {
this.comprAlgo = comprAlgo;
this.bloomType = bloomType;
}


@@ -208,7 +208,7 @@ public class TestSplitTransaction {
TEST_UTIL.getConfiguration().setInt(HFile.FORMAT_VERSION_KEY, 1);
try {
for (Store store : this.parent.stores.values()) {
store.getFamily().setBloomFilterType(StoreFile.BloomType.ROW);
store.getFamily().setBloomFilterType(BloomType.ROW);
}
testWholesomeSplit();
} finally {


@@ -49,7 +49,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
@@ -99,7 +99,7 @@ public class TestStoreFile extends HBaseTestCase {
.build();
writeStoreFile(writer);
checkHalfHFile(new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE));
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE));
}
private void writeStoreFile(final StoreFile.Writer writer) throws IOException {
@@ -142,7 +142,7 @@ public class TestStoreFile extends HBaseTestCase {
.build();
writeStoreFile(writer);
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
StoreFile.Reader reader = hsf.createReader();
// Split on a row, not in middle of row. Midkey returned by reader
// may be in middle of row. Create new one with empty column and
@@ -154,7 +154,7 @@ public class TestStoreFile extends HBaseTestCase {
// Make a reference
Path refPath = StoreFile.split(fs, dir, hsf, midRow, true);
StoreFile refHsf = new StoreFile(this.fs, refPath, conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
// Now confirm that I can read from the reference and that it only gets
// keys from top half of the file.
HFileScanner s = refHsf.createReader().getScanner(false, false);
@@ -191,7 +191,7 @@ public class TestStoreFile extends HBaseTestCase {
// Try to open store file from link
StoreFile hsf = new StoreFile(this.fs, linkFilePath, conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
assertTrue(hsf.isLink());
// Now confirm that I can read from the link
@@ -284,10 +284,10 @@ public class TestStoreFile extends HBaseTestCase {
topPath = StoreFile.split(this.fs, topDir, f, badmidkey, true);
bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey, false);
top = new StoreFile(this.fs, topPath, conf, cacheConf,
StoreFile.BloomType.NONE,
BloomType.NONE,
NoOpDataBlockEncoder.INSTANCE).createReader();
bottom = new StoreFile(this.fs, bottomPath, conf, cacheConf,
StoreFile.BloomType.NONE,
BloomType.NONE,
NoOpDataBlockEncoder.INSTANCE).createReader();
bottomScanner = bottom.getScanner(false, false);
int count = 0;
@@ -330,10 +330,10 @@ public class TestStoreFile extends HBaseTestCase {
topPath = StoreFile.split(this.fs, topDir, f, badmidkey, true);
bottomPath = StoreFile.split(this.fs, bottomDir, f, badmidkey, false);
top = new StoreFile(this.fs, topPath, conf, cacheConf,
StoreFile.BloomType.NONE,
BloomType.NONE,
NoOpDataBlockEncoder.INSTANCE).createReader();
bottom = new StoreFile(this.fs, bottomPath, conf, cacheConf,
StoreFile.BloomType.NONE,
BloomType.NONE,
NoOpDataBlockEncoder.INSTANCE).createReader();
first = true;
bottomScanner = bottom.getScanner(false, false);
@@ -433,7 +433,7 @@ public class TestStoreFile extends HBaseTestCase {
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
StoreFile.DEFAULT_BLOCKSIZE_SMALL)
.withFilePath(f)
.withBloomType(StoreFile.BloomType.ROW)
.withBloomType(BloomType.ROW)
.withMaxKeyCount(2000)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
@@ -510,8 +510,8 @@ public class TestStoreFile extends HBaseTestCase {
int versions = 2;
// run once using columns and once using rows
StoreFile.BloomType[] bt =
{StoreFile.BloomType.ROWCOL, StoreFile.BloomType.ROW};
BloomType[] bt =
{BloomType.ROWCOL, BloomType.ROW};
int[] expKeys = {rowCount*colCount, rowCount};
// below line deserves commentary. it is expected bloom false positives
// column = rowCount*2*colCount inserts
@@ -569,7 +569,7 @@ public class TestStoreFile extends HBaseTestCase {
scanner.shouldUseScanner(scan, columns, Long.MIN_VALUE);
boolean shouldRowExist = i % 2 == 0;
boolean shouldColExist = j % 2 == 0;
shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW;
shouldColExist = shouldColExist || bt[x] == BloomType.ROW;
if (shouldRowExist && shouldColExist) {
if (!exists) falseNeg++;
} else {
@@ -602,7 +602,7 @@ public class TestStoreFile extends HBaseTestCase {
StoreFile.Writer writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
StoreFile.DEFAULT_BLOCKSIZE_SMALL)
.withFilePath(f)
.withBloomType(StoreFile.BloomType.ROW)
.withBloomType(BloomType.ROW)
.withMaxKeyCount(2000)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
@@ -620,7 +620,7 @@ public class TestStoreFile extends HBaseTestCase {
writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
StoreFile.DEFAULT_BLOCKSIZE_SMALL)
.withFilePath(f)
.withBloomType(StoreFile.BloomType.ROW)
.withBloomType(BloomType.ROW)
.withMaxKeyCount(27244696)
.build();
assertTrue(writer.hasGeneralBloom());
@@ -631,7 +631,7 @@ public class TestStoreFile extends HBaseTestCase {
writer = new StoreFile.WriterBuilder(conf, cacheConf, fs,
StoreFile.DEFAULT_BLOCKSIZE_SMALL)
.withFilePath(f)
.withBloomType(StoreFile.BloomType.ROW)
.withBloomType(BloomType.ROW)
.withMaxKeyCount(Integer.MAX_VALUE)
.withChecksumType(CKTYPE)
.withBytesPerChecksum(CKBYTES)
@@ -735,7 +735,7 @@ public class TestStoreFile extends HBaseTestCase {
writer.close();
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
StoreFile.Reader reader = hsf.createReader();
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
TreeSet<byte[]> columns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
@@ -778,7 +778,7 @@ public class TestStoreFile extends HBaseTestCase {
Path pathCowOff = new Path(baseDir, "123456789");
StoreFile.Writer writer = writeStoreFile(conf, cacheConf, pathCowOff, 3);
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
LOG.debug(hsf.getPath().toString());
// Read this file, we should see 3 misses
@@ -800,7 +800,7 @@ public class TestStoreFile extends HBaseTestCase {
Path pathCowOn = new Path(baseDir, "123456788");
writer = writeStoreFile(conf, cacheConf, pathCowOn, 3);
hsf = new StoreFile(this.fs, writer.getPath(), conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
// Read this file, we should see 3 hits
reader = hsf.createReader();
@@ -816,13 +816,13 @@ public class TestStoreFile extends HBaseTestCase {
// Let's read back the two files to ensure the blocks exactly match
hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
StoreFile.Reader readerOne = hsf.createReader();
readerOne.loadFileInfo();
StoreFileScanner scannerOne = readerOne.getStoreFileScanner(true, true);
scannerOne.seek(KeyValue.LOWESTKEY);
hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
StoreFile.Reader readerTwo = hsf.createReader();
readerTwo.loadFileInfo();
StoreFileScanner scannerTwo = readerTwo.getStoreFileScanner(true, true);
@@ -853,7 +853,7 @@ public class TestStoreFile extends HBaseTestCase {
conf.setBoolean("hbase.rs.evictblocksonclose", true);
cacheConf = new CacheConfig(conf);
hsf = new StoreFile(this.fs, pathCowOff, conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
reader = hsf.createReader();
reader.close(cacheConf.shouldEvictOnClose());
@@ -867,7 +867,7 @@ public class TestStoreFile extends HBaseTestCase {
conf.setBoolean("hbase.rs.evictblocksonclose", false);
cacheConf = new CacheConfig(conf);
hsf = new StoreFile(this.fs, pathCowOn, conf, cacheConf,
StoreFile.BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
BloomType.NONE, NoOpDataBlockEncoder.INSTANCE);
reader = hsf.createReader();
reader.close(cacheConf.shouldEvictOnClose());


@@ -42,8 +42,9 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseClientRPC;
import org.apache.hadoop.hbase.ipc.HBaseServer;
import org.apache.hadoop.hbase.ipc.HBaseServerRPC;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.ipc.RequestContext;
import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -122,7 +123,7 @@ public class TestTokenAuthentication {
throw new IllegalArgumentException("Failed resolve of " + initialIsa);
}
this.rpcServer = HBaseRPC.getServer(TokenServer.class, this,
this.rpcServer = HBaseServerRPC.getServer(TokenServer.class, this,
new Class<?>[]{AuthenticationProtos.AuthenticationService.Interface.class},
initialIsa.getHostName(), // BindAddress is IP we got for this server.
initialIsa.getPort(),
@@ -377,7 +378,7 @@ public class TestTokenAuthentication {
c.set(HConstants.CLUSTER_ID, clusterId.toString());
AuthenticationProtos.AuthenticationService.BlockingInterface proxy =
(AuthenticationProtos.AuthenticationService.BlockingInterface)
HBaseRPC.waitForProxy(BlockingAuthenticationService.class,
HBaseClientRPC.waitForProxy(BlockingAuthenticationService.class,
BlockingAuthenticationService.VERSION,
server.getAddress(), c,
HConstants.DEFAULT_HBASE_CLIENT_RPC_MAXATTEMPTS,


@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
/**
* A command-line utility that reads, writes, and verifies data. Unlike
@@ -67,7 +67,7 @@ public class LoadTestTool extends AbstractHBaseTool {
"<verify_percent>[:<#threads=" + DEFAULT_NUM_THREADS + ">]";
private static final String OPT_USAGE_BLOOM = "Bloom filter type, one of " +
Arrays.toString(StoreFile.BloomType.values());
Arrays.toString(BloomType.values());
private static final String OPT_USAGE_COMPRESSION = "Compression type, " +
"one of " + Arrays.toString(Compression.Algorithm.values());
@@ -115,7 +115,7 @@ public class LoadTestTool extends AbstractHBaseTool {
private DataBlockEncoding dataBlockEncodingAlgo;
private boolean encodeInCacheOnly;
private Compression.Algorithm compressAlgo;
private StoreFile.BloomType bloomType;
private BloomType bloomType;
// Writer options
private int numWriterThreads = DEFAULT_NUM_THREADS;
@@ -317,7 +317,7 @@ public class LoadTestTool extends AbstractHBaseTool {
String bloomStr = cmd.getOptionValue(OPT_BLOOM);
bloomType = bloomStr == null ? null :
StoreFile.BloomType.valueOf(bloomStr);
BloomType.valueOf(bloomStr);
}
public void initTestTable() throws IOException {