HBASE-14631 Region merge request should be audited with request user through proper scope of doAs() calls to region observer notifications
This commit is contained in:
parent
8e6316a80c
commit
57fea77074
|
@ -223,9 +223,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
}
|
||||
|
||||
public synchronized void requestRegionsMerge(final Region a,
|
||||
final Region b, final boolean forcible, long masterSystemTime) {
|
||||
final Region b, final boolean forcible, long masterSystemTime, User user) {
|
||||
try {
|
||||
mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime));
|
||||
mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
|
||||
+ forcible + ". " + this);
|
||||
|
|
|
@ -1432,7 +1432,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
|
||||
}
|
||||
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
|
||||
masterSystemTime);
|
||||
masterSystemTime, RpcServer.getRequestUser());
|
||||
return MergeRegionsResponse.newBuilder().build();
|
||||
} catch (DroppedSnapshotException ex) {
|
||||
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.DroppedSnapshotException;
|
||||
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -43,15 +44,17 @@ class RegionMergeRequest implements Runnable {
|
|||
private final boolean forcible;
|
||||
private TableLock tableLock;
|
||||
private final long masterSystemTime;
|
||||
private final User user;
|
||||
|
||||
RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible,
|
||||
long masterSystemTime) {
|
||||
long masterSystemTime, User user) {
|
||||
Preconditions.checkNotNull(hrs);
|
||||
this.region_a = (HRegion)a;
|
||||
this.region_b = (HRegion)b;
|
||||
this.server = hrs;
|
||||
this.forcible = forcible;
|
||||
this.masterSystemTime = masterSystemTime;
|
||||
this.user = user;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -88,7 +91,7 @@ class RegionMergeRequest implements Runnable {
|
|||
// the prepare call -- we are not ready to merge just now. Just return.
|
||||
if (!mt.prepare(this.server)) return;
|
||||
try {
|
||||
mt.execute(this.server, this.server);
|
||||
mt.execute(this.server, this.server, this.user);
|
||||
} catch (Exception e) {
|
||||
if (this.server.isStopping() || this.server.isStopped()) {
|
||||
LOG.info(
|
||||
|
|
|
@ -21,12 +21,12 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransaction.TransactionListener;
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
|
||||
/**
|
||||
* Executes region merge as a "transaction". It is similar with
|
||||
|
@ -171,9 +171,24 @@ public interface RegionMergeTransaction {
|
|||
* @return merged region
|
||||
* @throws IOException
|
||||
* @see #rollback(Server, RegionServerServices)
|
||||
* @deprecated use #execute(Server, RegionServerServices, User)
|
||||
*/
|
||||
@Deprecated
|
||||
Region execute(Server server, RegionServerServices services) throws IOException;
|
||||
|
||||
/**
|
||||
* Run the transaction.
|
||||
* @param server Hosting server instance. Can be null when testing
|
||||
* @param services Used to online/offline regions.
|
||||
* @param user
|
||||
* @throws IOException If thrown, transaction failed. Call
|
||||
* {@link #rollback(Server, RegionServerServices)}
|
||||
* @return merged region
|
||||
* @throws IOException
|
||||
* @see #rollback(Server, RegionServerServices, User)
|
||||
*/
|
||||
Region execute(Server server, RegionServerServices services, User user) throws IOException;
|
||||
|
||||
/**
|
||||
* Roll back a failed transaction
|
||||
* @param server Hosting server instance (May be null when testing).
|
||||
|
@ -182,9 +197,23 @@ public interface RegionMergeTransaction {
|
|||
* @return True if we successfully rolled back, false if we got to the point
|
||||
* of no return and so now need to abort the server to minimize
|
||||
* damage.
|
||||
* @deprecated use #rollback(Server, RegionServerServices, User)
|
||||
*/
|
||||
@Deprecated
|
||||
boolean rollback(Server server, RegionServerServices services) throws IOException;
|
||||
|
||||
/**
|
||||
* Roll back a failed transaction
|
||||
* @param server Hosting server instance (May be null when testing).
|
||||
* @param services Services of regionserver, used to online regions.
|
||||
* @param user
|
||||
* @throws IOException If thrown, rollback failed. Take drastic action.
|
||||
* @return True if we successfully rolled back, false if we got to the point
|
||||
* of no return and so now need to abort the server to minimize
|
||||
* damage.
|
||||
*/
|
||||
boolean rollback(Server server, RegionServerServices services, User user) throws IOException;
|
||||
|
||||
/**
|
||||
* Register a listener for transaction preparation, execution, and possibly
|
||||
* rollback phases.
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
|
@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.client.Mutation;
|
|||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
|
||||
import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -227,17 +230,42 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
|
|||
@Override
|
||||
public Region execute(final Server server, final RegionServerServices services)
|
||||
throws IOException {
|
||||
if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
|
||||
LOG.warn("Should use execute(Server, RegionServerServices, User)");
|
||||
}
|
||||
return execute(server, services, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Region execute(final Server server, final RegionServerServices services, User user)
|
||||
throws IOException {
|
||||
this.server = server;
|
||||
this.rsServices = services;
|
||||
if (rsCoprocessorHost == null) {
|
||||
rsCoprocessorHost = server != null ?
|
||||
((HRegionServer) server).getRegionServerCoprocessorHost() : null;
|
||||
}
|
||||
HRegion mergedRegion = createMergedRegion(server, services);
|
||||
final HRegion mergedRegion = createMergedRegion(server, services, user);
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
|
||||
if (user == null) {
|
||||
rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
|
||||
} else {
|
||||
try {
|
||||
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
InterruptedIOException iioe = new InterruptedIOException();
|
||||
iioe.initCause(ie);
|
||||
throw iioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
stepsAfterPONR(server, services, mergedRegion);
|
||||
stepsAfterPONR(server, services, mergedRegion, user);
|
||||
|
||||
transition(RegionMergeTransactionPhase.COMPLETED);
|
||||
|
||||
|
@ -246,10 +274,26 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
|
|||
|
||||
@VisibleForTesting
|
||||
public void stepsAfterPONR(final Server server, final RegionServerServices services,
|
||||
HRegion mergedRegion) throws IOException {
|
||||
final HRegion mergedRegion, User user) throws IOException {
|
||||
openMergedRegion(server, services, mergedRegion);
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
|
||||
if (user == null) {
|
||||
rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
|
||||
} else {
|
||||
try {
|
||||
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
InterruptedIOException iioe = new InterruptedIOException();
|
||||
iioe.initCause(ie);
|
||||
throw iioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -261,8 +305,8 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
|
|||
* @throws IOException If thrown, transaction failed. Call
|
||||
* {@link #rollback(Server, RegionServerServices)}
|
||||
*/
|
||||
private HRegion createMergedRegion(final Server server, final RegionServerServices services)
|
||||
throws IOException {
|
||||
private HRegion createMergedRegion(final Server server, final RegionServerServices services,
|
||||
User user) throws IOException {
|
||||
LOG.info("Starting merge of " + region_a + " and "
|
||||
+ region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible);
|
||||
if ((server != null && server.isStopped())
|
||||
|
@ -271,7 +315,24 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
|
|||
}
|
||||
|
||||
if (rsCoprocessorHost != null) {
|
||||
if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
|
||||
boolean ret = false;
|
||||
if (user == null) {
|
||||
ret = rsCoprocessorHost.preMerge(region_a, region_b);
|
||||
} else {
|
||||
try {
|
||||
ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
|
||||
@Override
|
||||
public Boolean run() throws Exception {
|
||||
return rsCoprocessorHost.preMerge(region_a, region_b);
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
InterruptedIOException iioe = new InterruptedIOException();
|
||||
iioe.initCause(ie);
|
||||
throw iioe;
|
||||
}
|
||||
}
|
||||
if (ret) {
|
||||
throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
|
||||
+ this.region_b + " merge.");
|
||||
}
|
||||
|
@ -284,9 +345,27 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
|
|||
HRegion mergedRegion = stepsBeforePONR(server, services, testing);
|
||||
|
||||
@MetaMutationAnnotation
|
||||
List<Mutation> metaEntries = new ArrayList<Mutation>();
|
||||
final List<Mutation> metaEntries = new ArrayList<Mutation>();
|
||||
if (rsCoprocessorHost != null) {
|
||||
if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
|
||||
boolean ret = false;
|
||||
if (user == null) {
|
||||
ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
|
||||
} else {
|
||||
try {
|
||||
ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
|
||||
@Override
|
||||
public Boolean run() throws Exception {
|
||||
return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
InterruptedIOException iioe = new InterruptedIOException();
|
||||
iioe.initCause(ie);
|
||||
throw iioe;
|
||||
}
|
||||
}
|
||||
|
||||
if (ret) {
|
||||
throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
|
||||
+ this.region_b + " merge.");
|
||||
}
|
||||
|
@ -565,12 +644,37 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
|
|||
@Override
|
||||
public boolean rollback(final Server server,
|
||||
final RegionServerServices services) throws IOException {
|
||||
if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
|
||||
LOG.warn("Should use execute(Server, RegionServerServices, User)");
|
||||
}
|
||||
return rollback(server, services, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rollback(final Server server,
|
||||
final RegionServerServices services, User user) throws IOException {
|
||||
assert this.mergedRegionInfo != null;
|
||||
this.server = server;
|
||||
this.rsServices = services;
|
||||
// Coprocessor callback
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
|
||||
if (user == null) {
|
||||
rsCoprocessorHost.preRollBackMerge(region_a, region_b);
|
||||
} else {
|
||||
try {
|
||||
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
rsCoprocessorHost.preRollBackMerge(region_a, region_b);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
InterruptedIOException iioe = new InterruptedIOException();
|
||||
iioe.initCause(ie);
|
||||
throw iioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean result = true;
|
||||
|
@ -655,7 +759,23 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
|
|||
}
|
||||
// Coprocessor callback
|
||||
if (rsCoprocessorHost != null) {
|
||||
rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
|
||||
if (user == null) {
|
||||
rsCoprocessorHost.postRollBackMerge(region_a, region_b);
|
||||
} else {
|
||||
try {
|
||||
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
rsCoprocessorHost.postRollBackMerge(region_a, region_b);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
} catch (InterruptedException ie) {
|
||||
InterruptedIOException iioe = new InterruptedIOException();
|
||||
iioe.initCause(ie);
|
||||
throw iioe;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
|
@ -165,7 +165,7 @@ public class TestRegionServerObserver {
|
|||
preMergeAfterPONRCalled = true;
|
||||
RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
|
||||
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
|
||||
rmt.stepsAfterPONR(rs, rs, this.mergedRegion);
|
||||
rmt.stepsAfterPONR(rs, rs, this.mergedRegion, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue