HBASE-14631 Region merge request should be audited with request user through proper scope of doAs() calls to region observer notifications

This commit is contained in:
tedyu 2015-10-19 08:41:58 -07:00
parent 39f6a4eb0b
commit 61b11e0765
6 changed files with 171 additions and 19 deletions

View File

@ -223,9 +223,9 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
}
public synchronized void requestRegionsMerge(final Region a,
final Region b, final boolean forcible, long masterSystemTime) {
final Region b, final boolean forcible, long masterSystemTime, User user) {
try {
mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime));
mergePool.execute(new RegionMergeRequest(a, b, this.server, forcible, masterSystemTime,user));
if (LOG.isDebugEnabled()) {
LOG.debug("Region merge requested for " + a + "," + b + ", forcible="
+ forcible + ". " + this);

View File

@ -1349,7 +1349,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
regionServer.metricsRegionServer.updateFlushTime(endTime - startTime);
}
regionServer.compactSplitThread.requestRegionsMerge(regionA, regionB, forcible,
masterSystemTime);
masterSystemTime, RpcServer.getRequestUser());
return MergeRegionsResponse.newBuilder().build();
} catch (DroppedSnapshotException ex) {
regionServer.abort("Replay of WAL required. Forcing server shutdown", ex);

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
@ -43,15 +44,17 @@ class RegionMergeRequest implements Runnable {
private final boolean forcible;
private TableLock tableLock;
private final long masterSystemTime;
private final User user;
RegionMergeRequest(Region a, Region b, HRegionServer hrs, boolean forcible,
long masterSystemTime) {
long masterSystemTime, User user) {
Preconditions.checkNotNull(hrs);
this.region_a = (HRegion)a;
this.region_b = (HRegion)b;
this.server = hrs;
this.forcible = forcible;
this.masterSystemTime = masterSystemTime;
this.user = user;
}
@Override
@ -88,7 +91,7 @@ class RegionMergeRequest implements Runnable {
// the prepare call -- we are not ready to merge just now. Just return.
if (!mt.prepare(this.server)) return;
try {
mt.execute(this.server, this.server);
mt.execute(this.server, this.server, this.user);
} catch (Exception e) {
if (this.server.isStopping() || this.server.isStopped()) {
LOG.info(

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.security.User;
/**
* Executes region merge as a "transaction". It is similar with
@ -170,9 +171,24 @@ public interface RegionMergeTransaction {
* @return merged region
* @throws IOException
* @see #rollback(Server, RegionServerServices)
* @deprecated use #execute(Server, RegionServerServices, User)
*/
@Deprecated
Region execute(Server server, RegionServerServices services) throws IOException;
/**
* Run the transaction.
* @param server Hosting server instance. Can be null when testing
* @param services Used to online/offline regions.
* @param user
* @throws IOException If thrown, transaction failed. Call
* {@link #rollback(Server, RegionServerServices)}
* @return merged region
* @throws IOException
* @see #rollback(Server, RegionServerServices, User)
*/
Region execute(Server server, RegionServerServices services, User user) throws IOException;
/**
* Roll back a failed transaction
* @param server Hosting server instance (May be null when testing).
@ -181,9 +197,23 @@ public interface RegionMergeTransaction {
* @return True if we successfully rolled back, false if we got to the point
* of no return and so now need to abort the server to minimize
* damage.
* @deprecated use #rollback(Server, RegionServerServices, User)
*/
@Deprecated
boolean rollback(Server server, RegionServerServices services) throws IOException;
/**
* Roll back a failed transaction
* @param server Hosting server instance (May be null when testing).
* @param services Services of regionserver, used to online regions.
* @param user
* @throws IOException If thrown, rollback failed. Take drastic action.
* @return True if we successfully rolled back, false if we got to the point
* of no return and so now need to abort the server to minimize
* damage.
*/
boolean rollback(Server server, RegionServerServices services, User user) throws IOException;
/**
* Register a listener for transaction preparation, execution, and possibly
* rollback phases.

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -249,6 +252,15 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
@Override
public HRegion execute(final Server server,
final RegionServerServices services) throws IOException {
if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
LOG.warn("Should use execute(Server, RegionServerServices, User)");
}
return execute(server, services, null);
}
@Override
public HRegion execute(final Server server, final RegionServerServices services, User user)
throws IOException {
this.server = server;
this.rsServices = services;
useCoordinationForAssignment =
@ -263,19 +275,35 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
rsCoprocessorHost = server != null ?
((HRegionServer) server).getRegionServerCoprocessorHost() : null;
}
HRegion mergedRegion = createMergedRegion(server, services);
final HRegion mergedRegion = createMergedRegion(server, services, user);
if (rsCoprocessorHost != null) {
rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
if (user == null) {
rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion);
} else {
try {
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
rsCoprocessorHost.postMergeCommit(region_a, region_b, mergedRegion);
return null;
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
mergedRegion = stepsAfterPONR(server, services, mergedRegion);
stepsAfterPONR(server, services, mergedRegion, user);
transition(RegionMergeTransactionPhase.COMPLETED);
return mergedRegion;
}
public HRegion stepsAfterPONR(final Server server, final RegionServerServices services,
HRegion mergedRegion) throws IOException {
public void stepsAfterPONR(final Server server, final RegionServerServices services,
final HRegion mergedRegion, User user) throws IOException {
openMergedRegion(server, services, mergedRegion);
if (useCoordination(server)) {
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
@ -283,9 +311,24 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
mergedRegionInfo, region_a, region_b, rmd, mergedRegion);
}
if (rsCoprocessorHost != null) {
rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion);
if (user == null) {
rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
} else {
try {
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
rsCoprocessorHost.postMerge(region_a, region_b, mergedRegion);
return null;
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
return mergedRegion;
}
/**
@ -297,7 +340,7 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
* {@link #rollback(Server, RegionServerServices)}
*/
HRegion createMergedRegion(final Server server,
final RegionServerServices services) throws IOException {
final RegionServerServices services, User user) throws IOException {
LOG.info("Starting merge of " + region_a + " and "
+ region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible);
if ((server != null && server.isStopped())
@ -306,7 +349,24 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
}
if (rsCoprocessorHost != null) {
if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) {
boolean ret = false;
if (user == null) {
ret = rsCoprocessorHost.preMerge(region_a, region_b);
} else {
try {
ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return rsCoprocessorHost.preMerge(region_a, region_b);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
if (ret) {
throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
+ this.region_b + " merge.");
}
@ -319,9 +379,27 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
HRegion mergedRegion = stepsBeforePONR(server, services, testing);
@MetaMutationAnnotation
List<Mutation> metaEntries = new ArrayList<Mutation>();
final List<Mutation> metaEntries = new ArrayList<Mutation>();
if (rsCoprocessorHost != null) {
if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) {
boolean ret = false;
if (user == null) {
ret = rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
} else {
try {
ret = user.getUGI().doAs(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
return rsCoprocessorHost.preMergeCommit(region_a, region_b, metaEntries);
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
if (ret) {
throw new IOException("Coprocessor bypassing regions " + this.region_a + " "
+ this.region_b + " merge.");
}
@ -685,10 +763,35 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
@SuppressWarnings("deprecation")
public boolean rollback(final Server server,
final RegionServerServices services) throws IOException {
if (User.isHBaseSecurityEnabled(region_a.getBaseConf())) {
LOG.warn("Should use execute(Server, RegionServerServices, User)");
}
return rollback(server, services, null);
}
@Override
public boolean rollback(final Server server,
final RegionServerServices services, User user) throws IOException {
assert this.mergedRegionInfo != null;
// Coprocessor callback
if (rsCoprocessorHost != null) {
rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b);
if (user == null) {
rsCoprocessorHost.preRollBackMerge(region_a, region_b);
} else {
try {
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
rsCoprocessorHost.preRollBackMerge(region_a, region_b);
return null;
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
boolean result = true;
@ -776,7 +879,23 @@ public class RegionMergeTransactionImpl implements RegionMergeTransaction {
}
// Coprocessor callback
if (rsCoprocessorHost != null) {
rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b);
if (user == null) {
rsCoprocessorHost.postRollBackMerge(region_a, region_b);
} else {
try {
user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
rsCoprocessorHost.postRollBackMerge(region_a, region_b);
return null;
}
});
} catch (InterruptedException ie) {
InterruptedIOException iioe = new InterruptedIOException();
iioe.initCause(ie);
throw iioe;
}
}
}
return result;

View File

@ -166,7 +166,7 @@ public class TestRegionServerObserver {
preMergeAfterPONRCalled = true;
RegionServerCoprocessorEnvironment environment = ctx.getEnvironment();
HRegionServer rs = (HRegionServer) environment.getRegionServerServices();
rmt.stepsAfterPONR(rs, rs, this.mergedRegion);
rmt.stepsAfterPONR(rs, rs, this.mergedRegion, null);
}
@Override