HBASE-23895 STUCK Region-In-Transition when failed to insert procedure to procedure store (#1221)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
d641726da5
commit
77dc78437f
|
@ -677,6 +677,27 @@ public abstract class RpcServer implements RpcServerInterface,
|
|||
return CurCall.get() != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
|
||||
* master's rpc call, it may generate new procedure and mutate the region which store procedure.
|
||||
* There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
|
||||
* call to avoid the rpc check.
|
||||
* @return the currently ongoing rpc call
|
||||
*/
|
||||
public static Optional<RpcCall> unsetCurrentCall() {
|
||||
Optional<RpcCall> rpcCall = getCurrentCall();
|
||||
CurCall.set(null);
|
||||
return rpcCall;
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
|
||||
* rpc call back after mutate region.
|
||||
*/
|
||||
public static void setCurrentCall(RpcCall rpcCall) {
|
||||
CurCall.set(rpcCall);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the user credentials associated with the current RPC request or not present if no
|
||||
* credentials were provided.
|
||||
|
|
|
@ -29,6 +29,8 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import org.apache.commons.lang3.mutable.MutableLong;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
|||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
|
||||
|
@ -538,6 +542,20 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
rowsToLock.add(row);
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert procedure may be called by master's rpc call. There are some check about the rpc call
|
||||
* when mutate region. Here unset the current rpc call and set it back in finally block. See
|
||||
* HBASE-23895 for more details.
|
||||
*/
|
||||
private void runWithoutRpcCall(Runnable runnable) {
|
||||
Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
|
||||
try {
|
||||
runnable.run();
|
||||
} finally {
|
||||
rpcCall.ifPresent(RpcServer::setCurrentCall);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
|
||||
if (subProcs == null || subProcs.length == 0) {
|
||||
|
@ -547,17 +565,19 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
}
|
||||
List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
|
||||
List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
|
||||
try {
|
||||
serializePut(proc, mutations, rowsToLock);
|
||||
for (Procedure<?> subProc : subProcs) {
|
||||
serializePut(subProc, mutations, rowsToLock);
|
||||
runWithoutRpcCall(() -> {
|
||||
try {
|
||||
serializePut(proc, mutations, rowsToLock);
|
||||
for (Procedure<?> subProc : subProcs) {
|
||||
serializePut(subProc, mutations, rowsToLock);
|
||||
}
|
||||
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
|
||||
Arrays.toString(subProcs), e);
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
|
||||
Arrays.toString(subProcs), e);
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
});
|
||||
flusherAndCompactor.onUpdate();
|
||||
}
|
||||
|
||||
|
@ -565,28 +585,32 @@ public class RegionProcedureStore extends ProcedureStoreBase {
|
|||
public void insert(Procedure<?>[] procs) {
|
||||
List<Mutation> mutations = new ArrayList<>(procs.length);
|
||||
List<byte[]> rowsToLock = new ArrayList<>(procs.length);
|
||||
try {
|
||||
for (Procedure<?> proc : procs) {
|
||||
serializePut(proc, mutations, rowsToLock);
|
||||
runWithoutRpcCall(() -> {
|
||||
try {
|
||||
for (Procedure<?> proc : procs) {
|
||||
serializePut(proc, mutations, rowsToLock);
|
||||
}
|
||||
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
});
|
||||
flusherAndCompactor.onUpdate();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void update(Procedure<?> proc) {
|
||||
try {
|
||||
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
|
||||
region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
|
||||
proto.toByteArray()));
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
runWithoutRpcCall(() -> {
|
||||
try {
|
||||
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
|
||||
region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
|
||||
proto.toByteArray()));
|
||||
} catch (IOException e) {
|
||||
LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
|
||||
throw new UncheckedIOException(e);
|
||||
}
|
||||
});
|
||||
flusherAndCompactor.onUpdate();
|
||||
}
|
||||
|
||||
|
|
|
@ -21,13 +21,22 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetAddress;
|
||||
import java.util.HashSet;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||
import org.apache.hadoop.hbase.ipc.RpcCallback;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -37,6 +46,13 @@ import org.junit.experimental.categories.Category;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
|
||||
|
||||
@Category({ MasterTests.class, SmallTests.class })
|
||||
public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
|
||||
|
||||
|
@ -130,4 +146,163 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
|
|||
assertFalse(store.region
|
||||
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for HBASE-23895
|
||||
*/
|
||||
@Test
|
||||
public void testInsertWithRpcCall() throws Exception {
|
||||
RpcServer.setCurrentCall(newRpcCallWithDeadline());
|
||||
RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
|
||||
store.insert(proc1, null);
|
||||
RpcServer.setCurrentCall(null);
|
||||
}
|
||||
|
||||
private RpcCall newRpcCallWithDeadline() {
|
||||
return new RpcCall() {
|
||||
@Override
|
||||
public long getDeadline() {
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public BlockingService getService() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Descriptors.MethodDescriptor getMethod() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message getParam() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CellScanner getCellScanner() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReceiveTime() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getStartTime() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setStartTime(long startTime) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getTimeout() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RPCProtos.RequestHeader getHeader() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getRemotePort() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
|
||||
String error) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendResponseIfReady() throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toShortString() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long disconnectSince() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isClientCellBlockSupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<User> getRequestUser() {
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public InetAddress getRemoteAddress() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HBaseProtos.VersionInfo getClientVersionInfo() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setCallBack(RpcCallback callback) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRetryImmediatelySupported() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getResponseCellSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementResponseCellSize(long cellSize) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getResponseBlockSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementResponseBlockSize(long blockSize) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getResponseExceptionSize() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrementResponseExceptionSize(long exceptionSize) {
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue