HBASE-14475 Region split requests are always audited with hbase user rather than request user (Ted Yu)
This commit is contained in:
parent
afb3b19a15
commit
7aaef0f920
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
|
|||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.StealJobQueue;
|
||||
|
@ -247,6 +249,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
}
|
||||
|
||||
public synchronized void requestSplit(final Region r, byte[] midKey) {
|
||||
requestSplit(r, midKey, null);
|
||||
}
|
||||
|
||||
/*
|
||||
* The User parameter allows the split thread to assume the correct user identity
|
||||
*/
|
||||
public synchronized void requestSplit(final Region r, byte[] midKey, User user) {
|
||||
if (midKey == null) {
|
||||
LOG.debug("Region " + r.getRegionInfo().getRegionNameAsString() +
|
||||
" not splittable because midkey=null");
|
||||
|
@ -256,7 +265,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
return;
|
||||
}
|
||||
try {
|
||||
this.splits.execute(new SplitRequest(r, midKey, this.server));
|
||||
this.splits.execute(new SplitRequest(r, midKey, this.server, user));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Split requested for " + r + ". " + this);
|
||||
}
|
||||
|
@ -274,54 +283,55 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
@Override
|
||||
public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
|
||||
List<Pair<CompactionRequest, Store>> requests) throws IOException {
|
||||
return requestCompaction(r, why, Store.NO_PRIORITY, requests);
|
||||
return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
|
||||
final String why, CompactionRequest request) throws IOException {
|
||||
return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
|
||||
return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized List<CompactionRequest> requestCompaction(final Region r, final String why,
|
||||
int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
|
||||
return requestCompactionInternal(r, why, p, requests, true);
|
||||
int p, List<Pair<CompactionRequest, Store>> requests, User user) throws IOException {
|
||||
return requestCompactionInternal(r, why, p, requests, true, user);
|
||||
}
|
||||
|
||||
private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
|
||||
int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow) throws IOException {
|
||||
int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
|
||||
throws IOException {
|
||||
// not a special compaction request, so make our own list
|
||||
List<CompactionRequest> ret = null;
|
||||
if (requests == null) {
|
||||
ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
|
||||
for (Store s : r.getStores()) {
|
||||
CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow);
|
||||
CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
|
||||
if (selectNow) ret.add(cr);
|
||||
}
|
||||
} else {
|
||||
Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
|
||||
ret = new ArrayList<CompactionRequest>(requests.size());
|
||||
for (Pair<CompactionRequest, Store> pair : requests) {
|
||||
ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
|
||||
ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
public CompactionRequest requestCompaction(final Region r, final Store s,
|
||||
final String why, int priority, CompactionRequest request) throws IOException {
|
||||
return requestCompactionInternal(r, s, why, priority, request, true);
|
||||
final String why, int priority, CompactionRequest request, User user) throws IOException {
|
||||
return requestCompactionInternal(r, s, why, priority, request, true, user);
|
||||
}
|
||||
|
||||
public synchronized void requestSystemCompaction(
|
||||
final Region r, final String why) throws IOException {
|
||||
requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false);
|
||||
requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
|
||||
}
|
||||
|
||||
public void requestSystemCompaction(
|
||||
final Region r, final Store s, final String why) throws IOException {
|
||||
requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false);
|
||||
requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -333,7 +343,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
* compaction will be used.
|
||||
*/
|
||||
private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
|
||||
final String why, int priority, CompactionRequest request, boolean selectNow)
|
||||
final String why, int priority, CompactionRequest request, boolean selectNow, User user)
|
||||
throws IOException {
|
||||
if (this.server.isStopped()
|
||||
|| (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
|
||||
|
@ -350,7 +360,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
// pool; we will do selection there, and move to large pool if necessary.
|
||||
ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
|
||||
? longCompactions : shortCompactions;
|
||||
pool.execute(new CompactionRunner(s, r, compaction, pool));
|
||||
pool.execute(new CompactionRunner(s, r, compaction, pool, user));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
String type = (pool == shortCompactions) ? "Small " : "Large ";
|
||||
LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
|
||||
|
@ -454,9 +464,10 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
private CompactionContext compaction;
|
||||
private int queuedPriority;
|
||||
private ThreadPoolExecutor parent;
|
||||
private User user;
|
||||
|
||||
public CompactionRunner(Store store, Region region,
|
||||
CompactionContext compaction, ThreadPoolExecutor parent) {
|
||||
CompactionContext compaction, ThreadPoolExecutor parent, User user) {
|
||||
super();
|
||||
this.store = store;
|
||||
this.region = (HRegion)region;
|
||||
|
@ -464,6 +475,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
this.queuedPriority = (this.compaction == null)
|
||||
? store.getCompactPriority() : compaction.getRequest().getPriority();
|
||||
this.parent = parent;
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -472,13 +484,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
: ("Store = " + store.toString() + ", pri = " + queuedPriority);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Preconditions.checkNotNull(server);
|
||||
if (server.isStopped()
|
||||
|| (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
|
||||
return;
|
||||
}
|
||||
private void doCompaction() {
|
||||
// Common case - system compaction without a file selection. Select now.
|
||||
if (this.compaction == null) {
|
||||
int oldPriority = this.queuedPriority;
|
||||
|
@ -551,6 +557,31 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
this.compaction.getRequest().afterExecute();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
Preconditions.checkNotNull(server);
|
||||
if (server.isStopped()
|
||||
|| (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
|
||||
return;
|
||||
}
|
||||
if (this.user == null) doCompaction();
|
||||
else {
|
||||
try {
|
||||
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
doCompaction();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Encountered exception while compacting", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String formatStackTrace(Exception ex) {
|
||||
StringWriter sw = new StringWriter();
|
||||
PrintWriter pw = new PrintWriter(sw);
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.util.List;
|
|||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
|
@ -72,12 +73,14 @@ public interface CompactionRequestor {
|
|||
* @param requests custom compaction requests. Each compaction must specify the store on which it
|
||||
* is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
|
||||
* stores for the region.
|
||||
* @user the effective user
|
||||
* @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
|
||||
* compactions were started.
|
||||
* @throws IOException
|
||||
*/
|
||||
List<CompactionRequest> requestCompaction(
|
||||
final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests
|
||||
final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>> requests,
|
||||
User user
|
||||
) throws IOException;
|
||||
|
||||
/**
|
||||
|
@ -87,10 +90,11 @@ public interface CompactionRequestor {
|
|||
* @param pri Priority of this compaction. minHeap. <=0 is critical
|
||||
* @param request custom compaction request to run. {@link Store} and {@link Region} for the
|
||||
* request must match the region and store specified here.
|
||||
* @param user
|
||||
* @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started
|
||||
* @throws IOException
|
||||
*/
|
||||
CompactionRequest requestCompaction(
|
||||
final Region r, final Store s, final String why, int pri, CompactionRequest request
|
||||
final Region r, final Store s, final String why, int pri, CompactionRequest request, User user
|
||||
) throws IOException;
|
||||
}
|
||||
|
|
|
@ -1538,7 +1538,7 @@ public class HRegionServer extends HasThread implements
|
|||
} else {
|
||||
this.instance.compactSplitThread.requestCompaction(r, s, getName()
|
||||
+ " requests major compaction; use configured priority",
|
||||
this.majorCompactPriority, null);
|
||||
this.majorCompactPriority, null, null);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
|
|
|
@ -1155,10 +1155,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
|
||||
if(family != null) {
|
||||
regionServer.compactSplitThread.requestCompaction(region, store, log,
|
||||
Store.PRIORITY_USER, null);
|
||||
Store.PRIORITY_USER, null, RpcServer.getRequestUser());
|
||||
} else {
|
||||
regionServer.compactSplitThread.requestCompaction(region, log,
|
||||
Store.PRIORITY_USER, null);
|
||||
Store.PRIORITY_USER, null, RpcServer.getRequestUser());
|
||||
}
|
||||
return CompactRegionResponse.newBuilder().build();
|
||||
} catch (IOException ie) {
|
||||
|
@ -1790,7 +1790,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
splitPoint = request.getSplitPoint().toByteArray();
|
||||
}
|
||||
((HRegion)region).forceSplit(splitPoint);
|
||||
regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit());
|
||||
regionServer.compactSplitThread.requestSplit(region, ((HRegion)region).checkSplit(),
|
||||
RpcServer.getRequestUser());
|
||||
return SplitRegionResponse.newBuilder().build();
|
||||
} catch (DroppedSnapshotException ex) {
|
||||
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -26,6 +27,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
import org.apache.hadoop.hbase.RemoteExceptionHandler;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -41,13 +43,15 @@ class SplitRequest implements Runnable {
|
|||
private final HRegion parent;
|
||||
private final byte[] midKey;
|
||||
private final HRegionServer server;
|
||||
private final User user;
|
||||
private TableLock tableLock;
|
||||
|
||||
SplitRequest(Region region, byte[] midKey, HRegionServer hrs) {
|
||||
SplitRequest(Region region, byte[] midKey, HRegionServer hrs, User user) {
|
||||
Preconditions.checkNotNull(hrs);
|
||||
this.parent = (HRegion)region;
|
||||
this.midKey = midKey;
|
||||
this.server = hrs;
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -55,13 +59,7 @@ class SplitRequest implements Runnable {
|
|||
return "regionName=" + parent + ", midKey=" + Bytes.toStringBinary(midKey);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (this.server.isStopping() || this.server.isStopped()) {
|
||||
LOG.debug("Skipping split because server is stopping=" +
|
||||
this.server.isStopping() + " or stopped=" + this.server.isStopped());
|
||||
return;
|
||||
}
|
||||
private void doSplitting() {
|
||||
boolean success = false;
|
||||
server.metricsRegionServer.incrSplitRequest();
|
||||
long startTime = EnvironmentEdgeManager.currentTime();
|
||||
|
@ -147,6 +145,31 @@ class SplitRequest implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (this.server.isStopping() || this.server.isStopped()) {
|
||||
LOG.debug("Skipping split because server is stopping=" +
|
||||
this.server.isStopping() + " or stopped=" + this.server.isStopped());
|
||||
return;
|
||||
}
|
||||
if (this.user == null) doSplitting();
|
||||
else {
|
||||
try {
|
||||
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
doSplitting();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Encountered exception while splitting", ioe);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void releaseTableLock() {
|
||||
if (this.tableLock != null) {
|
||||
try {
|
||||
|
|
|
@ -290,7 +290,7 @@ public class TestCompaction {
|
|||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
|
||||
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
|
||||
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request,null);
|
||||
// wait for the latch to complete.
|
||||
latch.await();
|
||||
|
||||
|
@ -326,7 +326,7 @@ public class TestCompaction {
|
|||
}
|
||||
|
||||
thread.requestCompaction(r, "test mulitple custom comapctions", Store.PRIORITY_USER,
|
||||
Collections.unmodifiableList(requests));
|
||||
Collections.unmodifiableList(requests), null);
|
||||
|
||||
// wait for the latch to complete.
|
||||
latch.await();
|
||||
|
|
Loading…
Reference in New Issue