HBASE-9321 Contention getting the current user in RpcClient.writeRequest

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1518693 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jxiang 2013-08-29 16:28:07 +00:00
parent 143b12daec
commit 52ea8f817c
19 changed files with 331 additions and 910 deletions

View File

@ -317,7 +317,7 @@ public class HConnectionManager {
*/ */
public static HConnection createConnection(Configuration conf) public static HConnection createConnection(Configuration conf)
throws IOException { throws IOException {
return createConnection(conf, false, null); return createConnection(conf, false, null, User.getCurrent());
} }
/** /**
@ -342,17 +342,69 @@ public class HConnectionManager {
*/ */
public static HConnection createConnection(Configuration conf, ExecutorService pool) public static HConnection createConnection(Configuration conf, ExecutorService pool)
throws IOException { throws IOException {
return createConnection(conf, false, pool); return createConnection(conf, false, pool, User.getCurrent());
}
/**
* Create a new HConnection instance using the passed <code>conf</code> instance.
* <p>Note: This bypasses the usual HConnection life cycle management done by
* {@link #getConnection(Configuration)}. The caller is responsible for
* calling {@link HConnection#close()} on the returned connection instance.
* This is the recommended way to create HConnections.
* {@code
* ExecutorService pool = ...;
* HConnection connection = HConnectionManager.createConnection(conf, pool);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
* table.close();
* connection.close();
* }
* @param conf configuration
* @param user the user the connection is for
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static HConnection createConnection(Configuration conf, User user)
throws IOException {
return createConnection(conf, false, null, user);
}
/**
* Create a new HConnection instance using the passed <code>conf</code> instance.
* <p>Note: This bypasses the usual HConnection life cycle management done by
* {@link #getConnection(Configuration)}. The caller is responsible for
* calling {@link HConnection#close()} on the returned connection instance.
* This is the recommended way to create HConnections.
* {@code
* ExecutorService pool = ...;
* HConnection connection = HConnectionManager.createConnection(conf, pool);
* HTableInterface table = connection.getTable("mytable");
* table.get(...);
* ...
* table.close();
* connection.close();
* }
* @param conf configuration
* @param pool the thread pool to use for batch operation in HTables used via this HConnection
* @param user the user the connection is for
* @return HConnection object for <code>conf</code>
* @throws ZooKeeperConnectionException
*/
public static HConnection createConnection(Configuration conf, ExecutorService pool, User user)
throws IOException {
return createConnection(conf, false, pool, user);
} }
@Deprecated @Deprecated
static HConnection createConnection(final Configuration conf, final boolean managed) static HConnection createConnection(final Configuration conf, final boolean managed)
throws IOException { throws IOException {
return createConnection(conf, managed, null); return createConnection(conf, managed, null, User.getCurrent());
} }
@Deprecated @Deprecated
static HConnection createConnection(final Configuration conf, final boolean managed, final ExecutorService pool) static HConnection createConnection(final Configuration conf, final boolean managed,
final ExecutorService pool, final User user)
throws IOException { throws IOException {
String className = conf.get("hbase.client.connection.impl", String className = conf.get("hbase.client.connection.impl",
HConnectionManager.HConnectionImplementation.class.getName()); HConnectionManager.HConnectionImplementation.class.getName());
@ -365,9 +417,10 @@ public class HConnectionManager {
try { try {
// Default HCM#HCI is not accessible; make it so before invoking. // Default HCM#HCI is not accessible; make it so before invoking.
Constructor<?> constructor = Constructor<?> constructor =
clazz.getDeclaredConstructor(Configuration.class, boolean.class, ExecutorService.class); clazz.getDeclaredConstructor(Configuration.class,
boolean.class, ExecutorService.class, User.class);
constructor.setAccessible(true); constructor.setAccessible(true);
return (HConnection) constructor.newInstance(conf, managed, pool); return (HConnection) constructor.newInstance(conf, managed, pool, user);
} catch (Exception e) { } catch (Exception e) {
throw new IOException(e); throw new IOException(e);
} }
@ -583,13 +636,15 @@ public class HConnectionManager {
// indicates whether this connection's life cycle is managed (by us) // indicates whether this connection's life cycle is managed (by us)
private boolean managed; private boolean managed;
private User user;
/** /**
* Cluster registry of basic info such as clusterid and meta region location. * Cluster registry of basic info such as clusterid and meta region location.
*/ */
Registry registry; Registry registry;
HConnectionImplementation(Configuration conf, boolean managed) throws IOException { HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
this(conf, managed, null); this(conf, managed, null, null);
} }
/** /**
@ -603,8 +658,10 @@ public class HConnectionManager {
* are shared, we have reference counting going on and will only do full cleanup when no more * are shared, we have reference counting going on and will only do full cleanup when no more
* users of an HConnectionImplementation instance. * users of an HConnectionImplementation instance.
*/ */
HConnectionImplementation(Configuration conf, boolean managed, ExecutorService pool) throws IOException { HConnectionImplementation(Configuration conf, boolean managed,
ExecutorService pool, User user) throws IOException {
this(conf); this(conf);
this.user = user;
this.batchPool = pool; this.batchPool = pool;
this.managed = managed; this.managed = managed;
this.registry = setupRegistry(); this.registry = setupRegistry();
@ -1574,7 +1631,7 @@ public class HConnectionManager {
stub = stubs.get(key); stub = stubs.get(key);
if (stub == null) { if (stub == null) {
BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn, BlockingRpcChannel channel = rpcClient.createBlockingRpcChannel(sn,
User.getCurrent(), rpcTimeout); user, rpcTimeout);
stub = makeStub(channel); stub = makeStub(channel);
isMasterRunning(); isMasterRunning();
stubs.put(key, stub); stubs.put(key, stub);
@ -1723,7 +1780,7 @@ public class HConnectionManager {
stub = (AdminService.BlockingInterface)this.stubs.get(key); stub = (AdminService.BlockingInterface)this.stubs.get(key);
if (stub == null) { if (stub == null) {
BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName, BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(serverName,
User.getCurrent(), this.rpcTimeout); user, this.rpcTimeout);
stub = AdminService.newBlockingStub(channel); stub = AdminService.newBlockingStub(channel);
this.stubs.put(key, stub); this.stubs.put(key, stub);
} }
@ -1744,7 +1801,7 @@ public class HConnectionManager {
stub = (ClientService.BlockingInterface)this.stubs.get(key); stub = (ClientService.BlockingInterface)this.stubs.get(key);
if (stub == null) { if (stub == null) {
BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn, BlockingRpcChannel channel = this.rpcClient.createBlockingRpcChannel(sn,
User.getCurrent(), this.rpcTimeout); user, this.rpcTimeout);
stub = ClientService.newBlockingStub(channel); stub = ClientService.newBlockingStub(channel);
// In old days, after getting stub/proxy, we'd make a call. We are not doing that here. // In old days, after getting stub/proxy, we'd make a call. We are not doing that here.
// Just fail on first actual call rather than in here on setup. // Just fail on first actual call rather than in here on setup.

View File

@ -1,453 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.MasterAdminService;
import org.apache.hadoop.hbase.protobuf.generated.MasterMonitorProtos.MasterMonitorService;
import org.apache.hadoop.security.UserGroupInformation;
/**
* This is a HConnection wrapper. Whenever a RPC connection is created,
* this class makes sure the specific user is used as the ticket. We assume
* just these methods will create any RPC connection based on the default
* HConnection implementation: getClient, getAdmin, getKeepAliveMasterMonitorService,
* and getKeepAliveMasterAdminService.
*
* This class is put here only because the HConnection interface exposes
* packaged private class MasterMonitorKeepAliveConnection and
* MasterAddKeepAliveConnection.
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class HConnectionWrapper implements HConnection {
private final UserGroupInformation ugi;
private final HConnection hconnection;
public HConnectionWrapper(final UserGroupInformation ugi,
final HConnection hconnection) {
this.hconnection = hconnection;
this.ugi = ugi;
}
@Override
public HTableInterface getTable(String tableName) throws IOException {
return hconnection.getTable(tableName);
}
@Override
public HTableInterface getTable(byte[] tableName) throws IOException {
return hconnection.getTable(tableName);
}
@Override
public HTableInterface getTable(TableName tableName) throws IOException {
return hconnection.getTable(tableName);
}
@Override
public HTableInterface getTable(String tableName, ExecutorService pool) throws IOException {
return hconnection.getTable(tableName, pool);
}
@Override
public HTableInterface getTable(byte[] tableName, ExecutorService pool) throws IOException {
return hconnection.getTable(tableName, pool);
}
@Override
public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
return hconnection.getTable(tableName, pool);
}
@Override
public void abort(String why, Throwable e) {
hconnection.abort(why, e);
}
@Override
public boolean isAborted() {
return hconnection.isAborted();
}
@Override
public void close() throws IOException {
hconnection.close();
}
@Override
public Configuration getConfiguration() {
return hconnection.getConfiguration();
}
@Override
public boolean isMasterRunning() throws MasterNotRunningException,
ZooKeeperConnectionException {
return hconnection.isMasterRunning();
}
@Override
public boolean isTableEnabled(TableName tableName) throws IOException {
return hconnection.isTableEnabled(tableName);
}
@Override
public boolean isTableEnabled(byte[] tableName) throws IOException {
return isTableEnabled(TableName.valueOf(tableName));
}
@Override
public boolean isTableDisabled(TableName tableName) throws IOException {
return hconnection.isTableDisabled(tableName);
}
@Override
public boolean isTableDisabled(byte[] tableName) throws IOException {
return isTableDisabled(TableName.valueOf(tableName));
}
@Override
public boolean isTableAvailable(TableName tableName) throws IOException {
return hconnection.isTableAvailable(tableName);
}
@Override
public boolean isTableAvailable(byte[] tableName) throws IOException {
return isTableAvailable(TableName.valueOf(tableName));
}
@Override
public boolean isTableAvailable(TableName tableName, byte[][] splitKeys) throws IOException {
return hconnection.isTableAvailable(tableName, splitKeys);
}
@Override
public boolean isTableAvailable(byte[] tableName, byte[][] splitKeys) throws IOException {
return isTableAvailable(TableName.valueOf(tableName), splitKeys);
}
@Override
public HTableDescriptor[] listTables() throws IOException {
return hconnection.listTables();
}
@Override
public String[] getTableNames() throws IOException {
return hconnection.getTableNames();
}
@Override
public TableName[] listTableNames() throws IOException {
return hconnection.listTableNames();
}
@Override
public HTableDescriptor getHTableDescriptor(TableName tableName) throws IOException {
return hconnection.getHTableDescriptor(tableName);
}
@Override
public HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException {
return getHTableDescriptor(TableName.valueOf(tableName));
}
@Override
public HRegionLocation locateRegion(TableName tableName, byte[] row) throws IOException {
return hconnection.locateRegion(tableName, row);
}
@Override
public HRegionLocation locateRegion(byte[] tableName, byte[] row) throws IOException {
return locateRegion(TableName.valueOf(tableName), row);
}
@Override
public void clearRegionCache() {
hconnection.clearRegionCache();
}
@Override
public void clearRegionCache(TableName tableName) {
hconnection.clearRegionCache(tableName);
}
@Override
public void clearRegionCache(byte[] tableName) {
clearRegionCache(TableName.valueOf(tableName));
}
@Override
public void deleteCachedRegionLocation(HRegionLocation location) {
hconnection.deleteCachedRegionLocation(location);
}
@Override
public HRegionLocation relocateRegion(TableName tableName, byte[] row) throws IOException {
return hconnection.relocateRegion(tableName, row);
}
@Override
public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
return relocateRegion(TableName.valueOf(tableName), row);
}
@Override
public void updateCachedLocations(TableName tableName,
byte[] rowkey,
Object exception,
HRegionLocation source) {
hconnection.updateCachedLocations(tableName, rowkey, exception, source);
}
@Override
public void updateCachedLocations(byte[] tableName,
byte[] rowkey,
Object exception,
HRegionLocation source) {
updateCachedLocations(TableName.valueOf(tableName), rowkey, exception, source);
}
@Override
public HRegionLocation locateRegion(byte[] regionName) throws IOException {
return hconnection.locateRegion(regionName);
}
@Override
public List<HRegionLocation> locateRegions(TableName tableName) throws IOException {
return hconnection.locateRegions(tableName);
}
@Override
public List<HRegionLocation> locateRegions(byte[] tableName) throws IOException {
return locateRegions(TableName.valueOf(tableName));
}
@Override
public List<HRegionLocation> locateRegions(TableName tableName,
boolean useCache,
boolean offlined) throws IOException {
return hconnection.locateRegions(tableName, useCache, offlined);
}
@Override
public List<HRegionLocation> locateRegions(byte[] tableName,
boolean useCache,
boolean offlined) throws IOException {
return locateRegions(TableName.valueOf(tableName));
}
@Override
public MasterAdminService.BlockingInterface getMasterAdmin() throws IOException {
return hconnection.getMasterAdmin();
}
@Override
public MasterMonitorService.BlockingInterface getMasterMonitor()
throws IOException {
return hconnection.getMasterMonitor();
}
@Override
public AdminService.BlockingInterface getAdmin(
ServerName serverName) throws IOException {
return hconnection.getAdmin(serverName);
}
@Override
public ClientService.BlockingInterface getClient(
final ServerName serverName) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<ClientService.BlockingInterface>() {
@Override
public ClientService.BlockingInterface run() throws IOException {
return hconnection.getClient(serverName);
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
@Override
public AdminService.BlockingInterface getAdmin(
final ServerName serverName, final boolean getMaster) throws IOException {
try {
return ugi.doAs(new PrivilegedExceptionAction<AdminService.BlockingInterface>() {
@Override
public AdminService.BlockingInterface run() throws IOException {
return hconnection.getAdmin(serverName, getMaster);
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
@Override
public HRegionLocation getRegionLocation(TableName tableName,
byte[] row, boolean reload) throws IOException {
return hconnection.getRegionLocation(tableName, row, reload);
}
@Override
public HRegionLocation getRegionLocation(byte[] tableName,
byte[] row, boolean reload) throws IOException {
return getRegionLocation(TableName.valueOf(tableName), row, reload);
}
@Override
public void processBatch(List<? extends Row> actions, TableName tableName, ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
hconnection.processBatch(actions, tableName, pool, results);
}
@Override
public void processBatch(List<? extends Row> actions, byte[] tableName, ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
processBatch(actions, TableName.valueOf(tableName), pool, results);
}
@Override
public <R> void processBatchCallback(List<? extends Row> list, TableName tableName,
ExecutorService pool,
Object[] results,
Callback<R> callback)
throws IOException, InterruptedException {
hconnection.processBatchCallback(list, tableName, pool, results, callback);
}
@Override
public <R> void processBatchCallback(List<? extends Row> list, byte[] tableName,
ExecutorService pool,
Object[] results,
Callback<R> callback)
throws IOException, InterruptedException {
processBatchCallback(list, TableName.valueOf(tableName), pool, results, callback);
}
@Override
public void setRegionCachePrefetch(TableName tableName, boolean enable) {
hconnection.setRegionCachePrefetch(tableName, enable);
}
@Override
public void setRegionCachePrefetch(byte[] tableName, boolean enable) {
setRegionCachePrefetch(TableName.valueOf(tableName), enable);
}
@Override
public boolean getRegionCachePrefetch(TableName tableName) {
return hconnection.getRegionCachePrefetch(tableName);
}
@Override
public boolean getRegionCachePrefetch(byte[] tableName) {
return getRegionCachePrefetch(TableName.valueOf(tableName));
}
@Override
public int getCurrentNrHRS() throws IOException {
return hconnection.getCurrentNrHRS();
}
@Override
public HTableDescriptor[] getHTableDescriptorsByTableName(
List<TableName> tableNames) throws IOException {
return hconnection.getHTableDescriptorsByTableName(tableNames);
}
@Override
public HTableDescriptor[] getHTableDescriptors(
List<String> names) throws IOException {
List<TableName> tableNames = new ArrayList<TableName>(names.size());
for(String name : names) {
tableNames.add(TableName.valueOf(name));
}
return getHTableDescriptorsByTableName(tableNames);
}
@Override
public boolean isClosed() {
return hconnection.isClosed();
}
@Override
public void clearCaches(ServerName sn) {
hconnection.clearCaches(sn);
}
@Override
public MasterMonitorKeepAliveConnection getKeepAliveMasterMonitorService()
throws MasterNotRunningException {
try {
return ugi.doAs(new PrivilegedExceptionAction<MasterMonitorKeepAliveConnection>() {
@Override
public MasterMonitorKeepAliveConnection run() throws MasterNotRunningException {
return hconnection.getKeepAliveMasterMonitorService();
}
});
} catch (IOException ie) {
throw new MasterNotRunningException(ie);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MasterNotRunningException(e);
}
}
@Override
public MasterAdminKeepAliveConnection getKeepAliveMasterAdminService()
throws MasterNotRunningException {
try {
return ugi.doAs(new PrivilegedExceptionAction<MasterAdminKeepAliveConnection>() {
@Override
public MasterAdminKeepAliveConnection run() throws MasterNotRunningException {
return hconnection.getKeepAliveMasterAdminService();
}
});
} catch (IOException ie) {
throw new MasterNotRunningException(ie);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MasterNotRunningException(e);
}
}
@Override
public boolean isDeadServer(ServerName serverName) {
return hconnection.isDeadServer(serverName);
}
}

View File

@ -1011,24 +1011,6 @@ public class RpcClient {
builder.setTraceInfo(RPCTInfo.newBuilder(). builder.setTraceInfo(RPCTInfo.newBuilder().
setParentId(s.getSpanId()).setTraceId(s.getTraceId())); setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
} }
UserGroupInformation ticket = remoteId.getTicket().getUGI();
if (ticket != null) {
UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
String effectiveUser = currentUser == null ? null : currentUser.getShortUserName();
if (effectiveUser != null && !effectiveUser.equals(ticket.getShortUserName())) {
if (ticket.getRealUser() != null) {
LOG.warn("Current user " + effectiveUser
+ " is different from the ticket user " + ticket.getShortUserName()
+ ". But the ticket is already a proxy user with real user "
+ ticket.getRealUser().getShortUserName());
} else {
// If the ticket is not a proxy user, and the current user
// is not the same as the ticket user, pass the current user
// over to the server as the actual effective user.
builder.setEffectiveUser(effectiveUser);
}
}
}
builder.setMethodName(call.md.getName()); builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null); builder.setRequestParam(call.param != null);
ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells); ByteBuffer cellBlock = ipcUtil.buildCellBlock(this.codec, this.compressor, call.cells);

View File

@ -17,7 +17,8 @@
*/ */
package org.apache.hadoop.hbase.client; package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -26,21 +27,22 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Before; import org.junit.Before;
import org.junit.Test;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -249,7 +251,7 @@ public class TestClientNoCluster {
final ClientService.BlockingInterface stub; final ClientService.BlockingInterface stub;
RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed, RegionServerStoppedOnScannerOpenConnection(Configuration conf, boolean managed,
ExecutorService pool) throws IOException { ExecutorService pool, User user) throws IOException {
super(conf, managed); super(conf, managed);
// Mock up my stub so open scanner returns a scanner id and then on next, we throw // Mock up my stub so open scanner returns a scanner id and then on next, we throw
// exceptions for three times and then after that, we return no more to scan. // exceptions for three times and then after that, we return no more to scan.
@ -280,7 +282,7 @@ public class TestClientNoCluster {
extends HConnectionManager.HConnectionImplementation { extends HConnectionManager.HConnectionImplementation {
final ClientService.BlockingInterface stub; final ClientService.BlockingInterface stub;
RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool) RpcTimeoutConnection(Configuration conf, boolean managed, ExecutorService pool, User user)
throws IOException { throws IOException {
super(conf, managed); super(conf, managed);
// Mock up my stub so an exists call -- which turns into a get -- throws an exception // Mock up my stub so an exists call -- which turns into a get -- throws an exception

View File

@ -3662,33 +3662,6 @@ public final class RPCProtos {
* </pre> * </pre>
*/ */
org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMetaOrBuilder getCellBlockMetaOrBuilder(); org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMetaOrBuilder getCellBlockMetaOrBuilder();
// optional string effective_user = 6;
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
boolean hasEffectiveUser();
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
java.lang.String getEffectiveUser();
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
com.google.protobuf.ByteString
getEffectiveUserBytes();
} }
/** /**
* Protobuf type {@code RequestHeader} * Protobuf type {@code RequestHeader}
@ -3786,11 +3759,6 @@ public final class RPCProtos {
bitField0_ |= 0x00000010; bitField0_ |= 0x00000010;
break; break;
} }
case 50: {
bitField0_ |= 0x00000020;
effectiveUser_ = input.readBytes();
break;
}
} }
} }
} catch (com.google.protobuf.InvalidProtocolBufferException e) { } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@ -3978,68 +3946,12 @@ public final class RPCProtos {
return cellBlockMeta_; return cellBlockMeta_;
} }
// optional string effective_user = 6;
public static final int EFFECTIVE_USER_FIELD_NUMBER = 6;
private java.lang.Object effectiveUser_;
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public boolean hasEffectiveUser() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public java.lang.String getEffectiveUser() {
java.lang.Object ref = effectiveUser_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
com.google.protobuf.ByteString bs =
(com.google.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
effectiveUser_ = s;
}
return s;
}
}
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public com.google.protobuf.ByteString
getEffectiveUserBytes() {
java.lang.Object ref = effectiveUser_;
if (ref instanceof java.lang.String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
effectiveUser_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
private void initFields() { private void initFields() {
callId_ = 0; callId_ = 0;
traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo.getDefaultInstance(); traceInfo_ = org.apache.hadoop.hbase.protobuf.generated.Tracing.RPCTInfo.getDefaultInstance();
methodName_ = ""; methodName_ = "";
requestParam_ = false; requestParam_ = false;
cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance(); cellBlockMeta_ = org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta.getDefaultInstance();
effectiveUser_ = "";
} }
private byte memoizedIsInitialized = -1; private byte memoizedIsInitialized = -1;
public final boolean isInitialized() { public final boolean isInitialized() {
@ -4068,9 +3980,6 @@ public final class RPCProtos {
if (((bitField0_ & 0x00000010) == 0x00000010)) { if (((bitField0_ & 0x00000010) == 0x00000010)) {
output.writeMessage(5, cellBlockMeta_); output.writeMessage(5, cellBlockMeta_);
} }
if (((bitField0_ & 0x00000020) == 0x00000020)) {
output.writeBytes(6, getEffectiveUserBytes());
}
getUnknownFields().writeTo(output); getUnknownFields().writeTo(output);
} }
@ -4100,10 +4009,6 @@ public final class RPCProtos {
size += com.google.protobuf.CodedOutputStream size += com.google.protobuf.CodedOutputStream
.computeMessageSize(5, cellBlockMeta_); .computeMessageSize(5, cellBlockMeta_);
} }
if (((bitField0_ & 0x00000020) == 0x00000020)) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(6, getEffectiveUserBytes());
}
size += getUnknownFields().getSerializedSize(); size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size; memoizedSerializedSize = size;
return size; return size;
@ -4152,11 +4057,6 @@ public final class RPCProtos {
result = result && getCellBlockMeta() result = result && getCellBlockMeta()
.equals(other.getCellBlockMeta()); .equals(other.getCellBlockMeta());
} }
result = result && (hasEffectiveUser() == other.hasEffectiveUser());
if (hasEffectiveUser()) {
result = result && getEffectiveUser()
.equals(other.getEffectiveUser());
}
result = result && result = result &&
getUnknownFields().equals(other.getUnknownFields()); getUnknownFields().equals(other.getUnknownFields());
return result; return result;
@ -4190,10 +4090,6 @@ public final class RPCProtos {
hash = (37 * hash) + CELL_BLOCK_META_FIELD_NUMBER; hash = (37 * hash) + CELL_BLOCK_META_FIELD_NUMBER;
hash = (53 * hash) + getCellBlockMeta().hashCode(); hash = (53 * hash) + getCellBlockMeta().hashCode();
} }
if (hasEffectiveUser()) {
hash = (37 * hash) + EFFECTIVE_USER_FIELD_NUMBER;
hash = (53 * hash) + getEffectiveUser().hashCode();
}
hash = (29 * hash) + getUnknownFields().hashCode(); hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash; memoizedHashCode = hash;
return hash; return hash;
@ -4327,8 +4223,6 @@ public final class RPCProtos {
cellBlockMetaBuilder_.clear(); cellBlockMetaBuilder_.clear();
} }
bitField0_ = (bitField0_ & ~0x00000010); bitField0_ = (bitField0_ & ~0x00000010);
effectiveUser_ = "";
bitField0_ = (bitField0_ & ~0x00000020);
return this; return this;
} }
@ -4385,10 +4279,6 @@ public final class RPCProtos {
} else { } else {
result.cellBlockMeta_ = cellBlockMetaBuilder_.build(); result.cellBlockMeta_ = cellBlockMetaBuilder_.build();
} }
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
to_bitField0_ |= 0x00000020;
}
result.effectiveUser_ = effectiveUser_;
result.bitField0_ = to_bitField0_; result.bitField0_ = to_bitField0_;
onBuilt(); onBuilt();
return result; return result;
@ -4422,11 +4312,6 @@ public final class RPCProtos {
if (other.hasCellBlockMeta()) { if (other.hasCellBlockMeta()) {
mergeCellBlockMeta(other.getCellBlockMeta()); mergeCellBlockMeta(other.getCellBlockMeta());
} }
if (other.hasEffectiveUser()) {
bitField0_ |= 0x00000020;
effectiveUser_ = other.effectiveUser_;
onChanged();
}
this.mergeUnknownFields(other.getUnknownFields()); this.mergeUnknownFields(other.getUnknownFields());
return this; return this;
} }
@ -4896,104 +4781,6 @@ public final class RPCProtos {
return cellBlockMetaBuilder_; return cellBlockMetaBuilder_;
} }
// optional string effective_user = 6;
private java.lang.Object effectiveUser_ = "";
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public boolean hasEffectiveUser() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public java.lang.String getEffectiveUser() {
java.lang.Object ref = effectiveUser_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((com.google.protobuf.ByteString) ref)
.toStringUtf8();
effectiveUser_ = s;
return s;
} else {
return (java.lang.String) ref;
}
}
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public com.google.protobuf.ByteString
getEffectiveUserBytes() {
java.lang.Object ref = effectiveUser_;
if (ref instanceof String) {
com.google.protobuf.ByteString b =
com.google.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
effectiveUser_ = b;
return b;
} else {
return (com.google.protobuf.ByteString) ref;
}
}
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public Builder setEffectiveUser(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000020;
effectiveUser_ = value;
onChanged();
return this;
}
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public Builder clearEffectiveUser() {
bitField0_ = (bitField0_ & ~0x00000020);
effectiveUser_ = getDefaultInstance().getEffectiveUser();
onChanged();
return this;
}
/**
* <code>optional string effective_user = 6;</code>
*
* <pre>
* If present, the request is made on behalf of this user
* </pre>
*/
public Builder setEffectiveUserBytes(
com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000020;
effectiveUser_ = value;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:RequestHeader) // @@protoc_insertion_point(builder_scope:RequestHeader)
} }
@ -6010,16 +5797,15 @@ public final class RPCProtos {
"\001(\r\"|\n\021ExceptionResponse\022\034\n\024exception_cl" + "\001(\r\"|\n\021ExceptionResponse\022\034\n\024exception_cl" +
"ass_name\030\001 \001(\t\022\023\n\013stack_trace\030\002 \001(\t\022\020\n\010h" + "ass_name\030\001 \001(\t\022\023\n\013stack_trace\030\002 \001(\t\022\020\n\010h" +
"ostname\030\003 \001(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_not_re", "ostname\030\003 \001(\t\022\014\n\004port\030\004 \001(\005\022\024\n\014do_not_re",
"try\030\005 \001(\010\"\254\001\n\rRequestHeader\022\017\n\007call_id\030\001" + "try\030\005 \001(\010\"\224\001\n\rRequestHeader\022\017\n\007call_id\030\001" +
" \001(\r\022\035\n\ntrace_info\030\002 \001(\0132\t.RPCTInfo\022\023\n\013m" + " \001(\r\022\035\n\ntrace_info\030\002 \001(\0132\t.RPCTInfo\022\023\n\013m" +
"ethod_name\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\022" + "ethod_name\030\003 \001(\t\022\025\n\rrequest_param\030\004 \001(\010\022" +
"\'\n\017cell_block_meta\030\005 \001(\0132\016.CellBlockMeta" + "\'\n\017cell_block_meta\030\005 \001(\0132\016.CellBlockMeta" +
"\022\026\n\016effective_user\030\006 \001(\t\"q\n\016ResponseHead" + "\"q\n\016ResponseHeader\022\017\n\007call_id\030\001 \001(\r\022%\n\te" +
"er\022\017\n\007call_id\030\001 \001(\r\022%\n\texception\030\002 \001(\0132\022" + "xception\030\002 \001(\0132\022.ExceptionResponse\022\'\n\017ce" +
".ExceptionResponse\022\'\n\017cell_block_meta\030\003 " + "ll_block_meta\030\003 \001(\0132\016.CellBlockMetaB<\n*o" +
"\001(\0132\016.CellBlockMetaB<\n*org.apache.hadoop" + "rg.apache.hadoop.hbase.protobuf.generate" +
".hbase.protobuf.generatedB\tRPCProtosH\001\240\001" + "dB\tRPCProtosH\001\240\001\001"
"\001"
}; };
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -6055,7 +5841,7 @@ public final class RPCProtos {
internal_static_RequestHeader_fieldAccessorTable = new internal_static_RequestHeader_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable( com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RequestHeader_descriptor, internal_static_RequestHeader_descriptor,
new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", "EffectiveUser", }); new java.lang.String[] { "CallId", "TraceInfo", "MethodName", "RequestParam", "CellBlockMeta", });
internal_static_ResponseHeader_descriptor = internal_static_ResponseHeader_descriptor =
getDescriptor().getMessageTypes().get(5); getDescriptor().getMessageTypes().get(5);
internal_static_ResponseHeader_fieldAccessorTable = new internal_static_ResponseHeader_fieldAccessorTable = new

View File

@ -646,7 +646,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -657,7 +657,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -667,7 +667,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -677,7 +677,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -688,7 +688,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -1020,7 +1020,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -1032,7 +1032,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -1045,7 +1045,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -1057,7 +1057,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -1069,7 +1069,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2284,7 +2284,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2300,7 +2300,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2316,7 +2316,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2332,7 +2332,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2355,7 +2355,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2375,7 +2375,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2397,7 +2397,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2420,7 +2420,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2440,7 +2440,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2460,7 +2460,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2480,7 +2480,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2499,7 +2499,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2518,7 +2518,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2531,7 +2531,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2547,7 +2547,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2564,7 +2564,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2577,7 +2577,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */
@ -2591,7 +2591,7 @@ public final class WALProtos {
* *
* <pre> * <pre>
* *
*cluster_ids field contains the list of clusters that have *This field contains the list of clusters that have
*consumed the change *consumed the change
* </pre> * </pre>
*/ */

View File

@ -119,8 +119,6 @@ message RequestHeader {
optional bool request_param = 4; optional bool request_param = 4;
// If present, then an encoded data block follows. // If present, then an encoded data block follows.
optional CellBlockMeta cell_block_meta = 5; optional CellBlockMeta cell_block_meta = 5;
// If present, the request is made on behalf of this user
optional string effective_user = 6;
// TODO: Have client specify priority // TODO: Have client specify priority
} }

View File

@ -259,11 +259,10 @@ public class RpcServer implements RpcServerInterface {
protected long size; // size of current call protected long size; // size of current call
protected boolean isError; protected boolean isError;
protected TraceInfo tinfo; protected TraceInfo tinfo;
protected String effectiveUser;
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder, Message param, CellScanner cellScanner, Connection connection, Responder responder,
long size, TraceInfo tinfo, String effectiveUser) { long size, TraceInfo tinfo) {
this.id = id; this.id = id;
this.service = service; this.service = service;
this.md = md; this.md = md;
@ -278,7 +277,6 @@ public class RpcServer implements RpcServerInterface {
this.isError = false; this.isError = false;
this.size = size; this.size = size;
this.tinfo = tinfo; this.tinfo = tinfo;
this.effectiveUser = effectiveUser;
} }
@Override @Override
@ -1127,13 +1125,13 @@ public class RpcServer implements RpcServerInterface {
private static final int AUTHROIZATION_FAILED_CALLID = -1; private static final int AUTHROIZATION_FAILED_CALLID = -1;
private final Call authFailedCall = private final Call authFailedCall =
new Call(AUTHROIZATION_FAILED_CALLID, this.service, null, new Call(AUTHROIZATION_FAILED_CALLID, this.service, null,
null, null, null, this, null, 0, null, null); null, null, null, this, null, 0, null);
private ByteArrayOutputStream authFailedResponse = private ByteArrayOutputStream authFailedResponse =
new ByteArrayOutputStream(); new ByteArrayOutputStream();
// Fake 'call' for SASL context setup // Fake 'call' for SASL context setup
private static final int SASL_CALLID = -33; private static final int SASL_CALLID = -33;
private final Call saslCall = private final Call saslCall =
new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null, null); new Call(SASL_CALLID, this.service, null, null, null, null, this, null, 0, null);
public UserGroupInformation attemptingUser = null; // user name before auth public UserGroupInformation attemptingUser = null; // user name before auth
@ -1488,7 +1486,7 @@ public class RpcServer implements RpcServerInterface {
private int doBadPreambleHandling(final String msg, final Exception e) throws IOException { private int doBadPreambleHandling(final String msg, final Exception e) throws IOException {
LOG.warn(msg); LOG.warn(msg);
Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null, null); Call fakeCall = new Call(-1, null, null, null, null, null, this, responder, -1, null);
setupResponse(null, fakeCall, e, msg); setupResponse(null, fakeCall, e, msg);
responder.doRespond(fakeCall); responder.doRespond(fakeCall);
// Returning -1 closes out the connection. // Returning -1 closes out the connection.
@ -1640,7 +1638,7 @@ public class RpcServer implements RpcServerInterface {
if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) { if ((totalRequestSize + callQueueSize.get()) > maxQueueSize) {
final Call callTooBig = final Call callTooBig =
new Call(id, this.service, null, null, null, null, this, new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null); responder, totalRequestSize, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(), setupResponse(responseBuffer, callTooBig, new CallQueueTooBigException(),
"Call queue is full, is ipc.server.max.callqueue.size too small?"); "Call queue is full, is ipc.server.max.callqueue.size too small?");
@ -1650,7 +1648,6 @@ public class RpcServer implements RpcServerInterface {
MethodDescriptor md = null; MethodDescriptor md = null;
Message param = null; Message param = null;
CellScanner cellScanner = null; CellScanner cellScanner = null;
String effectiveUser = null;
try { try {
if (header.hasRequestParam() && header.getRequestParam()) { if (header.hasRequestParam() && header.getRequestParam()) {
md = this.service.getDescriptorForType().findMethodByName(header.getMethodName()); md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
@ -1669,15 +1666,12 @@ public class RpcServer implements RpcServerInterface {
cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec, cellScanner = ipcUtil.createCellScanner(this.codec, this.compressionCodec,
buf, offset, buf.length); buf, offset, buf.length);
} }
if (header.hasEffectiveUser()) {
effectiveUser = header.getEffectiveUser();
}
} catch (Throwable t) { } catch (Throwable t) {
String msg = "Unable to read call parameter from client " + getHostAddress(); String msg = "Unable to read call parameter from client " + getHostAddress();
LOG.warn(msg, t); LOG.warn(msg, t);
final Call readParamsFailedCall = final Call readParamsFailedCall =
new Call(id, this.service, null, null, null, null, this, new Call(id, this.service, null, null, null, null, this,
responder, totalRequestSize, null, null); responder, totalRequestSize, null);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream(); ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();
setupResponse(responseBuffer, readParamsFailedCall, t, setupResponse(responseBuffer, readParamsFailedCall, t,
msg + "; " + t.getMessage()); msg + "; " + t.getMessage());
@ -1690,8 +1684,7 @@ public class RpcServer implements RpcServerInterface {
: null; : null;
Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder, Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
totalRequestSize, totalRequestSize,
traceInfo, traceInfo);
effectiveUser);
callQueueSize.add(totalRequestSize); callQueueSize.add(totalRequestSize);
scheduler.dispatch(new CallRunner(call)); scheduler.dispatch(new CallRunner(call));
} }
@ -1801,21 +1794,8 @@ public class RpcServer implements RpcServerInterface {
if (call.tinfo != null) { if (call.tinfo != null) {
traceScope = Trace.startSpan(call.toTraceString(), call.tinfo); traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
} }
User user; RequestContext.set(User.create(call.connection.user), getRemoteIp(),
if (call.effectiveUser == null) { call.connection.service);
user = User.create(call.connection.user);
} else {
UserGroupInformation ugi = UserGroupInformation.createProxyUser(
call.effectiveUser, call.connection.user);
ProxyUsers.authorize(ugi, call.connection.getHostAddress(), conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Authorized " + call.connection.user
+ " to impersonate " + call.effectiveUser);
}
user = User.create(ugi);
}
RequestContext.set(user, getRemoteIp(), call.connection.service);
// make the call // make the call
resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp, resultPair = call(call.service, call.md, call.param, call.cellScanner, call.timestamp,
status); status);

View File

@ -1,5 +1,4 @@
/* /**
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.DNS;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.mortbay.jetty.Connector; import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
@ -86,7 +84,6 @@ public class RESTServer implements Constants {
VersionInfo.logVersion(); VersionInfo.logVersion();
FilterHolder authFilter = null; FilterHolder authFilter = null;
UserGroupInformation realUser = null;
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
Class<? extends ServletContainer> containerClass = ServletContainer.class; Class<? extends ServletContainer> containerClass = ServletContainer.class;
@ -102,7 +99,6 @@ public class RESTServer implements Constants {
Preconditions.checkArgument(principalConfig != null && !principalConfig.isEmpty(), Preconditions.checkArgument(principalConfig != null && !principalConfig.isEmpty(),
REST_KERBEROS_PRINCIPAL + " should be set if security is enabled"); REST_KERBEROS_PRINCIPAL + " should be set if security is enabled");
User.login(conf, REST_KEYTAB_FILE, REST_KERBEROS_PRINCIPAL, machineName); User.login(conf, REST_KEYTAB_FILE, REST_KERBEROS_PRINCIPAL, machineName);
realUser = User.getCurrent().getUGI();
if (conf.get(REST_AUTHENTICATION_TYPE) != null) { if (conf.get(REST_AUTHENTICATION_TYPE) != null) {
containerClass = RESTServletContainer.class; containerClass = RESTServletContainer.class;
authFilter = new FilterHolder(); authFilter = new FilterHolder();
@ -111,6 +107,7 @@ public class RESTServer implements Constants {
} }
} }
UserGroupInformation realUser = User.getCurrent().getUGI();
RESTServlet servlet = RESTServlet.getInstance(conf, realUser); RESTServlet servlet = RESTServlet.getInstance(conf, realUser);
Options options = new Options(); Options options = new Options();

View File

@ -19,52 +19,139 @@
package org.apache.hadoop.hbase.rest; package org.apache.hadoop.hbase.rest;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedAction; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionWrapper;
import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;
/** /**
* Singleton class encapsulating global REST servlet state and functions. * Singleton class encapsulating global REST servlet state and functions.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class RESTServlet implements Constants { public class RESTServlet implements Constants {
private static Logger LOG = Logger.getLogger(RESTServlet.class);
private static RESTServlet INSTANCE; private static RESTServlet INSTANCE;
private final Configuration conf; private final Configuration conf;
private final HTablePool pool;
private final MetricsREST metrics = new MetricsREST(); private final MetricsREST metrics = new MetricsREST();
private final HBaseAdmin admin; private final Map<String, ConnectionInfo>
private final UserGroupInformation ugi; connections = new ConcurrentHashMap<String, ConnectionInfo>();
private final KeyLocker<String> locker = new KeyLocker<String>();
private final UserGroupInformation realUser;
static final String CLEANUP_INTERVAL = "hbase.rest.connection.cleanup-interval";
static final String MAX_IDLETIME = "hbase.rest.connection.max-idletime";
static final String NULL_USERNAME = "--NULL--";
private final ThreadLocal<String> effectiveUser = new ThreadLocal<String>() {
protected String initialValue() {
return NULL_USERNAME;
}
};
// A chore to clean up idle connections.
private final Chore connectionCleaner;
private final Stoppable stoppable;
class ConnectionInfo {
final HConnection connection;
final String userName;
volatile HBaseAdmin admin;
private long lastAccessTime;
private boolean closed;
ConnectionInfo(HConnection conn, String user) {
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
connection = conn;
closed = false;
userName = user;
}
synchronized boolean updateAccessTime() {
if (closed) {
return false;
}
if (connection.isAborted() || connection.isClosed()) {
LOG.info("Unexpected: cached HConnection is aborted/closed, removed from cache");
connections.remove(userName);
return false;
}
lastAccessTime = EnvironmentEdgeManager.currentTimeMillis();
return true;
}
synchronized boolean timedOut(int maxIdleTime) {
long timeoutTime = lastAccessTime + maxIdleTime;
if (EnvironmentEdgeManager.currentTimeMillis() > timeoutTime) {
connections.remove(userName);
closed = true;
}
return false;
}
}
class ConnectionCleaner extends Chore {
private final int maxIdleTime;
public ConnectionCleaner(int cleanInterval, int maxIdleTime) {
super("REST-ConnectionCleaner", cleanInterval, stoppable);
this.maxIdleTime = maxIdleTime;
}
@Override
protected void chore() {
for (Map.Entry<String, ConnectionInfo> entry: connections.entrySet()) {
ConnectionInfo connInfo = entry.getValue();
if (connInfo.timedOut(maxIdleTime)) {
if (connInfo.admin != null) {
try {
connInfo.admin.close();
} catch (Throwable t) {
LOG.info("Got exception in closing idle admin", t);
}
}
try {
connInfo.connection.close();
} catch (Throwable t) {
LOG.info("Got exception in closing idle connection", t);
}
}
}
}
}
/** /**
* @return the RESTServlet singleton instance * @return the RESTServlet singleton instance
* @throws IOException
*/ */
public synchronized static RESTServlet getInstance() throws IOException { public synchronized static RESTServlet getInstance() {
assert(INSTANCE != null); assert(INSTANCE != null);
return INSTANCE; return INSTANCE;
} }
/** /**
* @param conf Existing configuration to use in rest servlet * @param conf Existing configuration to use in rest servlet
* @param realUser the login user
* @return the RESTServlet singleton instance * @return the RESTServlet singleton instance
* @throws IOException
*/ */
public synchronized static RESTServlet getInstance(Configuration conf)
throws IOException {
return getInstance(conf, null);
}
public synchronized static RESTServlet getInstance(Configuration conf, public synchronized static RESTServlet getInstance(Configuration conf,
UserGroupInformation ugi) throws IOException { UserGroupInformation realUser) {
if (INSTANCE == null) { if (INSTANCE == null) {
INSTANCE = new RESTServlet(conf, ugi); INSTANCE = new RESTServlet(conf, realUser);
} }
return INSTANCE; return INSTANCE;
} }
@ -76,54 +163,50 @@ public class RESTServlet implements Constants {
/** /**
* Constructor with existing configuration * Constructor with existing configuration
* @param conf existing configuration * @param conf existing configuration
* @throws IOException * @param realUser the login user
*/ */
RESTServlet(final Configuration conf, RESTServlet(final Configuration conf,
final UserGroupInformation ugi) throws IOException { final UserGroupInformation realUser) {
this.conf = conf; stoppable = new Stoppable() {
this.ugi = ugi; private volatile boolean isStopped = false;
int maxSize = conf.getInt("hbase.rest.htablepool.size", 10); @Override public void stop(String why) { isStopped = true;}
if (ugi == null) { @Override public boolean isStopped() {return isStopped;}
pool = new HTablePool(conf, maxSize);
admin = new HBaseAdmin(conf);
} else {
admin = new HBaseAdmin(new HConnectionWrapper(ugi,
HConnectionManager.getConnection(new Configuration(conf))));
pool = new HTablePool(conf, maxSize) {
/**
* A HTablePool adapter. It makes sure the real user is
* always used in creating any table so that the HConnection
* is not any proxy user in case impersonation with
* RESTServletContainer.
*/
@Override
protected HTableInterface createHTable(final String tableName) {
return ugi.doAs(new PrivilegedAction<HTableInterface>() {
@Override
public HTableInterface run() {
return callCreateHTable(tableName);
}
});
}
/**
* A helper method used to call super.createHTable.
*/
HTableInterface callCreateHTable(final String tableName) {
return super.createHTable(tableName);
}
}; };
}
int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
connectionCleaner = new ConnectionCleaner(cleanInterval, maxIdleTime);
Threads.setDaemonThreadRunning(connectionCleaner.getThread());
this.realUser = realUser;
this.conf = conf;
} }
HBaseAdmin getAdmin() { /**
return admin; * Caller doesn't close the admin afterwards.
* We need to manage it and close it properly.
*/
HBaseAdmin getAdmin() throws IOException {
ConnectionInfo connInfo = getCurrentConnection();
if (connInfo.admin == null) {
Lock lock = locker.acquireLock(effectiveUser.get());
try {
if (connInfo.admin == null) {
connInfo.admin = new HBaseAdmin(connInfo.connection);
}
} finally {
lock.unlock();
}
}
return connInfo.admin;
} }
HTablePool getTablePool() { /**
return pool; * Caller closes the table afterwards.
*/
HTableInterface getTable(String tableName) throws IOException {
ConnectionInfo connInfo = getCurrentConnection();
return connInfo.connection.getTable(tableName);
} }
Configuration getConfiguration() { Configuration getConfiguration() {
@ -143,7 +226,31 @@ public class RESTServlet implements Constants {
return getConfiguration().getBoolean("hbase.rest.readonly", false); return getConfiguration().getBoolean("hbase.rest.readonly", false);
} }
UserGroupInformation getUser() { void setEffectiveUser(String effectiveUser) {
return ugi; this.effectiveUser.set(effectiveUser);
}
private ConnectionInfo getCurrentConnection() throws IOException {
String userName = effectiveUser.get();
ConnectionInfo connInfo = connections.get(userName);
if (connInfo == null || !connInfo.updateAccessTime()) {
Lock lock = locker.acquireLock(userName);
try {
connInfo = connections.get(userName);
if (connInfo == null) {
UserGroupInformation ugi = realUser;
if (!userName.equals(NULL_USERNAME)) {
ugi = UserGroupInformation.createProxyUser(userName, realUser);
}
User user = User.create(ugi);
HConnection conn = HConnectionManager.createConnection(conf, user);
connInfo = new ConnectionInfo(conn, userName);
connections.put(userName, connInfo);
}
} finally {
lock.unlock();
}
}
return connInfo;
} }
} }

View File

@ -19,14 +19,12 @@
package org.apache.hadoop.hbase.rest; package org.apache.hadoop.hbase.rest;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.security.UserGroupInformation;
import com.sun.jersey.spi.container.servlet.ServletContainer; import com.sun.jersey.spi.container.servlet.ServletContainer;
@ -46,28 +44,7 @@ public class RESTServletContainer extends ServletContainer {
@Override @Override
public void service(final HttpServletRequest request, public void service(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException { final HttpServletResponse response) throws ServletException, IOException {
UserGroupInformation effectiveUser = UserGroupInformation.createProxyUser( RESTServlet.getInstance().setEffectiveUser(request.getRemoteUser());
request.getRemoteUser(), RESTServlet.getInstance().getUser());
try {
effectiveUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws ServletException, IOException {
callService(request, response);
return null;
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
}
}
/**
* A helper method used to invoke super.service()
* on behalf of an effective user with doAs.
*/
void callService(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
super.service(request, response); super.service(request, response);
} }
} }

View File

@ -32,8 +32,8 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel; import org.apache.hadoop.hbase.rest.model.CellSetModel;
@ -187,7 +186,6 @@ public class RowResource extends ResourceBase {
.build(); .build();
} }
HTablePool pool = servlet.getTablePool();
HTableInterface table = null; HTableInterface table = null;
try { try {
List<RowModel> rows = model.getRows(); List<RowModel> rows = model.getRows();
@ -228,7 +226,7 @@ public class RowResource extends ResourceBase {
LOG.debug("PUT " + put.toString()); LOG.debug("PUT " + put.toString());
} }
} }
table = pool.getTable(tableResource.getName()); table = servlet.getTable(tableResource.getName());
table.put(puts); table.put(puts);
table.flushCommits(); table.flushCommits();
ResponseBuilder response = Response.ok(); ResponseBuilder response = Response.ok();
@ -257,7 +255,6 @@ public class RowResource extends ResourceBase {
.type(MIMETYPE_TEXT).entity("Forbidden" + CRLF) .type(MIMETYPE_TEXT).entity("Forbidden" + CRLF)
.build(); .build();
} }
HTablePool pool = servlet.getTablePool();
HTableInterface table = null; HTableInterface table = null;
try { try {
byte[] row = rowspec.getRow(); byte[] row = rowspec.getRow();
@ -291,7 +288,7 @@ public class RowResource extends ResourceBase {
} else { } else {
put.add(parts[0], null, timestamp, message); put.add(parts[0], null, timestamp, message);
} }
table = pool.getTable(tableResource.getName()); table = servlet.getTable(tableResource.getName());
table.put(put); table.put(put);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("PUT " + put.toString()); LOG.debug("PUT " + put.toString());
@ -387,10 +384,9 @@ public class RowResource extends ResourceBase {
} }
} }
} }
HTablePool pool = servlet.getTablePool();
HTableInterface table = null; HTableInterface table = null;
try { try {
table = pool.getTable(tableResource.getName()); table = servlet.getTable(tableResource.getName());
table.delete(delete); table.delete(delete);
servlet.getMetrics().incrementSucessfulDeleteRequests(1); servlet.getMetrics().incrementSucessfulDeleteRequests(1);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -417,7 +413,6 @@ public class RowResource extends ResourceBase {
* @return Response 200 OK, 304 Not modified, 400 Bad request * @return Response 200 OK, 304 Not modified, 400 Bad request
*/ */
Response checkAndPut(final CellSetModel model) { Response checkAndPut(final CellSetModel model) {
HTablePool pool = servlet.getTablePool();
HTableInterface table = null; HTableInterface table = null;
try { try {
if (model.getRows().size() != 1) { if (model.getRows().size() != 1) {
@ -467,7 +462,7 @@ public class RowResource extends ResourceBase {
.build(); .build();
} }
table = pool.getTable(this.tableResource.getName()); table = servlet.getTable(this.tableResource.getName());
boolean retValue = table.checkAndPut(key, valueToPutParts[0], boolean retValue = table.checkAndPut(key, valueToPutParts[0],
valueToPutParts[1], valueToCheckCell.getValue(), put); valueToPutParts[1], valueToCheckCell.getValue(), put);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -498,7 +493,6 @@ public class RowResource extends ResourceBase {
* @return Response 200 OK, 304 Not modified, 400 Bad request * @return Response 200 OK, 304 Not modified, 400 Bad request
*/ */
Response checkAndDelete(final CellSetModel model) { Response checkAndDelete(final CellSetModel model) {
HTablePool pool = servlet.getTablePool();
HTableInterface table = null; HTableInterface table = null;
Delete delete = null; Delete delete = null;
try { try {
@ -539,7 +533,7 @@ public class RowResource extends ResourceBase {
.build(); .build();
} }
table = pool.getTable(tableResource.getName()); table = servlet.getTable(tableResource.getName());
boolean retValue = table.checkAndDelete(key, parts[0], parts[1], boolean retValue = table.checkAndDelete(key, parts[0], parts[1],
valueToDeleteCell.getValue(), delete); valueToDeleteCell.getValue(), delete);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -25,16 +25,14 @@ import java.util.NoSuchElementException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Private @InterfaceAudience.Private
public class RowResultGenerator extends ResultGenerator { public class RowResultGenerator extends ResultGenerator {
@ -45,8 +43,7 @@ public class RowResultGenerator extends ResultGenerator {
public RowResultGenerator(final String tableName, final RowSpec rowspec, public RowResultGenerator(final String tableName, final RowSpec rowspec,
final Filter filter) throws IllegalArgumentException, IOException { final Filter filter) throws IllegalArgumentException, IOException {
HTablePool pool = RESTServlet.getInstance().getTablePool(); HTableInterface table = RESTServlet.getInstance().getTable(tableName);
HTableInterface table = pool.getTable(tableName);
try { try {
Get get = new Get(rowspec.getRow()); Get get = new Get(rowspec.getRow());
if (rowspec.hasColumns()) { if (rowspec.hasColumns()) {

View File

@ -101,18 +101,17 @@ public class ScannerResource extends ResourceBase {
URI uri = builder.path(id).build(); URI uri = builder.path(id).build();
servlet.getMetrics().incrementSucessfulPutRequests(1); servlet.getMetrics().incrementSucessfulPutRequests(1);
return Response.created(uri).build(); return Response.created(uri).build();
} catch (RuntimeException e) { } catch (Exception e) {
servlet.getMetrics().incrementFailedPutRequests(1); servlet.getMetrics().incrementFailedPutRequests(1);
if (e.getCause() instanceof TableNotFoundException) { if (e instanceof TableNotFoundException) {
return Response.status(Response.Status.NOT_FOUND) return Response.status(Response.Status.NOT_FOUND)
.type(MIMETYPE_TEXT).entity("Not found" + CRLF) .type(MIMETYPE_TEXT).entity("Not found" + CRLF)
.build(); .build();
} } else if (e instanceof RuntimeException) {
return Response.status(Response.Status.BAD_REQUEST) return Response.status(Response.Status.BAD_REQUEST)
.type(MIMETYPE_TEXT).entity("Bad request" + CRLF) .type(MIMETYPE_TEXT).entity("Bad request" + CRLF)
.build(); .build();
} catch (Exception e) { }
servlet.getMetrics().incrementFailedPutRequests(1);
return Response.status(Response.Status.SERVICE_UNAVAILABLE) return Response.status(Response.Status.SERVICE_UNAVAILABLE)
.type(MIMETYPE_TEXT).entity("Unavailable" + CRLF) .type(MIMETYPE_TEXT).entity("Unavailable" + CRLF)
.build(); .build();

View File

@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
@ -64,8 +63,7 @@ public class ScannerResultGenerator extends ResultGenerator {
public ScannerResultGenerator(final String tableName, final RowSpec rowspec, public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
final Filter filter, final int caching) throws IllegalArgumentException, IOException { final Filter filter, final int caching) throws IllegalArgumentException, IOException {
HTablePool pool = RESTServlet.getInstance().getTablePool(); HTableInterface table = RESTServlet.getInstance().getTable(tableName);
HTableInterface table = pool.getTable(tableName);
try { try {
Scan scan; Scan scan;
if (rowspec.hasEndRow()) { if (rowspec.hasEndRow()) {

View File

@ -31,23 +31,20 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.CacheControl; import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context; import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.namespace.QName; import javax.xml.namespace.QName;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.rest.model.ColumnSchemaModel; import org.apache.hadoop.hbase.rest.model.ColumnSchemaModel;
import org.apache.hadoop.hbase.rest.model.TableSchemaModel; import org.apache.hadoop.hbase.rest.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -77,8 +74,7 @@ public class SchemaResource extends ResourceBase {
private HTableDescriptor getTableSchema() throws IOException, private HTableDescriptor getTableSchema() throws IOException,
TableNotFoundException { TableNotFoundException {
HTablePool pool = servlet.getTablePool(); HTableInterface table = servlet.getTable(tableResource.getName());
HTableInterface table = pool.getTable(tableResource.getName());
try { try {
return table.getTableDescriptor(); return table.getTableDescriptor();
} finally { } finally {

View File

@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.rest.filter.GzipFilter; import org.apache.hadoop.hbase.rest.filter.GzipFilter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
@ -47,7 +48,7 @@ public class HBaseRESTTestingUtility {
} }
// Inject the conf for the test by being first to make singleton // Inject the conf for the test by being first to make singleton
RESTServlet.getInstance(conf); RESTServlet.getInstance(conf, User.getCurrent().getUGI());
// set up the Jersey servlet container for Jetty // set up the Jersey servlet container for Jetty
ServletHolder sh = new ServletHolder(ServletContainer.class); ServletHolder sh = new ServletHolder(ServletContainer.class);

View File

@ -19,6 +19,9 @@
package org.apache.hadoop.hbase.rest; package org.apache.hadoop.hbase.rest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
@ -31,7 +34,13 @@ import javax.xml.bind.Unmarshaller;
import org.apache.commons.httpclient.Header; import org.apache.commons.httpclient.Header;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.rest.client.Client; import org.apache.hadoop.hbase.rest.client.Client;
import org.apache.hadoop.hbase.rest.client.Cluster; import org.apache.hadoop.hbase.rest.client.Cluster;
@ -39,11 +48,10 @@ import org.apache.hadoop.hbase.rest.client.Response;
import org.apache.hadoop.hbase.rest.model.CellModel; import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel; import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel; import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper; import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import static org.junit.Assert.*;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -546,21 +554,18 @@ public class TestRowResource {
response = deleteRow(TABLE, ROW_4); response = deleteRow(TABLE, ROW_4);
assertEquals(response.getCode(), 200); assertEquals(response.getCode(), 200);
METRICS_ASSERT.assertCounterGt("requests", UserGroupInformation ugi = User.getCurrent().getUGI();
2l, METRICS_ASSERT.assertCounterGt("requests", 2l,
RESTServlet.getInstance(conf).getMetrics().getSource()); RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
METRICS_ASSERT.assertCounterGt("successfulGet", METRICS_ASSERT.assertCounterGt("successfulGet", 0l,
0l, RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
RESTServlet.getInstance(conf).getMetrics().getSource());
METRICS_ASSERT.assertCounterGt("successfulPut", METRICS_ASSERT.assertCounterGt("successfulPut", 0l,
0l, RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
RESTServlet.getInstance(conf).getMetrics().getSource());
METRICS_ASSERT.assertCounterGt("successfulDelete", METRICS_ASSERT.assertCounterGt("successfulDelete", 0l,
0l, RESTServlet.getInstance(conf, ugi).getMetrics().getSource());
RESTServlet.getInstance(conf).getMetrics().getSource());
} }
@Test @Test

View File

@ -1933,7 +1933,7 @@ public class TestAccessController extends SecureTestUtil {
} finally { } finally {
acl.close(); acl.close();
} }
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin(); final HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
HTableDescriptor htd = new HTableDescriptor(TEST_TABLE2); HTableDescriptor htd = new HTableDescriptor(TEST_TABLE2);
htd.addFamily(new HColumnDescriptor(TEST_FAMILY)); htd.addFamily(new HColumnDescriptor(TEST_FAMILY));
admin.createTable(htd); admin.createTable(htd);
@ -1944,7 +1944,7 @@ public class TestAccessController extends SecureTestUtil {
final HRegionServer newRs = newRsThread.getRegionServer(); final HRegionServer newRs = newRsThread.getRegionServer();
// Move region to the new RegionServer. // Move region to the new RegionServer.
HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE2); final HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE2);
try { try {
NavigableMap<HRegionInfo, ServerName> regions = table NavigableMap<HRegionInfo, ServerName> regions = table
.getRegionLocations(); .getRegionLocations();
@ -1953,7 +1953,6 @@ public class TestAccessController extends SecureTestUtil {
PrivilegedExceptionAction moveAction = new PrivilegedExceptionAction() { PrivilegedExceptionAction moveAction = new PrivilegedExceptionAction() {
public Object run() throws Exception { public Object run() throws Exception {
HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
admin.move(firstRegion.getKey().getEncodedNameAsBytes(), admin.move(firstRegion.getKey().getEncodedNameAsBytes(),
Bytes.toBytes(newRs.getServerName().getServerName())); Bytes.toBytes(newRs.getServerName().getServerName()));
return null; return null;
@ -1979,7 +1978,6 @@ public class TestAccessController extends SecureTestUtil {
// permissions. // permissions.
PrivilegedExceptionAction putAction = new PrivilegedExceptionAction() { PrivilegedExceptionAction putAction = new PrivilegedExceptionAction() {
public Object run() throws Exception { public Object run() throws Exception {
HTable table = new HTable(TEST_UTIL.getConfiguration(), TEST_TABLE2);
Put put = new Put(Bytes.toBytes("test")); Put put = new Put(Bytes.toBytes("test"));
put.add(TEST_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value")); put.add(TEST_FAMILY, Bytes.toBytes("qual"), Bytes.toBytes("value"));
table.put(put); table.put(put);