From bb6cc4d43e36fa1d558c6ece2c5c6d1ed414db0c Mon Sep 17 00:00:00 2001 From: tedyu Date: Tue, 18 Oct 2016 09:47:25 -0700 Subject: [PATCH] HBASE-16854 Refactor the org.apache.hadoop.hbase.client.Action (ChiaPing Tsai) --- .../apache/hadoop/hbase/client/Action.java | 29 ++--- .../hadoop/hbase/client/AsyncProcess.java | 30 ++--- .../hbase/client/AsyncRequestFutureImpl.java | 104 +++++++++--------- .../hadoop/hbase/client/DelayingRunner.java | 10 +- .../hbase/client/HTableMultiplexer.java | 8 +- .../hadoop/hbase/client/MultiAction.java | 12 +- .../hbase/client/MultiServerCallable.java | 14 +-- .../shaded/protobuf/RequestConverter.java | 12 +- .../hadoop/hbase/client/TestAsyncProcess.java | 40 +++++-- .../hbase/client/TestDelayingRunner.java | 4 +- .../hadoop/hbase/CoordinatedStateManager.java | 2 +- 11 files changed, 140 insertions(+), 125 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java index 1c38349f771..ef059122960 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java @@ -22,21 +22,18 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** - * A Get, Put, Increment, Append, or Delete associated with it's region. Used internally by + * A Get, Put, Increment, Append, or Delete associated with it's region. Used internally by * {@link Table#batch} to associate the action with it's region and maintain - * the index from the original request. + * the index from the original request. */ @InterfaceAudience.Private -//TODO: R is never used -public class Action implements Comparable { - // TODO: This class should not be visible outside of the client package. - private Row action; - private int originalIndex; +public class Action implements Comparable { + private final Row action; + private final int originalIndex; private long nonce = HConstants.NO_NONCE; private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; public Action(Row action, int originalIndex) { - super(); this.action = action; this.originalIndex = originalIndex; } @@ -46,15 +43,13 @@ public class Action implements Comparable { * @param action Original action. * @param replicaId Replica id for the new action. */ - public Action(Action action, int replicaId) { - super(); + public Action(Action action, int replicaId) { this.action = action.action; this.nonce = action.nonce; this.originalIndex = action.originalIndex; this.replicaId = replicaId; } - public void setNonce(long nonce) { this.nonce = nonce; } @@ -75,10 +70,9 @@ public class Action implements Comparable { return replicaId; } - @SuppressWarnings("rawtypes") @Override - public int compareTo(Object o) { - return action.compareTo(((Action) o).getAction()); + public int compareTo(Action other) { + return action.compareTo(other.getAction()); } @Override @@ -89,9 +83,10 @@ public class Action implements Comparable { @Override public boolean equals(Object obj) { if (this == obj) return true; - if (obj == null || getClass() != obj.getClass()) return false; - Action other = (Action) obj; - return compareTo(other) == 0; + if (obj instanceof Action) { + return compareTo((Action) obj) == 0; + } + return false; } public long getNonce() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index abefc4611c1..e653c80ed52 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -347,9 +347,9 @@ class AsyncProcess { return NO_REQS_RESULT; } - Map> actionsByServer = - new HashMap>(); - List> retainedActions = new ArrayList>(rows.size()); + Map actionsByServer = + new HashMap(); + List retainedActions = new ArrayList(rows.size()); NonceGenerator ng = this.connection.getNonceGenerator(); long nonceGroup = ng.getNonceGroup(); // Currently, nonce group is per entire client. @@ -388,7 +388,7 @@ class AsyncProcess { LOG.error("Failed to get region location ", ex); // This action failed before creating ars. Retain it, but do not add to submit list. // We will then add it to ars in an already-failed state. - retainedActions.add(new Action(r, ++posInList)); + retainedActions.add(new Action(r, ++posInList)); locationErrors.add(ex); locationErrorRows.add(posInList); it.remove(); @@ -400,7 +400,7 @@ class AsyncProcess { break; } if (code == ReturnCode.INCLUDE) { - Action action = new Action(r, ++posInList); + Action action = new Action(r, ++posInList); setNonce(ng, r, action); retainedActions.add(action); // TODO: replica-get is not supported on this path @@ -431,9 +431,9 @@ class AsyncProcess { )); } AsyncRequestFuture submitMultiActions(TableName tableName, - List> retainedActions, long nonceGroup, Batch.Callback callback, + List retainedActions, long nonceGroup, Batch.Callback callback, Object[] results, boolean needResults, List locationErrors, - List locationErrorRows, Map> actionsByServer, + List locationErrorRows, Map actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl ars = createAsyncRequestFuture( tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1); @@ -467,11 +467,11 @@ class AsyncProcess { * @param actionsByServer the multiaction per server * @param nonceGroup Nonce group. */ - static void addAction(ServerName server, byte[] regionName, Action action, - Map> actionsByServer, long nonceGroup) { - MultiAction multiAction = actionsByServer.get(server); + static void addAction(ServerName server, byte[] regionName, Action action, + Map actionsByServer, long nonceGroup) { + MultiAction multiAction = actionsByServer.get(server); if (multiAction == null) { - multiAction = new MultiAction(); + multiAction = new MultiAction(); actionsByServer.put(server, multiAction); } if (action.hasNonce() && !multiAction.hasNonceGroup()) { @@ -499,7 +499,7 @@ class AsyncProcess { public AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List rows, Batch.Callback callback, Object[] results, CancellableRegionServerCallable callable, int rpcTimeout) { - List> actions = new ArrayList>(rows.size()); + List actions = new ArrayList(rows.size()); // The position will be used by the processBatch to match the object array returned. int posInList = -1; @@ -512,7 +512,7 @@ class AsyncProcess { throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item"); } } - Action action = new Action(r, posInList); + Action action = new Action(r, posInList); setNonce(ng, r, action); actions.add(action); } @@ -523,13 +523,13 @@ class AsyncProcess { return ars; } - private void setNonce(NonceGenerator ng, Row r, Action action) { + private void setNonce(NonceGenerator ng, Row r, Action action) { if (!(r instanceof Append) && !(r instanceof Increment)) return; action.setNonce(ng.newNonce()); // Action handles NO_NONCE, so it's ok if ng is disabled. } protected AsyncRequestFutureImpl createAsyncRequestFuture( - TableName tableName, List> actions, long nonceGroup, ExecutorService pool, + TableName tableName, List actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, CancellableRegionServerCallable callable, int rpcTimeout) { return new AsyncRequestFutureImpl( diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index a2642f3198d..d48179b2c08 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -82,9 +82,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { */ private final class ReplicaCallIssuingRunnable implements Runnable { private final long startTime; - private final List> initialActions; + private final List initialActions; - public ReplicaCallIssuingRunnable(List> initialActions, long startTime) { + public ReplicaCallIssuingRunnable(List initialActions, long startTime) { this.initialActions = initialActions; this.startTime = startTime; } @@ -101,9 +101,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } } if (done) return; // Done within primary timeout - Map> actionsByServer = - new HashMap>(); - List> unknownLocActions = new ArrayList>(); + Map actionsByServer = + new HashMap(); + List unknownLocActions = new ArrayList(); if (replicaGetIndices == null) { for (int i = 0; i < results.length; ++i) { addReplicaActions(i, actionsByServer, unknownLocActions); @@ -117,8 +117,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { sendMultiAction(actionsByServer, 1, null, unknownLocActions.isEmpty()); } if (!unknownLocActions.isEmpty()) { - actionsByServer = new HashMap>(); - for (Action action : unknownLocActions) { + actionsByServer = new HashMap(); + for (Action action : unknownLocActions) { addReplicaActionsAgain(action, actionsByServer); } // Some actions may have completely failed, they are handled inside addAgain. @@ -133,10 +133,10 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * @param index Index of the original action. * @param actionsByServer The map by server to add it to. */ - private void addReplicaActions(int index, Map> actionsByServer, - List> unknownReplicaActions) { + private void addReplicaActions(int index, Map actionsByServer, + List unknownReplicaActions) { if (results[index] != null) return; // opportunistic. Never goes from non-null to null. - Action action = initialActions.get(index); + Action action = initialActions.get(index); RegionLocations loc = findAllLocationsOrFail(action, true); if (loc == null) return; HRegionLocation[] locs = loc.getRegionLocations(); @@ -154,7 +154,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { results[index] = new ReplicaResultState(locs.length); } for (int i = 1; i < locs.length; ++i) { - Action replicaAction = new Action(action, i); + Action replicaAction = new Action(action, i); if (locs[i] != null) { asyncProcess.addAction(locs[i].getServerName(), locs[i].getRegionInfo().getRegionName(), replicaAction, actionsByServer, nonceGroup); @@ -165,7 +165,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } private void addReplicaActionsAgain( - Action action, Map> actionsByServer) { + Action action, Map actionsByServer) { if (action.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) { throw new AssertionError("Cannot have default replica here"); } @@ -181,13 +181,13 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * single server. The server call is synchronous, therefore we do it on a thread pool. */ private final class SingleServerRequestRunnable implements Runnable { - private final MultiAction multiAction; + private final MultiAction multiAction; private final int numAttempt; private final ServerName server; private final Set callsInProgress; private Long heapSize = null; private SingleServerRequestRunnable( - MultiAction multiAction, int numAttempt, ServerName server, + MultiAction multiAction, int numAttempt, ServerName server, Set callsInProgress) { this.multiAction = multiAction; this.numAttempt = numAttempt; @@ -201,9 +201,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { return heapSize; } heapSize = 0L; - for (Map.Entry>> e: this.multiAction.actions.entrySet()) { - List> actions = e.getValue(); - for (Action action: actions) { + for (Map.Entry> e: this.multiAction.actions.entrySet()) { + List actions = e.getValue(); + for (Action action: actions) { Row row = action.getAction(); if (row instanceof Mutation) { heapSize += ((Mutation) row).heapSize(); @@ -340,7 +340,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { - public AsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, + public AsyncRequestFutureImpl(TableName tableName, List actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout, AsyncProcess asyncProcess) { @@ -371,7 +371,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { // hasAnyReplicaGets to true. boolean hasAnyNonReplicaReqs = false; int posInList = 0; - for (Action action : actions) { + for (Action action : actions) { boolean isReplicaGet = AsyncProcess.isReplicaGet(action.getAction()); if (isReplicaGet) { hasAnyReplicaGets = true; @@ -447,13 +447,13 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * @param currentActions - the list of row to submit * @param numAttempt - the current numAttempt (first attempt is 1) */ - void groupAndSendMultiAction(List> currentActions, int numAttempt) { - Map> actionsByServer = - new HashMap>(); + void groupAndSendMultiAction(List currentActions, int numAttempt) { + Map actionsByServer = + new HashMap(); boolean isReplica = false; - List> unknownReplicaActions = null; - for (Action action : currentActions) { + List unknownReplicaActions = null; + for (Action action : currentActions) { RegionLocations locs = findAllLocationsOrFail(action, true); if (locs == null) continue; boolean isReplicaAction = !RegionReplicaUtil.isDefaultReplica(action.getReplicaId()); @@ -466,7 +466,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { if (loc == null || loc.getServerName() == null) { if (isReplica) { if (unknownReplicaActions == null) { - unknownReplicaActions = new ArrayList>(); + unknownReplicaActions = new ArrayList(); } unknownReplicaActions.add(action); } else { @@ -488,8 +488,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } if (hasUnknown) { - actionsByServer = new HashMap>(); - for (Action action : unknownReplicaActions) { + actionsByServer = new HashMap(); + for (Action action : unknownReplicaActions) { HRegionLocation loc = getReplicaLocationOrFail(action); if (loc == null) continue; byte[] regionName = loc.getRegionInfo().getRegionName(); @@ -502,7 +502,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } } - private HRegionLocation getReplicaLocationOrFail(Action action) { + private HRegionLocation getReplicaLocationOrFail(Action action) { // We are going to try get location once again. For each action, we'll do it once // from cache, because the previous calls in the loop might populate it. int replicaId = action.getReplicaId(); @@ -521,7 +521,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { return loc; } - private void manageLocationError(Action action, Exception ex) { + private void manageLocationError(Action action, Exception ex) { String msg = "Cannot get replica " + action.getReplicaId() + " location for " + action.getAction(); LOG.error(msg); @@ -532,7 +532,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { Retry.NO_LOCATION_PROBLEM, ex, null); } - private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) { + private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) { if (action.getAction() == null) throw new IllegalArgumentException("#" + asyncProcess.id + ", row cannot be null"); RegionLocations loc = null; @@ -553,15 +553,15 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * @param numAttempt the attempt number. * @param actionsForReplicaThread original actions for replica thread; null on non-first call. */ - void sendMultiAction(Map> actionsByServer, - int numAttempt, List> actionsForReplicaThread, boolean reuseThread) { + void sendMultiAction(Map actionsByServer, + int numAttempt, List actionsForReplicaThread, boolean reuseThread) { // Run the last item on the same thread if we are already on a send thread. // We hope most of the time it will be the only item, so we can cut down on threads. int actionsRemaining = actionsByServer.size(); // This iteration is by server (the HRegionLocation comparator is by server portion only). - for (Map.Entry> e : actionsByServer.entrySet()) { + for (Map.Entry e : actionsByServer.entrySet()) { ServerName server = e.getKey(); - MultiAction multiAction = e.getValue(); + MultiAction multiAction = e.getValue(); Collection runnables = getNewMultiActionRunnable(server, multiAction, numAttempt); // make sure we correctly count the number of runnables before we try to reuse the send @@ -602,7 +602,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } private Collection getNewMultiActionRunnable(ServerName server, - MultiAction multiAction, + MultiAction multiAction, int numAttempt) { // no stats to manage, just do the standard action if (asyncProcess.connection.getStatisticsTracker() == null) { @@ -620,7 +620,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { .size()); // split up the actions - for (Map.Entry>> e : multiAction.actions.entrySet()) { + for (Map.Entry> e : multiAction.actions.entrySet()) { Long backoff = getBackoff(server, e.getKey()); DelayingRunner runner = actions.get(backoff); if (runner == null) { @@ -673,7 +673,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { /** * Starts waiting to issue replica calls on a different thread; or issues them immediately. */ - private void startWaitingForReplicaCalls(List> actionsForReplicaThread) { + private void startWaitingForReplicaCalls(List actionsForReplicaThread) { long startTime = EnvironmentEdgeManager.currentTime(); ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable( actionsForReplicaThread, startTime); @@ -726,7 +726,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * @param t the throwable (if any) that caused the resubmit */ private void receiveGlobalFailure( - MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { + MultiAction rsActions, ServerName server, int numAttempt, Throwable t) { errorsByServer.reportServerError(server); Retry canRetry = errorsByServer.canTryMore(numAttempt) ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED; @@ -736,8 +736,8 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { asyncProcess.connection.clearCaches(server); } int failed = 0, stopped = 0; - List> toReplay = new ArrayList>(); - for (Map.Entry>> e : rsActions.actions.entrySet()) { + List toReplay = new ArrayList(); + for (Map.Entry> e : rsActions.actions.entrySet()) { byte[] regionName = e.getKey(); byte[] row = e.getValue().iterator().next().getAction().getRow(); // Do not use the exception for updating cache because it might be coming from @@ -752,7 +752,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { // we still process errors LOG.error("Couldn't update cached region locations: " + ex); } - for (Action action : e.getValue()) { + for (Action action : e.getValue()) { Retry retry = manageError( action.getOriginalIndex(), action.getAction(), canRetry, t, server); if (retry == Retry.YES) { @@ -776,7 +776,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * Log as much info as possible, and, if there is something to replay, * submit it again after a back off sleep. */ - private void resubmit(ServerName oldServer, List> toReplay, + private void resubmit(ServerName oldServer, List toReplay, int numAttempt, int failureCount, Throwable throwable) { // We have something to replay. We're going to sleep a little before. @@ -833,7 +833,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * @param responses - the response, if any * @param numAttempt - the attempt */ - private void receiveMultiAction(MultiAction multiAction, + private void receiveMultiAction(MultiAction multiAction, ServerName server, MultiResponse responses, int numAttempt) { assert responses != null; @@ -843,7 +843,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { // - DoNotRetryIOException: we continue to retry for other actions // - RegionMovedException: we update the cache with the new region location - List> toReplay = new ArrayList>(); + List toReplay = new ArrayList(); Throwable throwable = null; int failureCount = 0; boolean canRetry = true; @@ -853,7 +853,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { int failed = 0, stopped = 0; // Go by original action. - for (Map.Entry>> regionEntry : multiAction.actions.entrySet()) { + for (Map.Entry> regionEntry : multiAction.actions.entrySet()) { byte[] regionName = regionEntry.getKey(); Map regionResults = results.get(regionName) == null ? null : results.get(regionName).result; @@ -866,7 +866,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { continue; } boolean regionFailureRegistered = false; - for (Action sentAction : regionEntry.getValue()) { + for (Action sentAction : regionEntry.getValue()) { Object result = regionResults.get(sentAction.getOriginalIndex()); // Failure: retry if it's make sense else update the errors lists if (result == null || result instanceof Throwable) { @@ -920,7 +920,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { for (Map.Entry throwableEntry : responses.getExceptions().entrySet()) { throwable = throwableEntry.getValue(); byte[] region = throwableEntry.getKey(); - List> actions = multiAction.actions.get(region); + List actions = multiAction.actions.get(region); if (actions == null || actions.isEmpty()) { throw new IllegalStateException("Wrong response for the region: " + HRegionInfo.encodeRegionName(region)); @@ -947,7 +947,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { } failureCount += actions.size(); - for (Action action : actions) { + for (Action action : actions) { Row row = action.getAction(); Retry retry = manageError(action.getOriginalIndex(), row, canRetry ? Retry.YES : Retry.NO_RETRIES_EXHAUSTED, throwable, server); @@ -1024,7 +1024,7 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { * @param action Action (request) that the server responded to. * @param result The result. */ - private void setResult(Action action, Object result) { + private void setResult(Action action, Object result) { if (result == null) { throw new RuntimeException("Result cannot be null"); } @@ -1287,9 +1287,9 @@ class AsyncRequestFutureImpl implements AsyncRequestFuture { /** * Create a callable. Isolated to be easily overridden in the tests. */ - private MultiServerCallable createCallable(final ServerName server, TableName tableName, - final MultiAction multi) { - return new MultiServerCallable(asyncProcess.connection, tableName, server, + private MultiServerCallable createCallable(final ServerName server, TableName tableName, + final MultiAction multi) { + return new MultiServerCallable(asyncProcess.connection, tableName, server, multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker); } } \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java index 83c73b6a36a..9a2db038344 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelayingRunner.java @@ -39,16 +39,16 @@ import java.util.Map; *

*/ @InterfaceAudience.Private -public class DelayingRunner implements Runnable { +public class DelayingRunner implements Runnable { private static final Log LOG = LogFactory.getLog(DelayingRunner.class); private final Object sleepLock = new Object(); private boolean triggerWake = false; private long sleepTime; - private MultiAction actions = new MultiAction(); + private MultiAction actions = new MultiAction(); private Runnable runnable; - public DelayingRunner(long sleepTime, Map.Entry>> e) { + public DelayingRunner(long sleepTime, Map.Entry> e) { this.sleepTime = sleepTime; add(e); } @@ -102,11 +102,11 @@ public class DelayingRunner implements Runnable { return true; } - public void add(Map.Entry>> e) { + public void add(Map.Entry> e) { actions.add(e.getKey(), e.getValue()); } - public MultiAction getActions() { + public MultiAction getActions() { return actions; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index e8379efb179..8ff64bf17e1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -578,11 +578,11 @@ public class HTableMultiplexer { // failedCount is decreased whenever a Put is success or resubmit. failedCount = processingList.size(); - List> retainedActions = new ArrayList<>(processingList.size()); - MultiAction actions = new MultiAction<>(); + List retainedActions = new ArrayList<>(processingList.size()); + MultiAction actions = new MultiAction(); for (int i = 0; i < processingList.size(); i++) { PutStatus putStatus = processingList.get(i); - Action action = new Action(putStatus.put, i); + Action action = new Action(putStatus.put, i); actions.add(putStatus.regionInfo.getRegionName(), action); retainedActions.add(action); } @@ -591,7 +591,7 @@ public class HTableMultiplexer { List failed = null; Object[] results = new Object[actions.size()]; ServerName server = addr.getServerName(); - Map> actionsByServer = + Map actionsByServer = Collections.singletonMap(server, actions); try { AsyncRequestFuture arf = diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java index 6d155ca243e..ef1fce9cf49 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java @@ -34,11 +34,11 @@ import org.apache.hadoop.hbase.util.Bytes; * regionName. Intended to be used with {@link AsyncProcess}. */ @InterfaceAudience.Private -public final class MultiAction { +public final class MultiAction { // TODO: This class should not be visible outside of the client package. // map of regions to lists of puts/gets/deletes for that region. - protected Map>> actions = new TreeMap<>(Bytes.BYTES_COMPARATOR); + protected Map> actions = new TreeMap<>(Bytes.BYTES_COMPARATOR); private long nonceGroup = HConstants.NO_NONCE; @@ -67,7 +67,7 @@ public final class MultiAction { * @param regionName * @param a */ - public void add(byte[] regionName, Action a) { + public void add(byte[] regionName, Action a) { add(regionName, Arrays.asList(a)); } @@ -79,10 +79,10 @@ public final class MultiAction { * @param regionName * @param actionList list of actions to add for the region */ - public void add(byte[] regionName, List> actionList){ - List> rsActions = actions.get(regionName); + public void add(byte[] regionName, List actionList){ + List rsActions = actions.get(regionName); if (rsActions == null) { - rsActions = new ArrayList>(actionList.size()); + rsActions = new ArrayList(actionList.size()); actions.put(regionName, rsActions); } rsActions.addAll(actionList); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 7d50a277acf..c4adf347c5e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -49,12 +49,12 @@ import com.google.common.annotations.VisibleForTesting; * @param */ @InterfaceAudience.Private -class MultiServerCallable extends CancellableRegionServerCallable { - private MultiAction multiAction; +class MultiServerCallable extends CancellableRegionServerCallable { + private MultiAction multiAction; private boolean cellBlock; MultiServerCallable(final ClusterConnection connection, final TableName tableName, - final ServerName location, final MultiAction multi, RpcController rpcController, + final ServerName location, final MultiAction multi, RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) { super(connection, tableName, null, rpcController, rpcTimeout, tracker); this.multiAction = multi; @@ -65,7 +65,7 @@ class MultiServerCallable extends CancellableRegionServerCallable multiAction) { + public void reset(ServerName location, MultiAction multiAction) { this.location = new HRegionLocation(null, location); this.multiAction = multiAction; this.cellBlock = isCellBlock(); @@ -81,7 +81,7 @@ class MultiServerCallable extends CancellableRegionServerCallable getMulti() { + MultiAction getMulti() { return this.multiAction; } @@ -99,9 +99,9 @@ class MultiServerCallable extends CancellableRegionServerCallable>> e: this.multiAction.actions.entrySet()) { + for (Map.Entry> e: this.multiAction.actions.entrySet()) { final byte [] regionName = e.getKey(); - final List> actions = e.getValue(); + final List actions = e.getValue(); regionActionBuilder.clear(); regionActionBuilder.setRegion(RequestConverter.buildRegionSpecifier( HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME, regionName)); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 4c8a21ce0cd..7da3727e0eb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -587,11 +587,11 @@ public final class RequestConverter { * @return a multi request * @throws IOException */ - public static RegionAction.Builder buildRegionAction(final byte[] regionName, - final List> actions, final RegionAction.Builder regionActionBuilder, + public static RegionAction.Builder buildRegionAction(final byte[] regionName, + final List actions, final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) throws IOException { - for (Action action: actions) { + for (Action action: actions) { Row row = action.getAction(); actionBuilder.clear(); actionBuilder.setIndex(action.getOriginalIndex()); @@ -648,14 +648,14 @@ public final class RequestConverter { * @return a multi request that does not carry any data. * @throws IOException */ - public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName, - final List> actions, final List cells, + public static RegionAction.Builder buildNoDataRegionAction(final byte[] regionName, + final List actions, final List cells, final RegionAction.Builder regionActionBuilder, final ClientProtos.Action.Builder actionBuilder, final MutationProto.Builder mutationBuilder) throws IOException { RegionAction.Builder builder = getRegionActionBuilderWithRegion( RegionAction.newBuilder(), regionName); - for (Action action: actions) { + for (Action action: actions) { Row row = action.getAction(); actionBuilder.clear(); actionBuilder.setIndex(action.getOriginalIndex()); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index ed521a31ca2..5a2169944f3 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -159,7 +160,7 @@ public class TestAsyncProcess { private long previousTimeout = -1; @Override protected AsyncRequestFutureImpl createAsyncRequestFuture(TableName tableName, - List> actions, long nonceGroup, ExecutorService pool, + List actions, long nonceGroup, ExecutorService pool, Batch.Callback callback, Object[] results, boolean needResults, CancellableRegionServerCallable callable, int curTimeout) { // Test HTable has tableName of null, so pass DUMMY_TABLE @@ -226,7 +227,7 @@ public class TestAsyncProcess { callable1.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @Override - public void addResponse(MultiResponse mr, byte[] regionName, Action a) { + public void addResponse(MultiResponse mr, byte[] regionName, Action a) { if (Arrays.equals(FAILS, a.getAction().getRow())) { mr.add(regionName, a.getOriginalIndex(), failure); } else { @@ -259,7 +260,7 @@ public class TestAsyncProcess { static class MyAsyncRequestFutureImpl extends AsyncRequestFutureImpl { - public MyAsyncRequestFutureImpl(TableName tableName, List> actions, long nonceGroup, + public MyAsyncRequestFutureImpl(TableName tableName, List actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout, AsyncProcess asyncProcess) { @@ -357,11 +358,11 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller( CancellableRegionServerCallable payloadCallable, int rpcTimeout) { - MultiServerCallable callable = (MultiServerCallable) payloadCallable; + MultiServerCallable callable = (MultiServerCallable) payloadCallable; final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @Override - public void addResponse(MultiResponse mr, byte[] regionName, Action a) { + public void addResponse(MultiResponse mr, byte[] regionName, Action a) { if (failures.contains(regionName)) { mr.add(regionName, a.getOriginalIndex(), failure); } else { @@ -374,7 +375,7 @@ public class TestAsyncProcess { // Currently AsyncProcess either sends all-replica, or all-primary request. final boolean isDefault = RegionReplicaUtil.isDefaultReplica( callable.getMulti().actions.values().iterator().next().iterator().next().getReplicaId()); - final ServerName server = ((MultiServerCallable)callable).getServerName(); + final ServerName server = ((MultiServerCallable)callable).getServerName(); String debugMsg = "Call to " + server + ", primary=" + isDefault + " with " + callable.getMulti().actions.size() + " entries: "; for (byte[] region : callable.getMulti().actions.keySet()) { @@ -409,13 +410,13 @@ public class TestAsyncProcess { } } - static MultiResponse createMultiResponse(final MultiAction multi, + static MultiResponse createMultiResponse(final MultiAction multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions, ResponseGenerator gen) { final MultiResponse mr = new MultiResponse(); nbMultiResponse.incrementAndGet(); - for (Map.Entry>> entry : multi.actions.entrySet()) { + for (Map.Entry> entry : multi.actions.entrySet()) { byte[] regionName = entry.getKey(); - for (Action a : entry.getValue()) { + for (Action a : entry.getValue()) { nbActions.incrementAndGet(); gen.addResponse(mr, regionName, a); } @@ -424,7 +425,7 @@ public class TestAsyncProcess { } private static interface ResponseGenerator { - void addResponse(final MultiResponse mr, byte[] regionName, Action a); + void addResponse(final MultiResponse mr, byte[] regionName, Action a); } /** @@ -1280,6 +1281,25 @@ public class TestAsyncProcess { } } + @Test + public void testAction() { + Action action_0 = new Action(new Put(Bytes.toBytes("abc")), 10); + Action action_1 = new Action(new Put(Bytes.toBytes("ccc")), 10); + Action action_2 = new Action(new Put(Bytes.toBytes("ccc")), 10); + Action action_3 = new Action(new Delete(Bytes.toBytes("ccc")), 10); + assertFalse(action_0.equals(action_1)); + assertTrue(action_0.equals(action_0)); + assertTrue(action_1.equals(action_2)); + assertTrue(action_2.equals(action_1)); + assertFalse(action_0.equals(new Put(Bytes.toBytes("abc")))); + assertTrue(action_2.equals(action_3)); + assertFalse(action_0.equals(action_3)); + assertEquals(0, action_0.compareTo(action_0)); + assertEquals(-1, action_0.compareTo(action_1)); + assertEquals(1, action_1.compareTo(action_0)); + assertEquals(0, action_1.compareTo(action_2)); + } + @Test public void testBatch() throws IOException, InterruptedException { ClusterConnection conn = new MyConnectionImpl(conf); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java index 43481007950..fb9dc214c02 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestDelayingRunner.java @@ -41,8 +41,8 @@ public class TestDelayingRunner { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testDelayingRunner() throws Exception{ - MultiAction ma = new MultiAction(); - ma.add(hri1.getRegionName(), new Action(new Put(DUMMY_BYTES_1), 0)); + MultiAction ma = new MultiAction(); + ma.add(hri1.getRegionName(), new Action(new Put(DUMMY_BYTES_1), 0)); final AtomicLong endTime = new AtomicLong(); final long sleepTime = 1000; DelayingRunner runner = new DelayingRunner(sleepTime, ma.actions.entrySet().iterator().next()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java index bd0268af702..b4c808cb106 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hbase; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** - * Implementations of this interface will keep and return to clients + * Implementations of this interface will keep and return to clients * implementations of classes providing API to execute * coordinated operations. This interface is client-side, so it does NOT * include methods to retrieve the particular interface implementations.