HBASE-16505 Add AsyncRegion interface to pass deadline and support async operations (Phil Yang)

tedyu 2016-09-08 06:57:03 -07:00
parent c04b389181
commit 1574c6ef39
11 changed files with 526 additions and 11 deletions

DelegatingHBaseRpcController.java

@@ -113,6 +113,16 @@ public class DelegatingHBaseRpcController implements HBaseRpcController {
return delegate.hasCallTimeout();
}
@Override
public void setDeadline(long deadline) {
delegate.setDeadline(deadline);
}
@Override
public long getDeadline() {
return delegate.getDeadline();
}
@Override
public void setFailed(IOException e) {
delegate.setFailed(e);

HBaseRpcController.java

@@ -67,6 +67,10 @@ public interface HBaseRpcController extends RpcController, CellScannable {
boolean hasCallTimeout();
/**
* Set the absolute deadline of this call in epoch millis; Long.MAX_VALUE means no deadline.
*/
void setDeadline(long deadline);
/**
* @return the absolute deadline of this call in epoch millis, or Long.MAX_VALUE if none was set
*/
long getDeadline();
/**
* Set failed with an exception to pass on. For use in async RPC clients.
* @param e exception to set with

HBaseRpcControllerImpl.java

@@ -51,6 +51,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
private IOException exception;
private long deadline = Long.MAX_VALUE;
/**
* Priority to set on this request. Set it here in the controller so it is available when composing the request.
* This is the ordained way of setting priorities going forward. We will be undoing the old
@@ -117,6 +119,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
cellScanner = null;
exception = null;
callTimeout = null;
deadline = Long.MAX_VALUE;
// In the implementations of some callables with replicas, RPC calls are executed in an executor
// and we could cancel the operation from outside, which means there could be a race between
// reset and startCancel. Although I think the race should be handled by the callable since the
@@ -147,6 +150,16 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
return callTimeout != null;
}
@Override
public void setDeadline(long deadline) {
this.deadline = deadline;
}
@Override
public long getDeadline() {
return this.deadline;
}
@Override
public synchronized String errorText() {
if (!done || exception == null) {

CallRunner.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.Pair;
@@ -121,7 +122,11 @@ public class CallRunner {
}
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
- call.timestamp, this.status, call.startTime, call.timeout);
+ call.timestamp, this.status, call.startTime, call.timeout);
} catch (TimeoutIOException e) {
RpcServer.LOG.info("Timeout while handling request, won't send response to client: " + call,
e);
return;
} catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
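The TimeoutIOException branch above only fires if something underneath RpcServer.call actually checks the deadline before doing the work. A minimal sketch of such a check, assuming a handler with access to the HBaseRpcController built in RpcServer below (variable names are illustrative, not part of this commit):

    // Drop the call instead of executing it once its absolute deadline has passed.
    long deadline = controller.getDeadline();
    if (deadline != Long.MAX_VALUE && System.currentTimeMillis() > deadline) {
      throw new TimeoutIOException("Dropping call, deadline " + deadline + " already passed");
    }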

RpcServer.java

@@ -400,7 +400,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return "callId: " + this.id + " service: " + serviceName +
" methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
" size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
- " connection: " + connection.toString();
+ " connection: " + connection.toString() +
+ " timeout: " + timeout;
}
String toTraceString() {
@@ -2207,6 +2208,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
//get an instance of the method arg type
HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
controller.setCallTimeout(timeout);
controller.setDeadline(timeout > 0 ? receiveTime + timeout : Long.MAX_VALUE);
Message result = service.callBlockingMethod(md, controller, param);
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
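As a worked example of the deadline arithmetic above (values illustrative): a call received at receiveTime = 1473346623000 with a 60000 ms client timeout expires exactly 60 seconds later, while a non-positive timeout means the call never expires.

    long receiveTime = 1473346623000L; // epoch millis when the request arrived
    int timeout = 60000;               // client-supplied call timeout in millis
    long deadline = timeout > 0 ? receiveTime + timeout : Long.MAX_VALUE;
    // deadline == 1473346683000L; a handler that starts after this instant can drop the call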

AsyncRegion.java

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.wal.WALSplitter;
/**
* Async version of Region. Supports non-blocking operations and can pass more information into
* the operations.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface AsyncRegion extends Region {
void getRowLock(RegionOperationContext<RowLock> context, byte[] row, boolean readLock);
void append(RegionOperationContext<Result> context, Append append, long nonceGroup, long nonce);
void batchMutate(RegionOperationContext<OperationStatus[]> context, Mutation[] mutations,
long nonceGroup, long nonce);
void batchReplay(RegionOperationContext<OperationStatus[]> context, WALSplitter.MutationReplay[] mutations,
long replaySeqId);
void checkAndMutate(RegionOperationContext<Boolean> context, byte [] row, byte [] family,
byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation,
boolean writeToWAL);
void checkAndRowMutate(RegionOperationContext<Boolean> context, byte [] row, byte [] family,
byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator,
RowMutations mutations, boolean writeToWAL);
void delete(RegionOperationContext<Void> context, Delete delete);
void get(RegionOperationContext<Result> context, Get get);
void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor);
void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor,
long nonceGroup, long nonce);
void getScanner(RegionOperationContext<RegionScanner> context, Scan scan);
void getScanner(RegionOperationContext<RegionScanner> context, Scan scan,
List<KeyValueScanner> additionalScanners);
void increment(RegionOperationContext<Result> context, Increment increment, long nonceGroup,
long nonce);
void mutateRow(RegionOperationContext<Void> context, RowMutations mutations);
void mutateRowsWithLocks(RegionOperationContext<Void> context, Collection<Mutation> mutations,
Collection<byte[]> rowsToLock, long nonceGroup, long nonce);
void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor);
void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor,
long nonceGroup, long nonce);
void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor,
long timeout, long nonceGroup, long nonce);
void put(RegionOperationContext<Void> context, Put put);
}
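A caller-side sketch of this interface, assuming an AsyncRegion reference (HRegion implements it later in this commit) plus the RegionOperationContext and OperationListener types added below; the listener bodies are placeholders:

    RegionOperationContext<Result> context = new RegionOperationContext<>(rpcController);
    context.addListener(new OperationListener<Result>() {
      @Override
      public void completed(Result result) {
        // hand the result to the next stage, e.g. the RPC response path
      }
      @Override
      public void failed(Throwable t) {
        // translate the error into a client-visible failure
      }
    });
    asyncRegion.get(context, get);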

HRegion.java

@@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -197,7 +198,7 @@ import org.apache.htrace.TraceScope;
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
-public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
+public class HRegion implements HeapSize, PropagatingConfigurationObserver, AsyncRegion {
private static final Log LOG = LogFactory.getLog(HRegion.class);
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
@@ -5296,6 +5297,200 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
@Override
public void append(RegionOperationContext<Result> context, Append append, long nonceGroup,
long nonce) {
try {
context.done(append(append, nonceGroup, nonce));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void batchMutate(RegionOperationContext<OperationStatus[]> context, Mutation[] mutations,
long nonceGroup, long nonce) {
try {
context.done(batchMutate(mutations, nonceGroup, nonce));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void batchReplay(RegionOperationContext<OperationStatus[]> context,
MutationReplay[] mutations, long replaySeqId) {
try {
context.done(batchReplay(mutations, replaySeqId));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void checkAndMutate(RegionOperationContext<Boolean> context, byte[] row, byte[] family,
byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
Mutation mutation, boolean writeToWAL) {
try {
context.done(
checkAndMutate(row, family, qualifier, compareOp, comparator, mutation, writeToWAL));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void checkAndRowMutate(RegionOperationContext<Boolean> context, byte[] row, byte[] family,
byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
RowMutations mutations, boolean writeToWAL) {
try {
context.done(
checkAndRowMutate(row, family, qualifier, compareOp, comparator, mutations, writeToWAL));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void delete(RegionOperationContext<Void> context, Delete delete) {
try {
delete(delete);
context.done(null);
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void get(RegionOperationContext<Result> context, Get get) {
try {
context.done(get(get));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor) {
try {
context.done(get(get, withCoprocessor));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor,
long nonceGroup, long nonce) {
try {
context.done(get(get, withCoprocessor, nonceGroup, nonce));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void getScanner(RegionOperationContext<RegionScanner> context, Scan scan) {
try {
context.done(getScanner(scan));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void getScanner(RegionOperationContext<RegionScanner> context, Scan scan,
List<KeyValueScanner> additionalScanners) {
try {
context.done(getScanner(scan, additionalScanners));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void getRowLock(RegionOperationContext<RowLock> context, byte[] row, boolean readLock) {
try {
context.done(getRowLock(row, readLock));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void increment(RegionOperationContext<Result> context, Increment increment,
long nonceGroup, long nonce) {
try {
context.done(increment(increment, nonceGroup, nonce));
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void mutateRow(RegionOperationContext<Void> context, RowMutations mutations) {
try {
mutateRow(mutations);
context.done(null);
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void mutateRowsWithLocks(RegionOperationContext<Void> context,
Collection<Mutation> mutations, Collection<byte[]> rowsToLock, long nonceGroup, long nonce) {
try {
mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
context.done(null);
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void processRowsWithLocks(RegionOperationContext<Void> context,
RowProcessor<?, ?> processor) {
try {
processRowsWithLocks(processor);
context.done(null);
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void processRowsWithLocks(RegionOperationContext<Void> context,
RowProcessor<?, ?> processor, long nonceGroup, long nonce) {
try {
processRowsWithLocks(processor, nonceGroup, nonce);
context.done(null);
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void processRowsWithLocks(RegionOperationContext<Void> context,
RowProcessor<?, ?> processor, long timeout, long nonceGroup, long nonce) {
try {
processRowsWithLocks(processor, timeout, nonceGroup, nonce);
context.done(null);
} catch (Throwable e) {
context.error(e);
}
}
@Override
public void put(RegionOperationContext<Void> context, Put put) {
try {
put(put);
context.done(null);
} catch (Throwable e) {
context.error(e);
}
}
public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
return lockedRows;
}

OperationListener.java

@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Interface for listeners of AsyncRegion.
* @param <T> type of result, Void if it has no result.
*/
@InterfaceAudience.Private
public interface OperationListener<T> {
void completed(T result);
void failed(Throwable t);
}
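A trivial implementation for illustration (not part of this commit), using the commons-logging Log/LogFactory classes HBase already depends on, for callers that only need to record the outcome:

    // Hypothetical listener that just logs how a region operation ended.
    class LoggingListener<T> implements OperationListener<T> {
      private static final Log LOG = LogFactory.getLog(LoggingListener.class);
      @Override
      public void completed(T result) {
        LOG.debug("operation completed: " + result);
      }
      @Override
      public void failed(Throwable t) {
        LOG.warn("operation failed", t);
      }
    }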

Region.java

@@ -282,15 +282,16 @@ public interface Region extends ConfigurationObserver {
}
/**
- * Tries to acquire a lock on the given row.
- * @param waitForLock if true, will block until the lock is available.
- * Otherwise, just tries to obtain the lock and returns
- * false if unavailable.
- * @return the row lock if acquired,
- * null if waitForLock was false and the lock was not acquired
- * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
+ *
+ * Get a row lock for the specified row. All locks are reentrant.
+ *
+ * Before calling this function make sure that a region operation has already been
+ * started (the calling thread has already acquired the region-close-guard lock).
+ * @param row The row actions will be performed against
+ * @param readLock is the lock reader or writer. True indicates that a non-exclusive
+ * lock is requested
*/
- RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException;
+ RowLock getRowLock(byte[] row, boolean readLock) throws IOException;
/**
* If the given list of row locks is not null, releases all locks.

RegionOperationContext.java

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.protobuf.RpcController;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
/**
* In each operation of AsyncRegion, we pass a context object carrying information about the
* request. We can pass the request's deadline to the AsyncRegion implementation so it can drop
* requests that have already timed out instead of wasting time on them.
* We can add listeners to watch for completion or failure of the operation, which lets operations
* of AsyncRegion be non-blocking. This is important for a Staged Event-Driven Architecture
* (SEDA); see HBASE-16583 for details.
* The context is RPC-free; don't add RPC-related code here. RPC code should use a listener to
* handle the result.
* @param <T> The type of result, Void if the operation has no result.
*/
@InterfaceAudience.Private
public class RegionOperationContext<T> {
private long deadline = Long.MAX_VALUE;
// Initialized at declaration so every constructor, including the RpcController one below,
// starts with a usable listener list.
private List<OperationListener<T>> listeners = new ArrayList<>();
public long getDeadline() {
return deadline;
}
public void setDeadline(long deadline) {
this.deadline = deadline;
}
public RegionOperationContext() {
listeners = new ArrayList<>();
}
public RegionOperationContext(RegionOperationContext<T> context) {
this.deadline = context.deadline;
this.listeners = new ArrayList<>(context.listeners);
}
public RegionOperationContext(RpcController controller) {
if (controller instanceof HBaseRpcController) {
this.deadline = ((HBaseRpcController) controller).getDeadline();
}
}
public synchronized void addListener(OperationListener<T> listener) {
listeners.add(listener);
}
/**
* We will call this only in one thread, so no need to lock.
*/
public void error(Throwable error) {
for (OperationListener<T> listener : listeners) {
listener.failed(error);
}
}
/**
* We will call this only in one thread, so no need to lock.
*/
public void done(T result) {
for (OperationListener<T> listener : listeners) {
listener.completed(result);
}
}
}
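This commit only plumbs the deadline through; a sketch of how an implementation might honor it (an assumed pattern, reusing the TimeoutIOException already seen in CallRunner above):

    // Inside a hypothetical AsyncRegion method: fail fast if the request already timed out.
    long deadline = context.getDeadline();
    if (deadline != Long.MAX_VALUE && System.currentTimeMillis() > deadline) {
      context.error(new TimeoutIOException("deadline " + deadline + " already passed"));
      return;
    }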

SynchronousOperationListener.java

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.base.Throwables;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* An OperationListener that supports getting the result directly. Temporarily used while
* AsyncRegion is not fully non-blocking. When getResult is called, the operation must already
* have completed.
*/
@InterfaceAudience.Private
public class SynchronousOperationListener<T> implements OperationListener<T> {
private T result;
private boolean done;
private Throwable error;
@Override
public void completed(T result) {
done = true;
this.result = result;
}
@Override
public void failed(Throwable error) {
done = true;
this.error = error;
}
/**
* We call this method after invoking an AsyncRegion operation synchronously in the same thread,
* so there is no need to lock, and completed/failed must already have been called.
*/
public T getResult() throws IOException {
assert done;
if (error != null) {
// Rethrow directly if it is an IOException or an unchecked throwable
Throwables.propagateIfPossible(error, IOException.class);
// Otherwise wrap the checked, non-IO throwable in an IOException
throw new IOException(error);
}
return result;
}
}
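Typical synchronous bridging, per the class comment: because HRegion's AsyncRegion methods in this commit invoke done() or error() before returning, getResult() is safe immediately after the call (variable names illustrative):

    SynchronousOperationListener<Result> listener = new SynchronousOperationListener<>();
    RegionOperationContext<Result> context = new RegionOperationContext<>(controller);
    context.addListener(listener);
    region.get(context, get);             // completes inline in this commit
    Result result = listener.getResult(); // rethrows a failure as IOException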