HBASE-25934 Add username for RegionScannerHolder (#3325)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
f09924b650
commit
8d1473d321
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import com.google.errorprone.annotations.RestrictedApi;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
|
@ -457,10 +458,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
private boolean needCursor;
|
||||
private boolean fullRegionScan;
|
||||
private final String clientIPAndPort;
|
||||
private final String userName;
|
||||
|
||||
RegionScannerHolder(RegionScanner s, HRegion r,
|
||||
RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor,
|
||||
boolean fullRegionScan, String clientIPAndPort) {
|
||||
boolean fullRegionScan, String clientIPAndPort, String userName) {
|
||||
this.s = s;
|
||||
this.r = r;
|
||||
this.closeCallBack = closeCallBack;
|
||||
|
@ -468,6 +470,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
this.needCursor = needCursor;
|
||||
this.fullRegionScan = fullRegionScan;
|
||||
this.clientIPAndPort = clientIPAndPort;
|
||||
this.userName = userName;
|
||||
}
|
||||
|
||||
long getNextCallSeq() {
|
||||
|
@ -483,7 +486,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
// cache the String once made.
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.clientIPAndPort + ", " + this.r.getRegionInfo().getRegionNameAsString();
|
||||
return "clientIPAndPort=" + this.clientIPAndPort +
|
||||
", userName=" + this.userName +
|
||||
", regionInfo=" + this.r.getRegionInfo().getRegionNameAsString();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -504,8 +509,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
LOG.warn("Scanner lease {} expired but no outstanding scanner", this.scannerName);
|
||||
return;
|
||||
}
|
||||
LOG.info("Scanner lease {} expired {}, user={}", this.scannerName, rsh,
|
||||
RpcServer.getRequestUserName().orElse(null));
|
||||
LOG.info("Scanner lease {} expired {}", this.scannerName, rsh);
|
||||
RegionScanner s = rsh.s;
|
||||
HRegion region = null;
|
||||
try {
|
||||
|
@ -514,8 +518,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
region.getCoprocessorHost().preScannerClose(s);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Closing scanner {} {}, user={}", this.scannerName, rsh, e,
|
||||
RpcServer.getRequestUserName().orElse(null));
|
||||
LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
|
||||
} finally {
|
||||
try {
|
||||
s.close();
|
||||
|
@ -523,8 +526,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
region.getCoprocessorHost().postScannerClose(s);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Closing scanner {} {}, user={}", this.scannerName, rsh, e,
|
||||
RpcServer.getRequestUserName().orElse(null));
|
||||
LOG.error("Closing scanner {} {}", this.scannerName, rsh, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1412,6 +1414,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
/**
|
||||
* @return Remote client's ip and port else null if can't be determined.
|
||||
*/
|
||||
@RestrictedApi(
|
||||
explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
|
||||
link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
|
||||
static String getRemoteClientIpAndPort() {
|
||||
RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
|
||||
if (rpcCall == null) {
|
||||
|
@ -1427,6 +1432,20 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
return Address.fromParts(address.getHostAddress(), rpcCall.getRemotePort()).toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Remote client's username.
|
||||
*/
|
||||
@RestrictedApi(
|
||||
explanation = "Should only be called in TestRSRpcServices and RSRpcServices",
|
||||
link = "", allowedOnPath = ".*(TestRSRpcServices|RSRpcServices).java")
|
||||
static String getUserName() {
|
||||
RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
|
||||
if (rpcCall == null) {
|
||||
return HConstants.EMPTY_STRING;
|
||||
}
|
||||
return rpcCall.getRequestUserName().orElse(HConstants.EMPTY_STRING);
|
||||
}
|
||||
|
||||
private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Shipper shipper,
|
||||
HRegion r, boolean needCursor, boolean fullRegionScan) throws LeaseStillHeldException {
|
||||
Lease lease = regionServer.getLeaseManager().createLease(
|
||||
|
@ -1435,7 +1454,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
RpcCallback closeCallback = s instanceof RpcCallback?
|
||||
(RpcCallback)s: new RegionScannerCloseCallBack(s);
|
||||
RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
|
||||
needCursor, fullRegionScan, getRemoteClientIpAndPort());
|
||||
needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
|
||||
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
|
||||
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
|
||||
scannerName + ", " + existing;
|
||||
|
|
|
@ -17,8 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Optional;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
|
@ -54,12 +56,17 @@ public class TestRSRpcServices {
|
|||
Mockito.when(call.getRemotePort()).thenReturn(port);
|
||||
InetAddress address = InetAddress.getLocalHost();
|
||||
Mockito.when(call.getRemoteAddress()).thenReturn(address);
|
||||
Optional<String> userName = Optional.ofNullable("test");
|
||||
Mockito.when(call.getRequestUserName()).thenReturn(userName);
|
||||
RpcServer.setCurrentCall(call);
|
||||
String clientIpAndPort = RSRpcServices.getRemoteClientIpAndPort();
|
||||
String userNameTest = RSRpcServices.getUserName();
|
||||
assertEquals("test", userNameTest);
|
||||
HRegion region = Mockito.mock(HRegion.class);
|
||||
Mockito.when(region.getRegionInfo()).thenReturn(RegionInfoBuilder.FIRST_META_REGIONINFO);
|
||||
RSRpcServices.RegionScannerHolder rsh = new RSRpcServices.RegionScannerHolder(null, region,
|
||||
null, null, false, false, clientIpAndPort);
|
||||
LOG.info("rsh={}", rsh);
|
||||
null, null, false, false, clientIpAndPort,
|
||||
userNameTest);
|
||||
LOG.info("rsh: {}", rsh);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue