HBASE-16854 Refactor the org.apache.hadoop.hbase.client.Action (ChiaPing Tsai)

This commit is contained in:
tedyu 2016-10-18 09:47:25 -07:00
parent 317136e272
commit bb6cc4d43e
11 changed files with 140 additions and 125 deletions

View File

@ -27,16 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
* the index from the original request.
*/
@InterfaceAudience.Private
//TODO: R is never used
public class Action<R> implements Comparable<R> {
// TODO: This class should not be visible outside of the client package.
private Row action;
private int originalIndex;
public class Action implements Comparable<Action> {
private final Row action;
private final int originalIndex;
private long nonce = HConstants.NO_NONCE;
private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
public Action(Row action, int originalIndex) {
super();
this.action = action;
this.originalIndex = originalIndex;
}
@ -46,15 +43,13 @@ public class Action<R> implements Comparable<R> {
* @param action Original action.
* @param replicaId Replica id for the new action.
*/
public Action(Action<R> action, int replicaId) {
super();
public Action(Action action, int replicaId) {
this.action = action.action;
this.nonce = action.nonce;
this.originalIndex = action.originalIndex;
this.replicaId = replicaId;
}
public void setNonce(long nonce) {
this.nonce = nonce;
}
@ -75,10 +70,9 @@ public class Action<R> implements Comparable<R> {
return replicaId;
}
@SuppressWarnings("rawtypes")
@Override
public int compareTo(Object o) {
return action.compareTo(((Action) o).getAction());
public int compareTo(Action other) {
return action.compareTo(other.getAction());
}
@Override
@ -89,9 +83,10 @@ public class Action<R> implements Comparable<R> {
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
Action<?> other = (Action<?>) obj;
return compareTo(other) == 0;
if (obj instanceof Action) {
return compareTo((Action) obj) == 0;
}
return false;
}
public long getNonce() {

View File

@ -347,9 +347,9 @@ class AsyncProcess {
return NO_REQS_RESULT;
}
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
List<Action<Row>> retainedActions = new ArrayList<Action<Row>>(rows.size());
Map<ServerName, MultiAction> actionsByServer =
new HashMap<ServerName, MultiAction>();
List<Action> retainedActions = new ArrayList<Action>(rows.size());
NonceGenerator ng = this.connection.getNonceGenerator();
long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client.
@ -388,7 +388,7 @@ class AsyncProcess {
LOG.error("Failed to get region location ", ex);
// This action failed before creating ars. Retain it, but do not add to submit list.
// We will then add it to ars in an already-failed state.
retainedActions.add(new Action<Row>(r, ++posInList));
retainedActions.add(new Action(r, ++posInList));
locationErrors.add(ex);
locationErrorRows.add(posInList);
it.remove();
@ -400,7 +400,7 @@ class AsyncProcess {
break;
}
if (code == ReturnCode.INCLUDE) {
Action<Row> action = new Action<Row>(r, ++posInList);
Action action = new Action(r, ++posInList);
setNonce(ng, r, action);
retainedActions.add(action);
// TODO: replica-get is not supported on this path
@ -431,9 +431,9 @@ class AsyncProcess {
));
}
<CResult> AsyncRequestFuture submitMultiActions(TableName tableName,
List<Action<Row>> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
List<Action> retainedActions, long nonceGroup, Batch.Callback<CResult> callback,
Object[] results, boolean needResults, List<Exception> locationErrors,
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
List<Integer> locationErrorRows, Map<ServerName, MultiAction> actionsByServer,
ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
@ -467,11 +467,11 @@ class AsyncProcess {
* @param actionsByServer the multiaction per server
* @param nonceGroup Nonce group.
*/
static void addAction(ServerName server, byte[] regionName, Action<Row> action,
Map<ServerName, MultiAction<Row>> actionsByServer, long nonceGroup) {
MultiAction<Row> multiAction = actionsByServer.get(server);
static void addAction(ServerName server, byte[] regionName, Action action,
Map<ServerName, MultiAction> actionsByServer, long nonceGroup) {
MultiAction multiAction = actionsByServer.get(server);
if (multiAction == null) {
multiAction = new MultiAction<Row>();
multiAction = new MultiAction();
actionsByServer.put(server, multiAction);
}
if (action.hasNonce() && !multiAction.hasNonceGroup()) {
@ -499,7 +499,7 @@ class AsyncProcess {
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
CancellableRegionServerCallable callable, int rpcTimeout) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
List<Action> actions = new ArrayList<Action>(rows.size());
// The position will be used by the processBatch to match the object array returned.
int posInList = -1;
@ -512,7 +512,7 @@ class AsyncProcess {
throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
}
}
Action<Row> action = new Action<Row>(r, posInList);
Action action = new Action(r, posInList);
setNonce(ng, r, action);
actions.add(action);
}
@ -523,13 +523,13 @@ class AsyncProcess {
return ars;
}
private void setNonce(NonceGenerator ng, Row r, Action<Row> action) {
private void setNonce(NonceGenerator ng, Row r, Action action) {
if (!(r instanceof Append) && !(r instanceof Increment)) return;
action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled.
}
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
TableName tableName, List<Action> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults,
CancellableRegionServerCallable callable, int rpcTimeout) {
return new AsyncRequestFutureImpl<CResult>(

View File

@ -82,9 +82,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
*/
private final class ReplicaCallIssuingRunnable implements Runnable {
private final long startTime;
private final List<Action<Row>> initialActions;
private final List<Action> initialActions;
public ReplicaCallIssuingRunnable(List<Action<Row>> initialActions, long startTime) {
public ReplicaCallIssuingRunnable(List<Action> initialActions, long startTime) {
this.initialActions = initialActions;
this.startTime = startTime;
}
@ -101,9 +101,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
}
}
if (done) return; // Done within primary timeout
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
List<Action<Row>> unknownLocActions = new ArrayList<Action<Row>>();
Map<ServerName, MultiAction> actionsByServer =
new HashMap<ServerName, MultiAction>();
List<Action> unknownLocActions = new ArrayList<Action>();
if (replicaGetIndices == null) {
for (int i = 0; i < results.length; ++i) {
addReplicaActions(i, actionsByServer, unknownLocActions);
@ -117,8 +117,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty());
}
if (!unknownLocActions.isEmpty()) {
actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
for (Action<Row> action : unknownLocActions) {
actionsByServer = new HashMap<ServerName, MultiAction>();
for (Action action : unknownLocActions) {
addReplicaActionsAgain(action, actionsByServer);
}
// Some actions may have completely failed, they are handled inside addAgain.
@ -133,10 +133,10 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* @param index Index of the original action.
* @param actionsByServer The map by server to add it to.
*/
private void addReplicaActions(int index, Map<ServerName, MultiAction<Row>> actionsByServer,
List<Action<Row>> unknownReplicaActions) {
private void addReplicaActions(int index, Map<ServerName, MultiAction> actionsByServer,
List<Action> unknownReplicaActions) {
if (results[index] != null) return; // opportunistic. Never goes from non-null to null.
Action<Row> action = initialActions.get(index);
Action action = initialActions.get(index);
RegionLocations loc = findAllLocationsOrFail(action, true);
if (loc == null) return;
HRegionLocation[] locs = loc.getRegionLocations();
@ -154,7 +154,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
results[index] = new ReplicaResultState(locs.length);
}
for (int i = 1; i < locs.length; ++i) {
Action<Row> replicaAction = new Action<Row>(action, i);
Action replicaAction = new Action(action, i);
if (locs[i] != null) {
asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(),
replicaAction, actionsByServer, nonceGroup);
@ -165,7 +165,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
}
private void addReplicaActionsAgain(
Action<Row> action, Map<ServerName, MultiAction<Row>> actionsByServer) {
Action action, Map<ServerName, MultiAction> actionsByServer) {
if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
throw new AssertionError("Cannot have default replica here");
}
@ -181,13 +181,13 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* single server. The server call is synchronous, therefore we do it on a thread pool.
*/
private final class SingleServerRequestRunnable implements Runnable {
private final MultiAction<Row> multiAction;
private final MultiAction multiAction;
private final int numAttempt;
private final ServerName server;
private final Set<CancellableRegionServerCallable> callsInProgress;
private Long heapSize = null;
private SingleServerRequestRunnable(
MultiAction<Row> multiAction, int numAttempt, ServerName server,
MultiAction multiAction, int numAttempt, ServerName server,
Set<CancellableRegionServerCallable> callsInProgress) {
this.multiAction = multiAction;
this.numAttempt = numAttempt;
@ -201,9 +201,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
return heapSize;
}
heapSize = 0L;
for (Map.Entry<byte[], List<Action<Row>>> e: this.multiAction.actions.entrySet()) {
List<Action<Row>> actions = e.getValue();
for (Action<Row> action: actions) {
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
List<Action> actions = e.getValue();
for (Action action: actions) {
Row row = action.getAction();
if (row instanceof Mutation) {
heapSize += ((Mutation) row).heapSize();
@ -340,7 +340,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
public AsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback,
CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout,
AsyncProcess asyncProcess) {
@ -371,7 +371,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
// hasAnyReplicaGets to true.
boolean hasAnyNonReplicaReqs = false;
int posInList = 0;
for (Action<Row> action : actions) {
for (Action action : actions) {
boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction());
if (isReplicaGet) {
hasAnyReplicaGets = true;
@ -447,13 +447,13 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* @param currentActions - the list of row to submit
* @param numAttempt - the current numAttempt (first attempt is 1)
*/
void groupAndSendMultiAction(List<Action<Row>> currentActions, int numAttempt) {
Map<ServerName, MultiAction<Row>> actionsByServer =
new HashMap<ServerName, MultiAction<Row>>();
void groupAndSendMultiAction(List<Action> currentActions, int numAttempt) {
Map<ServerName, MultiAction> actionsByServer =
new HashMap<ServerName, MultiAction>();
boolean isReplica = false;
List<Action<Row>> unknownReplicaActions = null;
for (Action<Row> action : currentActions) {
List<Action> unknownReplicaActions = null;
for (Action action : currentActions) {
RegionLocations locs = findAllLocationsOrFail(action, true);
if (locs == null) continue;
boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId());
@ -466,7 +466,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (loc == null || loc.getServerName() == null) {
if (isReplica) {
if (unknownReplicaActions == null) {
unknownReplicaActions = new ArrayList<Action<Row>>();
unknownReplicaActions = new ArrayList<Action>();
}
unknownReplicaActions.add(action);
} else {
@ -488,8 +488,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
}
if (hasUnknown) {
actionsByServer = new HashMap<ServerName, MultiAction<Row>>();
for (Action<Row> action : unknownReplicaActions) {
actionsByServer = new HashMap<ServerName, MultiAction>();
for (Action action : unknownReplicaActions) {
HRegionLocation loc = getReplicaLocationOrFail(action);
if (loc == null) continue;
byte[] regionName = loc.getRegionInfo().getRegionName();
@ -502,7 +502,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
}
}
private HRegionLocation getReplicaLocationOrFail(Action<Row> action) {
private HRegionLocation getReplicaLocationOrFail(Action action) {
// We are going to try get location once again. For each action, we'll do it once
// from cache, because the previous calls in the loop might populate it.
int replicaId = action.getReplicaId();
@ -521,7 +521,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
return loc;
}
private void manageLocationError(Action<Row> action, Exception ex) {
private void manageLocationError(Action action, Exception ex) {
String msg = "Cannot get replica " + action.getReplicaId()
+ " location for " + action.getAction();
LOG.error(msg);
@ -532,7 +532,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
Retry.NO_LOCATION_PROBLEM, ex, null);
}
private RegionLocations findAllLocationsOrFail(Action<Row> action, boolean useCache) {
private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) {
if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id +
", row cannot be null");
RegionLocations loc = null;
@ -553,15 +553,15 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* @param numAttempt the attempt number.
* @param actionsForReplicaThread original actions for replica thread; null on non-first call.
*/
void sendMultiAction(Map<ServerName, MultiAction<Row>> actionsByServer,
int numAttempt, List<Action<Row>> actionsForReplicaThread, boolean reuseThread) {
void sendMultiAction(Map<ServerName, MultiAction> actionsByServer,
int numAttempt, List<Action> actionsForReplicaThread, boolean reuseThread) {
// Run the last item on the same thread if we are already on a send thread.
// We hope most of the time it will be the only item, so we can cut down on threads.
int actionsRemaining = actionsByServer.size();
// This iteration is by server (the HRegionLocation comparator is by server portion only).
for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) {
for (Map.Entry<ServerName, MultiAction> e : actionsByServer.entrySet()) {
ServerName server = e.getKey();
MultiAction<Row> multiAction = e.getValue();
MultiAction multiAction = e.getValue();
Collection<? extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction,
numAttempt);
// make sure we correctly count the number of runnables before we try to reuse the send
@ -602,7 +602,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
}
private Collection<? extends Runnable> getNewMultiActionRunnable(ServerName server,
MultiAction<Row> multiAction,
MultiAction multiAction,
int numAttempt) {
// no stats to manage, just do the standard action
if (asyncProcess.connection.getStatisticsTracker() == null) {
@ -620,7 +620,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
.size());
// split up the actions
for (Map.Entry<byte[], List<Action<Row>>> e : multiAction.actions.entrySet()) {
for (Map.Entry<byte[], List<Action>> e : multiAction.actions.entrySet()) {
Long backoff = getBackoff(server, e.getKey());
DelayingRunner runner = actions.get(backoff);
if (runner == null) {
@ -673,7 +673,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
/**
* Starts waiting to issue replica calls on a different thread; or issues them immediately.
*/
private void startWaitingForReplicaCalls(List<Action<Row>> actionsForReplicaThread) {
private void startWaitingForReplicaCalls(List<Action> actionsForReplicaThread) {
long startTime = EnvironmentEdgeManager.currentTime();
ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
actionsForReplicaThread, startTime);
@ -726,7 +726,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* @param t the throwable (if any) that caused the resubmit
*/
private void receiveGlobalFailure(
MultiAction<Row> rsActions, ServerName server, int numAttempt, Throwable t) {
MultiAction rsActions, ServerName server, int numAttempt, Throwable t) {
errorsByServer.reportServerError(server);
Retry canRetry = errorsByServer.canTryMore(numAttempt)
? Retry.YES : Retry.NO_RETRIES_EXHAUSTED;
@ -736,8 +736,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
asyncProcess.connection.clearCaches(server);
}
int failed = 0, stopped = 0;
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
List<Action> toReplay = new ArrayList<Action>();
for (Map.Entry<byte[], List<Action>> e : rsActions.actions.entrySet()) {
byte[] regionName = e.getKey();
byte[] row = e.getValue().iterator().next().getAction().getRow();
// Do not use the exception for updating cache because it might be coming from
@ -752,7 +752,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
// we still process errors
LOG.error("Couldn't update cached region locations: " + ex);
}
for (Action<Row> action : e.getValue()) {
for (Action action : e.getValue()) {
Retry retry = manageError(
action.getOriginalIndex(), action.getAction(), canRetry, t, server);
if (retry == Retry.YES) {
@ -776,7 +776,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* Log as much info as possible, and, if there is something to replay,
* submit it again after a back off sleep.
*/
private void resubmit(ServerName oldServer, List<Action<Row>> toReplay,
private void resubmit(ServerName oldServer, List<Action> toReplay,
int numAttempt, int failureCount, Throwable throwable) {
// We have something to replay. We're going to sleep a little before.
@ -833,7 +833,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* @param responses - the response, if any
* @param numAttempt - the attempt
*/
private void receiveMultiAction(MultiAction<Row> multiAction,
private void receiveMultiAction(MultiAction multiAction,
ServerName server, MultiResponse responses, int numAttempt) {
assert responses != null;
@ -843,7 +843,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
// - DoNotRetryIOException: we continue to retry for other actions
// - RegionMovedException: we update the cache with the new region location
List<Action<Row>> toReplay = new ArrayList<Action<Row>>();
List<Action> toReplay = new ArrayList<Action>();
Throwable throwable = null;
int failureCount = 0;
boolean canRetry = true;
@ -853,7 +853,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
int failed = 0, stopped = 0;
// Go by original action.
for (Map.Entry<byte[], List<Action<Row>>> regionEntry : multiAction.actions.entrySet()) {
for (Map.Entry<byte[], List<Action>> regionEntry : multiAction.actions.entrySet()) {
byte[] regionName = regionEntry.getKey();
Map<Integer, Object> regionResults = results.get(regionName) == null
? null : results.get(regionName).result;
@ -866,7 +866,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
continue;
}
boolean regionFailureRegistered = false;
for (Action<Row> sentAction : regionEntry.getValue()) {
for (Action sentAction : regionEntry.getValue()) {
Object result = regionResults.get(sentAction.getOriginalIndex());
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
@ -920,7 +920,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
for (Map.Entry<byte[], Throwable> throwableEntry : responses.getExceptions().entrySet()) {
throwable = throwableEntry.getValue();
byte[] region = throwableEntry.getKey();
List<Action<Row>> actions = multiAction.actions.get(region);
List<Action> actions = multiAction.actions.get(region);
if (actions == null || actions.isEmpty()) {
throw new IllegalStateException("Wrong response for the region: " +
HRegionInfo.encodeRegionName(region));
@ -947,7 +947,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
}
failureCount += actions.size();
for (Action<Row> action : actions) {
for (Action action : actions) {
Row row = action.getAction();
Retry retry = manageError(action.getOriginalIndex(), row,
canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server);
@ -1024,7 +1024,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* @param action Action (request) that the server responded to.
* @param result The result.
*/
private void setResult(Action<Row> action, Object result) {
private void setResult(Action action, Object result) {
if (result == null) {
throw new RuntimeException("Result cannot be null");
}
@ -1287,9 +1287,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
/**
* Create a callable. Isolated to be easily overridden in the tests.
*/
private MultiServerCallable<Row> createCallable(final ServerName server, TableName tableName,
final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(asyncProcess.connection, tableName, server,
private MultiServerCallable createCallable(final ServerName server, TableName tableName,
final MultiAction multi) {
return new MultiServerCallable(asyncProcess.connection, tableName, server,
multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
}
}

View File

@ -39,16 +39,16 @@ import java.util.Map;
* </p>
*/
@InterfaceAudience.Private
public class DelayingRunner<T> implements Runnable {
public class DelayingRunner implements Runnable {
private static final Log LOG = LogFactory.getLog(DelayingRunner.class);
private final Object sleepLock = new Object();
private boolean triggerWake = false;
private long sleepTime;
private MultiAction<T> actions = new MultiAction<T>();
private MultiAction actions = new MultiAction();
private Runnable runnable;
public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action<T>>> e) {
public DelayingRunner(long sleepTime, Map.Entry<byte[], List<Action>> e) {
this.sleepTime = sleepTime;
add(e);
}
@ -102,11 +102,11 @@ public class DelayingRunner<T> implements Runnable {
return true;
}
public void add(Map.Entry<byte[], List<Action<T>>> e) {
public void add(Map.Entry<byte[], List<Action>> e) {
actions.add(e.getKey(), e.getValue());
}
public MultiAction<T> getActions() {
public MultiAction getActions() {
return actions;
}

View File

@ -578,11 +578,11 @@ public class HTableMultiplexer {
// failedCount is decreased whenever a Put is success or resubmit.
failedCount = processingList.size();
List<Action<Row>> retainedActions = new ArrayList<>(processingList.size());
MultiAction<Row> actions = new MultiAction<>();
List<Action> retainedActions = new ArrayList<>(processingList.size());
MultiAction actions = new MultiAction();
for (int i = 0; i < processingList.size(); i++) {
PutStatus putStatus = processingList.get(i);
Action<Row> action = new Action<Row>(putStatus.put, i);
Action action = new Action(putStatus.put, i);
actions.add(putStatus.regionInfo.getRegionName(), action);
retainedActions.add(action);
}
@ -591,7 +591,7 @@ public class HTableMultiplexer {
List<PutStatus> failed = null;
Object[] results = new Object[actions.size()];
ServerName server = addr.getServerName();
Map<ServerName, MultiAction<Row>> actionsByServer =
Map<ServerName, MultiAction> actionsByServer =
Collections.singletonMap(server, actions);
try {
AsyncRequestFuture arf =

View File

@ -34,11 +34,11 @@ import org.apache.hadoop.hbase.util.Bytes;
* regionName. Intended to be used with {@link AsyncProcess}.
*/
@InterfaceAudience.Private
public final class MultiAction<R> {
public final class MultiAction {
// TODO: This class should not be visible outside of the client package.
// map of regions to lists of puts/gets/deletes for that region.
protected Map<byte[], List<Action<R>>> actions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
protected Map<byte[], List<Action>> actions = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private long nonceGroup = HConstants.NO_NONCE;
@ -67,7 +67,7 @@ public final class MultiAction<R> {
* @param regionName
* @param a
*/
public void add(byte[] regionName, Action<R> a) {
public void add(byte[] regionName, Action a) {
add(regionName, Arrays.asList(a));
}
@ -79,10 +79,10 @@ public final class MultiAction<R> {
* @param regionName
* @param actionList list of actions to add for the region
*/
public void add(byte[] regionName, List<Action<R>> actionList){
List<Action<R>> rsActions = actions.get(regionName);
public void add(byte[] regionName, List<Action> actionList){
List<Action> rsActions = actions.get(regionName);
if (rsActions == null) {
rsActions = new ArrayList<Action<R>>(actionList.size());
rsActions = new ArrayList<Action>(actionList.size());
actions.put(regionName, rsActions);
}
rsActions.addAll(actionList);

View File

@ -49,12 +49,12 @@ import com.google.common.annotations.VisibleForTesting;
* @param <R>
*/
@InterfaceAudience.Private
class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> {
private MultiAction<R> multiAction;
class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse> {
private MultiAction multiAction;
private boolean cellBlock;
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
final ServerName location, final MultiAction<R> multi, RpcController rpcController,
final ServerName location, final MultiAction multi, RpcController rpcController,
int rpcTimeout, RetryingTimeTracker tracker) {
super(connection, tableName, null, rpcController, rpcTimeout, tracker);
this.multiAction = multi;
@ -65,7 +65,7 @@ class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiRespon
this.cellBlock = isCellBlock();
}
public void reset(ServerName location, MultiAction<R> multiAction) {
public void reset(ServerName location, MultiAction multiAction) {
this.location = new HRegionLocation(null, location);
this.multiAction = multiAction;
this.cellBlock = isCellBlock();
@ -81,7 +81,7 @@ class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiRespon
throw new RuntimeException("Cannot get region info for multi-region request");
}
MultiAction<R> getMulti() {
MultiAction getMulti() {
return this.multiAction;
}
@ -99,9 +99,9 @@ class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiRespon
if (nonceGroup != HConstants.NO_NONCE) {
multiRequestBuilder.setNonceGroup(nonceGroup);
}
for (Map.Entry<byte[], List<Action<R>>> e: this.multiAction.actions.entrySet()) {
for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
final byte [] regionName = e.getKey();
final List<Action<R>> actions = e.getValue();
final List<Action> actions = e.getValue();
regionActionBuilder.clear();
regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier(
HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName));

View File

@ -587,11 +587,11 @@ public final class RequestConverter {
* @return a multi request
* @throws IOException
*/
public static <R> RegionAction.Builder buildRegionAction(final byte[] regionName,
final List<Action<R>> actions, final RegionAction.Builder regionActionBuilder,
public static RegionAction.Builder buildRegionAction(final byte[] regionName,
final List<Action> actions, final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
for (Action<R> action: actions) {
for (Action action: actions) {
Row row = action.getAction();
actionBuilder.clear();
actionBuilder.setIndex(action.getOriginalIndex());
@ -648,14 +648,14 @@ public final class RequestConverter {
* @return a multi request that does not carry any data.
* @throws IOException
*/
public static <R> RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
final List<Action<R>> actions, final List<CellScannable> cells,
public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName,
final List<Action> actions, final List<CellScannable> cells,
final RegionAction.Builder regionActionBuilder,
final ClientProtos.Action.Builder actionBuilder,
final MutationProto.Builder mutationBuilder) throws IOException {
RegionAction.Builder builder = getRegionActionBuilderWithRegion(
RegionAction.newBuilder(), regionName);
for (Action<R> action: actions) {
for (Action action: actions) {
Row row = action.getAction();
actionBuilder.clear();
actionBuilder.setIndex(action.getOriginalIndex());

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -159,7 +160,7 @@ public class TestAsyncProcess {
private long previousTimeout = -1;
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
List<Action> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<Res> callback, Object[] results, boolean needResults,
CancellableRegionServerCallable callable, int curTimeout) {
// Test HTable has tableName of null, so pass DUMMY_TABLE
@ -226,7 +227,7 @@ public class TestAsyncProcess {
callable1.getMulti(), nbMultiResponse, nbActions,
new ResponseGenerator() {
@Override
public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
if (Arrays.equals(FAILS, a.getAction().getRow())) {
mr.add(regionName, a.getOriginalIndex(), failure);
} else {
@ -259,7 +260,7 @@ public class TestAsyncProcess {
static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
public MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
public MyAsyncRequestFutureImpl(TableName tableName, List<Action> actions, long nonceGroup,
ExecutorService pool, boolean needResults, Object[] results,
Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout,
int rpcTimeout, AsyncProcess asyncProcess) {
@ -357,11 +358,11 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
MultiServerCallable callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
@Override
public void addResponse(MultiResponse mr, byte[] regionName, Action<Row> a) {
public void addResponse(MultiResponse mr, byte[] regionName, Action a) {
if (failures.contains(regionName)) {
mr.add(regionName, a.getOriginalIndex(), failure);
} else {
@ -374,7 +375,7 @@ public class TestAsyncProcess {
// Currently AsyncProcess either sends all-replica, or all-primary request.
final boolean isDefault = RegionReplicaUtil.isDefaultReplica(
callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId());
final ServerName server = ((MultiServerCallable<?>)callable).getServerName();
final ServerName server = ((MultiServerCallable)callable).getServerName();
String debugMsg = "Call to " + server + ", primary=" + isDefault + " with "
+ callable.getMulti().actions.size() + " entries: ";
for (byte[] region : callable.getMulti().actions.keySet()) {
@ -409,13 +410,13 @@ public class TestAsyncProcess {
}
}
static MultiResponse createMultiResponse(final MultiAction<Row> multi,
static MultiResponse createMultiResponse(final MultiAction multi,
AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) {
final MultiResponse mr = new MultiResponse();
nbMultiResponse.incrementAndGet();
for (Map.Entry<byte[], List<Action<Row>>> entry : multi.actions.entrySet()) {
for (Map.Entry<byte[], List<Action>> entry : multi.actions.entrySet()) {
byte[] regionName = entry.getKey();
for (Action<Row> a : entry.getValue()) {
for (Action a : entry.getValue()) {
nbActions.incrementAndGet();
gen.addResponse(mr, regionName, a);
}
@ -424,7 +425,7 @@ public class TestAsyncProcess {
}
private static interface ResponseGenerator {
void addResponse(final MultiResponse mr, byte[] regionName, Action<Row> a);
void addResponse(final MultiResponse mr, byte[] regionName, Action a);
}
/**
@ -1280,6 +1281,25 @@ public class TestAsyncProcess {
}
}
@Test
public void testAction() {
Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10);
Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10);
Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10);
Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10);
assertFalse(action_0.equals(action_1));
assertTrue(action_0.equals(action_0));
assertTrue(action_1.equals(action_2));
assertTrue(action_2.equals(action_1));
assertFalse(action_0.equals(new Put(Bytes.toBytes("abc"))));
assertTrue(action_2.equals(action_3));
assertFalse(action_0.equals(action_3));
assertEquals(0, action_0.compareTo(action_0));
assertEquals(-1, action_0.compareTo(action_1));
assertEquals(1, action_1.compareTo(action_0));
assertEquals(0, action_1.compareTo(action_2));
}
@Test
public void testBatch() throws IOException, InterruptedException {
ClusterConnection conn = new MyConnectionImpl(conf);

View File

@ -41,8 +41,8 @@ public class TestDelayingRunner {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testDelayingRunner() throws Exception{
MultiAction<Row> ma = new MultiAction<Row>();
ma.add(hri1.getRegionName(), new Action<Row>(new Put(DUMMY_BYTES_1), 0));
MultiAction ma = new MultiAction();
ma.add(hri1.getRegionName(), new Action(new Put(DUMMY_BYTES_1), 0));
final AtomicLong endTime = new AtomicLong();
final long sleepTime = 1000;
DelayingRunner runner = new DelayingRunner(sleepTime, ma.actions.entrySet().iterator().next());