HBASE-23895 STUCK Region-In-Transition when failed to insert procedure to procedure store (#1221)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Guanghao Zhang 2020-03-07 15:50:09 +08:00 committed by GitHub
parent d099f242e9
commit 9b0d214b7b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 246 additions and 26 deletions

View File

@ -679,6 +679,27 @@ public abstract class RpcServer implements RpcServerInterface,
return CurCall.get() != null; return CurCall.get() != null;
} }
/**
* Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. For
* master's rpc call, it may generate new procedure and mutate the region which store procedure.
* There are some check about rpc when mutate region, such as rpc timeout check. So unset the rpc
* call to avoid the rpc check.
* @return the currently ongoing rpc call
*/
public static Optional<RpcCall> unsetCurrentCall() {
Optional<RpcCall> rpcCall = getCurrentCall();
CurCall.set(null);
return rpcCall;
}
/**
* Used by {@link org.apache.hadoop.hbase.procedure2.store.region.RegionProcedureStore}. Set the
* rpc call back after mutate region.
*/
public static void setCurrentCall(RpcCall rpcCall) {
CurCall.set(rpcCall);
}
/** /**
* Returns the user credentials associated with the current RPC request or not present if no * Returns the user credentials associated with the current RPC request or not present if no
* credentials were provided. * credentials were provided.

View File

@ -29,6 +29,8 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.assignment.AssignProcedure; import org.apache.hadoop.hbase.master.assignment.AssignProcedure;
@ -538,6 +542,20 @@ public class RegionProcedureStore extends ProcedureStoreBase {
rowsToLock.add(row); rowsToLock.add(row);
} }
/**
* Insert procedure may be called by master's rpc call. There are some check about the rpc call
* when mutate region. Here unset the current rpc call and set it back in finally block. See
* HBASE-23895 for more details.
*/
private void runWithoutRpcCall(Runnable runnable) {
Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
try {
runnable.run();
} finally {
rpcCall.ifPresent(RpcServer::setCurrentCall);
}
}
@Override @Override
public void insert(Procedure<?> proc, Procedure<?>[] subProcs) { public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
if (subProcs == null || subProcs.length == 0) { if (subProcs == null || subProcs.length == 0) {
@ -547,17 +565,19 @@ public class RegionProcedureStore extends ProcedureStoreBase {
} }
List<Mutation> mutations = new ArrayList<>(subProcs.length + 1); List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1); List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
try { runWithoutRpcCall(() -> {
serializePut(proc, mutations, rowsToLock); try {
for (Procedure<?> subProc : subProcs) { serializePut(proc, mutations, rowsToLock);
serializePut(subProc, mutations, rowsToLock); for (Procedure<?> subProc : subProcs) {
serializePut(subProc, mutations, rowsToLock);
}
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
Arrays.toString(subProcs), e);
throw new UncheckedIOException(e);
} }
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE); });
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", proc,
Arrays.toString(subProcs), e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate(); flusherAndCompactor.onUpdate();
} }
@ -565,28 +585,32 @@ public class RegionProcedureStore extends ProcedureStoreBase {
public void insert(Procedure<?>[] procs) { public void insert(Procedure<?>[] procs) {
List<Mutation> mutations = new ArrayList<>(procs.length); List<Mutation> mutations = new ArrayList<>(procs.length);
List<byte[]> rowsToLock = new ArrayList<>(procs.length); List<byte[]> rowsToLock = new ArrayList<>(procs.length);
try { runWithoutRpcCall(() -> {
for (Procedure<?> proc : procs) { try {
serializePut(proc, mutations, rowsToLock); for (Procedure<?> proc : procs) {
serializePut(proc, mutations, rowsToLock);
}
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
throw new UncheckedIOException(e);
} }
region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE); });
} catch (IOException e) {
LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", Arrays.toString(procs), e);
throw new UncheckedIOException(e);
}
flusherAndCompactor.onUpdate(); flusherAndCompactor.onUpdate();
} }
@Override @Override
public void update(Procedure<?> proc) { public void update(Procedure<?> proc) {
try { runWithoutRpcCall(() -> {
ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc); try {
region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER, ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
proto.toByteArray())); region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
} catch (IOException e) { proto.toByteArray()));
LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e); } catch (IOException e) {
throw new UncheckedIOException(e); LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
} throw new UncheckedIOException(e);
}
});
flusherAndCompactor.onUpdate(); flusherAndCompactor.onUpdate();
} }

View File

@ -21,13 +21,22 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashSet; import java.util.HashSet;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.LoadCounter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -37,6 +46,13 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@Category({ MasterTests.class, SmallTests.class }) @Category({ MasterTests.class, SmallTests.class })
public class TestRegionProcedureStore extends RegionProcedureStoreTestBase { public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
@ -130,4 +146,163 @@ public class TestRegionProcedureStore extends RegionProcedureStoreTestBase {
assertFalse(store.region assertFalse(store.region
.get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists()); .get(new Get(Bytes.toBytes(proc3.getProcId())).setCheckExistenceOnly(true)).getExists());
} }
/**
* Test for HBASE-23895
*/
@Test
public void testInsertWithRpcCall() throws Exception {
RpcServer.setCurrentCall(newRpcCallWithDeadline());
RegionProcedureStoreTestProcedure proc1 = new RegionProcedureStoreTestProcedure();
store.insert(proc1, null);
RpcServer.setCurrentCall(null);
}
private RpcCall newRpcCallWithDeadline() {
return new RpcCall() {
@Override
public long getDeadline() {
return System.currentTimeMillis();
}
@Override
public BlockingService getService() {
return null;
}
@Override
public Descriptors.MethodDescriptor getMethod() {
return null;
}
@Override
public Message getParam() {
return null;
}
@Override
public CellScanner getCellScanner() {
return null;
}
@Override
public long getReceiveTime() {
return 0;
}
@Override
public long getStartTime() {
return 0;
}
@Override
public void setStartTime(long startTime) {
}
@Override
public int getTimeout() {
return 0;
}
@Override
public int getPriority() {
return 0;
}
@Override
public long getSize() {
return 0;
}
@Override
public RPCProtos.RequestHeader getHeader() {
return null;
}
@Override
public int getRemotePort() {
return 0;
}
@Override
public void setResponse(Message param, CellScanner cells, Throwable errorThrowable,
String error) {
}
@Override
public void sendResponseIfReady() throws IOException {
}
@Override
public void cleanup() {
}
@Override
public String toShortString() {
return null;
}
@Override
public long disconnectSince() {
return 0;
}
@Override
public boolean isClientCellBlockSupported() {
return false;
}
@Override
public Optional<User> getRequestUser() {
return Optional.empty();
}
@Override
public InetAddress getRemoteAddress() {
return null;
}
@Override
public HBaseProtos.VersionInfo getClientVersionInfo() {
return null;
}
@Override
public void setCallBack(RpcCallback callback) {
}
@Override
public boolean isRetryImmediatelySupported() {
return false;
}
@Override
public long getResponseCellSize() {
return 0;
}
@Override
public void incrementResponseCellSize(long cellSize) {
}
@Override
public long getResponseBlockSize() {
return 0;
}
@Override
public void incrementResponseBlockSize(long blockSize) {
}
@Override
public long getResponseExceptionSize() {
return 0;
}
@Override
public void incrementResponseExceptionSize(long exceptionSize) {
}
};
}
} }