diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index d36e89ba683..44909b272f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -679,6 +679,27 @@ public abstract class RpcServer implements RpcServerInterface, return CurCall.get() != null; } + /** + * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For + * master's rpc call, it may generate new procedure and mutate the region which store procedure. + * There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc + * call to avoid the rpc check. + * @return the currently ongoing rpc call + */ + public static Optional unsetCurrentCall() { + Optional rpcCall = getCurrentCall(); + CurCall.set(null); + return rpcCall; + } + + /** + * Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the + * rpc call back after mutate region. + */ + public static void setCurrentCall(RpcCall rpcCall) { + CurCall.set(rpcCall); + } + /** * Returns the user credentials associated with the current RPC request or not present if no * credentials were provided. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java index d1535087eb1..296a08be7a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java @@ -29,6 +29,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; + import org.apache.commons.lang3.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.assignment.AssignProcedure; @@ -538,6 +542,20 @@ public class RegionProcedureStore extends ProcedureStoreBase { rowsToLock.add(row); } + /** + * Insert procedure may be called by master's rpc call. There are some check about the rpc call + * when mutate region. Here unset the current rpc call and set it back in finally block. See + * HBASE-23895 for more details. + */ + private void runWithoutRpcCall(Runnable runnable) { + Optional rpcCall = RpcServer.unsetCurrentCall(); + try { + runnable.run(); + } finally { + rpcCall.ifPresent(RpcServer::setCurrentCall); + } + } + @Override public void insert(Procedure proc, Procedure[] subProcs) { if (subProcs == null || subProcs.length == 0) { @@ -547,17 +565,19 @@ public class RegionProcedureStore extends ProcedureStoreBase { } List mutations = new ArrayList<>(subProcs.length + 1); List rowsToLock = new ArrayList<>(subProcs.length + 1); - try { - serializePut(proc, mutations, rowsToLock); - for (Procedure subProc : subProcs) { - serializePut(subProc, mutations, rowsToLock); + runWithoutRpcCall(() -> { + try { + serializePut(proc, mutations, rowsToLock); + for (Procedure subProc : subProcs) { + serializePut(subProc, mutations, rowsToLock); + } + region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE); + } catch (IOException e) { + LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc, + Arrays.toString(subProcs), e); + throw new UncheckedIOException(e); } - region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE); - } catch (IOException e) { - LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc, - Arrays.toString(subProcs), e); - throw new UncheckedIOException(e); - } + }); flusherAndCompactor.onUpdate(); } @@ -565,28 +585,32 @@ public class RegionProcedureStore extends ProcedureStoreBase { public void insert(Procedure[] procs) { List mutations = new ArrayList<>(procs.length); List rowsToLock = new ArrayList<>(procs.length); - try { - for (Procedure proc : procs) { - serializePut(proc, mutations, rowsToLock); + runWithoutRpcCall(() -> { + try { + for (Procedure proc : procs) { + serializePut(proc, mutations, rowsToLock); + } + region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE); + } catch (IOException e) { + LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e); + throw new UncheckedIOException(e); } - region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE); - } catch (IOException e) { - LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e); - throw new UncheckedIOException(e); - } + }); flusherAndCompactor.onUpdate(); } @Override public void update(Procedure proc) { - try { - ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc); - region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER, - proto.toByteArray())); - } catch (IOException e) { - LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e); - throw new UncheckedIOException(e); - } + runWithoutRpcCall(() -> { + try { + ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc); + region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER, + proto.toByteArray())); + } catch (IOException e) { + LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e); + throw new UncheckedIOException(e); + } + }); flusherAndCompactor.onUpdate(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java index 8b6535f6faa..178e78d23f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure2/store/region/TestRegionProcedureStore.java @@ -21,13 +21,22 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.io.IOException; +import java.net.InetAddress; import java.util.HashSet; +import java.util.Optional; import java.util.Set; + +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.ipc.RpcCall; +import org.apache.hadoop.hbase.ipc.RpcCallback; +import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; @@ -37,6 +46,13 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; +import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; + @Category({ MasterTests.class, SmallTests.class }) public class TestRegionProcedureStore extends RegionProcedureStoreTestBase { @@ -130,4 +146,163 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase { assertFalse(store.region .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists()); } + + /** + * Test for HBASE-23895 + */ + @Test + public void testInsertWithRpcCall() throws Exception { + RpcServer.setCurrentCall(newRpcCallWithDeadline()); + RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure(); + store.insert(proc1, null); + RpcServer.setCurrentCall(null); + } + + private RpcCall newRpcCallWithDeadline() { + return new RpcCall() { + @Override + public long getDeadline() { + return System.currentTimeMillis(); + } + + @Override + public BlockingService getService() { + return null; + } + + @Override + public Descriptors.MethodDescriptor getMethod() { + return null; + } + + @Override + public Message getParam() { + return null; + } + + @Override + public CellScanner getCellScanner() { + return null; + } + + @Override + public long getReceiveTime() { + return 0; + } + + @Override + public long getStartTime() { + return 0; + } + + @Override + public void setStartTime(long startTime) { + + } + + @Override + public int getTimeout() { + return 0; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public long getSize() { + return 0; + } + + @Override + public RPCProtos.RequestHeader getHeader() { + return null; + } + + @Override + public int getRemotePort() { + return 0; + } + + @Override + public void setResponse(Message param, CellScanner cells, Throwable errorThrowable, + String error) { + } + + @Override + public void sendResponseIfReady() throws IOException { + } + + @Override + public void cleanup() { + } + + @Override + public String toShortString() { + return null; + } + + @Override + public long disconnectSince() { + return 0; + } + + @Override + public boolean isClientCellBlockSupported() { + return false; + } + + @Override + public Optional getRequestUser() { + return Optional.empty(); + } + + @Override + public InetAddress getRemoteAddress() { + return null; + } + + @Override + public HBaseProtos.VersionInfo getClientVersionInfo() { + return null; + } + + @Override + public void setCallBack(RpcCallback callback) { + } + + @Override + public boolean isRetryImmediatelySupported() { + return false; + } + + @Override + public long getResponseCellSize() { + return 0; + } + + @Override + public void incrementResponseCellSize(long cellSize) { + } + + @Override + public long getResponseBlockSize() { + return 0; + } + + @Override + public void incrementResponseBlockSize(long blockSize) { + } + + @Override + public long getResponseExceptionSize() { + return 0; + } + + @Override + public void incrementResponseExceptionSize(long exceptionSize) { + } + }; + } }