diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java index 6af221c4005..59004081408 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java @@ -55,8 +55,8 @@ public class HRegionLocation implements Comparable { */ @Override public String toString() { - return "region=" + this.regionInfo.getRegionNameAsString() + - ", hostname=" + this.serverName + ", seqNum=" + seqNum; + return "region=" + (this.regionInfo == null ? "null" : this.regionInfo.getRegionNameAsString()) + + ", hostname=" + this.serverName + ", seqNum=" + seqNum; } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index ed73d776ad9..1d367b75d8a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -157,7 +157,7 @@ class AsyncProcess { * the current process to be stopped without proceeding with the other operations in * the queue. */ - boolean failure(int originalIndex, byte[] region, Row row, Throwable t); + boolean failure(int originalIndex, Row row, Throwable t); /** * Called on a failure we plan to retry. This allows the user to stop retrying. Will be @@ -165,7 +165,7 @@ class AsyncProcess { * * @return false if we should retry, true otherwise. */ - boolean retriableFailure(int originalIndex, Row row, byte[] region, Throwable exception); + boolean retriableFailure(int originalIndex, Row row, Throwable exception); } private static class BatchErrors { @@ -173,14 +173,14 @@ class AsyncProcess { private final List actions = new ArrayList(); private final List addresses = new ArrayList(); - public synchronized void add(Throwable ex, Row row, HRegionLocation location) { + public synchronized void add(Throwable ex, Row row, ServerName serverName) { if (row == null){ - throw new IllegalArgumentException("row cannot be null. location=" + location); + throw new IllegalArgumentException("row cannot be null. location=" + serverName); } throwables.add(ex); actions.add(row); - addresses.add(location != null ? location.getServerName().toString() : "null location"); + addresses.add(serverName != null ? serverName.toString() : "null"); } private synchronized RetriesExhaustedWithDetailsException makeException() { @@ -267,10 +267,8 @@ class AsyncProcess { return; } - // This looks like we are keying by region but HRegionLocation has a comparator that compares - // on the server portion only (hostname + port) so this Map collects regions by server. - Map> actionsByServer = - new HashMap>(); + Map> actionsByServer = + new HashMap>(); List> retainedActions = new ArrayList>(rows.size()); long currentTaskCnt = tasksDone.get(); @@ -324,13 +322,13 @@ class AsyncProcess { * @param actionsByServer the multiaction per server * @param ng Nonce generator, or null if no nonces are needed. */ - private void addAction(HRegionLocation loc, Action action, Map> actionsByServer, NonceGenerator ng) { + private void addAction(HRegionLocation loc, Action action, + Map> actionsByServer, NonceGenerator ng) { final byte[] regionName = loc.getRegionInfo().getRegionName(); - MultiAction multiAction = actionsByServer.get(loc); + MultiAction multiAction = actionsByServer.get(loc.getServerName()); if (multiAction == null) { multiAction = new MultiAction(); - actionsByServer.put(loc, multiAction); + actionsByServer.put(loc.getServerName(), multiAction); } if (action.hasNonce() && !multiAction.hasNonceGroup()) { // TODO: this code executes for every (re)try, and calls getNonceGroup again @@ -484,8 +482,8 @@ class AsyncProcess { } // group per location => regions server - final Map> actionsByServer = - new HashMap>(); + final Map> actionsByServer = + new HashMap>(); NonceGenerator ng = this.hConnection.getNonceGenerator(); for (Action action : currentActions) { @@ -509,43 +507,43 @@ class AsyncProcess { * @param numAttempt the attempt number. */ public void sendMultiAction(final List> initialActions, - Map> actionsByServer, + Map> actionsByServer, final int numAttempt, final HConnectionManager.ServerErrorTracker errorsByServer) { // Send the queries and add them to the inProgress list // This iteration is by server (the HRegionLocation comparator is by server portion only). - for (Map.Entry> e : actionsByServer.entrySet()) { - final HRegionLocation loc = e.getKey(); + for (Map.Entry> e : actionsByServer.entrySet()) { + final ServerName server = e.getKey(); final MultiAction multiAction = e.getValue(); - incTaskCounters(multiAction.getRegions(), loc.getServerName()); + incTaskCounters(multiAction.getRegions(), server); Runnable runnable = Trace.wrap("AsyncProcess.sendMultiAction", new Runnable() { @Override public void run() { MultiResponse res; try { - MultiServerCallable callable = createCallable(loc, multiAction); + MultiServerCallable callable = createCallable(server, multiAction); try { res = createCaller(callable).callWithoutRetries(callable); } catch (IOException e) { // The service itself failed . It may be an error coming from the communication // layer, but, as well, a functional error raised by the server. - receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, e, + receiveGlobalFailure(initialActions, multiAction, server, numAttempt, e, errorsByServer); return; } catch (Throwable t) { // This should not happen. Let's log & retry anyway. LOG.error("#" + id + ", Caught throwable while calling. This is unexpected." + - " Retrying. Server is " + loc.getServerName() + ", tableName=" + tableName, t); - receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, t, + " Retrying. Server is " + server.getServerName() + ", tableName=" + tableName, t); + receiveGlobalFailure(initialActions, multiAction, server, numAttempt, t, errorsByServer); return; } // Nominal case: we received an answer from the server, and it's not an exception. - receiveMultiAction(initialActions, multiAction, loc, res, numAttempt, errorsByServer); + receiveMultiAction(initialActions, multiAction, server, res, numAttempt, errorsByServer); } finally { - decTaskCounters(multiAction.getRegions(), loc.getServerName()); + decTaskCounters(multiAction.getRegions(), server); } } }); @@ -555,12 +553,12 @@ class AsyncProcess { } catch (RejectedExecutionException ree) { // This should never happen. But as the pool is provided by the end user, let's secure // this a little. - decTaskCounters(multiAction.getRegions(), loc.getServerName()); + decTaskCounters(multiAction.getRegions(), server); LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." + - " Server is " + loc.getServerName(), ree); + " Server is " + server.getServerName(), ree); // We're likely to fail again, but this will increment the attempt counter, so it will // finish. - receiveGlobalFailure(initialActions, multiAction, loc, numAttempt, ree, errorsByServer); + receiveGlobalFailure(initialActions, multiAction, server, numAttempt, ree, errorsByServer); } } } @@ -568,9 +566,9 @@ class AsyncProcess { /** * Create a callable. Isolated to be easily overridden in the tests. */ - protected MultiServerCallable createCallable(final HRegionLocation location, - final MultiAction multi) { - return new MultiServerCallable(hConnection, tableName, location, multi); + protected MultiServerCallable createCallable( + final ServerName server, final MultiAction multi) { + return new MultiServerCallable(hConnection, tableName, server, multi); } /** @@ -589,29 +587,24 @@ class AsyncProcess { * @param row the row * @param canRetry if false, we won't retry whatever the settings. * @param throwable the throwable, if any (can be null) - * @param location the location, if any (can be null) + * @param server the location, if any (can be null) * @return true if the action can be retried, false otherwise. */ - private boolean manageError(int originalIndex, Row row, boolean canRetry, - Throwable throwable, HRegionLocation location) { + private boolean manageError(int originalIndex, Row row, + boolean canRetry, Throwable throwable, ServerName server) { if (canRetry && throwable != null && throwable instanceof DoNotRetryIOException) { canRetry = false; } - byte[] region = null; if (canRetry && callback != null) { - region = location == null ? null : location.getRegionInfo().getEncodedNameAsBytes(); - canRetry = callback.retriableFailure(originalIndex, row, region, throwable); + canRetry = callback.retriableFailure(originalIndex, row, throwable); } if (!canRetry) { if (callback != null) { - if (region == null && location != null) { - region = location.getRegionInfo().getEncodedNameAsBytes(); - } - callback.failure(originalIndex, region, row, throwable); + callback.failure(originalIndex, row, throwable); } - errors.add(throwable, row, location); + errors.add(throwable, row, server); this.hasError.set(true); } @@ -623,29 +616,29 @@ class AsyncProcess { * * @param initialActions the full initial action list * @param rsActions the actions still to do from the initial list - * @param location the destination + * @param server the destination * @param numAttempt the number of attempts so far * @param t the throwable (if any) that caused the resubmit */ private void receiveGlobalFailure(List> initialActions, MultiAction rsActions, - HRegionLocation location, int numAttempt, Throwable t, + ServerName server, int numAttempt, Throwable t, HConnectionManager.ServerErrorTracker errorsByServer) { // Do not use the exception for updating cache because it might be coming from // any of the regions in the MultiAction. hConnection.updateCachedLocations(tableName, - rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location); - errorsByServer.reportServerError(location); + rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, server); + errorsByServer.reportServerError(server); List> toReplay = new ArrayList>(initialActions.size()); for (Map.Entry>> e : rsActions.actions.entrySet()) { for (Action action : e.getValue()) { - if (manageError(action.getOriginalIndex(), action.getAction(), true, t, location)) { + if (manageError(action.getOriginalIndex(), action.getAction(), true, t, server)) { toReplay.add(action); } } } - logAndResubmit(initialActions, location, toReplay, numAttempt, rsActions.size(), + logAndResubmit(initialActions, server, toReplay, numAttempt, rsActions.size(), t, errorsByServer); } @@ -653,7 +646,7 @@ class AsyncProcess { * Log as many info as possible, and, if there is something to replay, submit it again after * a back off sleep. */ - private void logAndResubmit(List> initialActions, HRegionLocation oldLocation, + private void logAndResubmit(List> initialActions, ServerName oldLocation, List> toReplay, int numAttempt, int failureCount, Throwable throwable, HConnectionManager.ServerErrorTracker errorsByServer) { @@ -662,13 +655,11 @@ class AsyncProcess { if (failureCount != 0) { // We have a failure but nothing to retry. We're done, it's a final failure.. LOG.warn(createLog(numAttempt, failureCount, toReplay.size(), - oldLocation.getServerName(), throwable, -1, false, - errorsByServer.getStartTrackingTime())); + oldLocation, throwable, -1, false, errorsByServer.getStartTrackingTime())); } else if (numAttempt > startLogErrorsCnt + 1) { // The operation was successful, but needed several attempts. Let's log this. LOG.info(createLog(numAttempt, failureCount, 0, - oldLocation.getServerName(), throwable, -1, false, - errorsByServer.getStartTrackingTime())); + oldLocation, throwable, -1, false, errorsByServer.getStartTrackingTime())); } return; } @@ -686,8 +677,7 @@ class AsyncProcess { // We use this value to have some logs when we have multiple failures, but not too many // logs, as errors are to be expected when a region moves, splits and so on LOG.info(createLog(numAttempt, failureCount, toReplay.size(), - oldLocation.getServerName(), throwable, backOffTime, true, - errorsByServer.getStartTrackingTime())); + oldLocation, throwable, backOffTime, true, errorsByServer.getStartTrackingTime())); } try { @@ -706,12 +696,12 @@ class AsyncProcess { * * @param initialActions - the whole action list * @param multiAction - the multiAction we sent - * @param location - the location. It's used as a server name. + * @param server - the location. * @param responses - the response, if any * @param numAttempt - the attempt */ private void receiveMultiAction(List> initialActions, MultiAction multiAction, - HRegionLocation location, + ServerName server, MultiResponse responses, int numAttempt, HConnectionManager.ServerErrorTracker errorsByServer) { assert responses != null; @@ -743,15 +733,15 @@ class AsyncProcess { if (!regionFailureRegistered) { // We're doing this once per location. regionFailureRegistered= true; // The location here is used as a server name. - hConnection.updateCachedLocations(this.tableName, row.getRow(), result, location); + hConnection.updateCachedLocations(this.tableName, row.getRow(), result, server); if (failureCount == 1) { - errorsByServer.reportServerError(location); + errorsByServer.reportServerError(server); canRetry = errorsByServer.canRetryMore(numAttempt); } } if (manageError(correspondingAction.getOriginalIndex(), row, canRetry, - throwable, location)) { + throwable, server)) { toReplay.add(correspondingAction); } } else { // success @@ -779,22 +769,22 @@ class AsyncProcess { } if (failureCount == 0) { - errorsByServer.reportServerError(location); + errorsByServer.reportServerError(server); canRetry = errorsByServer.canRetryMore(numAttempt); } hConnection.updateCachedLocations(this.tableName, actions.get(0).getAction().getRow(), - throwable, location); + throwable, server); failureCount += actions.size(); for (Action action : actions) { Row row = action.getAction(); - if (manageError(action.getOriginalIndex(), row, canRetry, throwable, location)) { + if (manageError(action.getOriginalIndex(), row, canRetry, throwable, server)) { toReplay.add(action); } } } - logAndResubmit(initialActions, location, toReplay, numAttempt, failureCount, + logAndResubmit(initialActions, server, toReplay, numAttempt, failureCount, throwable, errorsByServer); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java index 4a1cc1fb546..cca3be682c8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnection.java @@ -296,6 +296,10 @@ public interface HConnection extends Abortable, Closeable { HRegionLocation relocateRegion(final byte[] tableName, final byte [] row) throws IOException; + @Deprecated + void updateCachedLocations(TableName tableName, byte[] rowkey, + Object exception, HRegionLocation source); + /** * Update the location cache. This is used internally by HBase, in most cases it should not be * used by the client application. @@ -305,7 +309,7 @@ public interface HConnection extends Abortable, Closeable { * @param source the previous location */ void updateCachedLocations(TableName tableName, byte[] rowkey, - Object exception, HRegionLocation source); + Object exception, ServerName source); @Deprecated void updateCachedLocations(byte[] tableName, byte[] rowkey, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java index f3ad615493a..79ad7817833 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java @@ -1444,7 +1444,7 @@ public class HConnectionManager { * @param source the source of the new location, if it's not coming from meta * @param location the new location */ - private void cacheLocation(final TableName tableName, final HRegionLocation source, + private void cacheLocation(final TableName tableName, final ServerName source, final HRegionLocation location) { boolean isFromMeta = (source == null); byte [] startKey = location.getRegionInfo().getStartKey(); @@ -1457,7 +1457,7 @@ public class HConnectionManager { } boolean updateCache; // If the server in cache sends us a redirect, assume it's always valid. - if (oldLocation.equals(source)) { + if (oldLocation.getServerName().equals(source)) { updateCache = true; } else { long newLocationSeqNum = location.getSeqNum(); @@ -2193,7 +2193,7 @@ public class HConnectionManager { } } - void updateCachedLocation(HRegionInfo hri, HRegionLocation source, + void updateCachedLocation(HRegionInfo hri, ServerName source, ServerName serverName, long seqNum) { HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum); cacheLocation(hri.getTable(), source, newHrl); @@ -2204,14 +2204,13 @@ public class HConnectionManager { * @param hri The region in question. * @param source The source of the error that prompts us to invalidate cache. */ - void deleteCachedLocation(HRegionInfo hri, HRegionLocation source) { - ConcurrentMap tableLocations = getTableLocations(hri.getTable()); - tableLocations.remove(hri.getStartKey(), source); + void deleteCachedLocation(HRegionInfo hri, ServerName source) { + getTableLocations(hri.getTable()).remove(hri.getStartKey()); } @Override public void deleteCachedRegionLocation(final HRegionLocation location) { - if (location == null) { + if (location == null || location.getRegionInfo() == null) { return; } @@ -2227,6 +2226,12 @@ public class HConnectionManager { } } + @Override + public void updateCachedLocations(final TableName tableName, byte[] rowkey, + final Object exception, final HRegionLocation source) { + updateCachedLocations(tableName, rowkey, exception, source.getServerName()); + } + /** * Update the location with the new value (if the exception is a RegionMovedException) * or delete it from the cache. Does nothing if we can be sure from the exception that @@ -2237,21 +2242,21 @@ public class HConnectionManager { */ @Override public void updateCachedLocations(final TableName tableName, byte[] rowkey, - final Object exception, final HRegionLocation source) { + final Object exception, final ServerName source) { if (rowkey == null || tableName == null) { LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) + ", tableName=" + (tableName == null ? "null" : tableName)); return; } - if (source == null || source.getServerName() == null){ + if (source == null) { // This should not happen, but let's secure ourselves. return; } // Is it something we have already updated? final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey); - if (oldLocation == null || !source.getServerName().equals(oldLocation.getServerName())) { + if (oldLocation == null || !source.equals(oldLocation.getServerName())) { // There is no such location in the cache (it's been removed already) or // the cache has already been refreshed with a different location. => nothing to do return; @@ -2270,7 +2275,7 @@ public class HConnectionManager { if (LOG.isTraceEnabled()) { LOG.trace("Region " + regionInfo.getRegionNameAsString() + " moved to " + rme.getHostname() + ":" + rme.getPort() + - " according to " + source.getHostnamePort()); + " according to " + source.getHostAndPort()); } // We know that the region is not anymore on this region server, but we know // the new location. @@ -2391,7 +2396,7 @@ public class HConnectionManager { } @Override - public boolean failure(int pos, byte[] region, Row row, Throwable t) { + public boolean failure(int pos, Row row, Throwable t) { assert pos < results.length; results[pos] = t; //Batch.Callback was not called on failure in 0.94. We keep this. @@ -2399,8 +2404,7 @@ public class HConnectionManager { } @Override - public boolean retriableFailure(int originalIndex, Row row, byte[] region, - Throwable exception) { + public boolean retriableFailure(int originalIndex, Row row, Throwable exception) { return true; // we retry } } @@ -2681,8 +2685,8 @@ public class HConnectionManager { */ static class ServerErrorTracker { // We need a concurrent map here, as we could have multiple threads updating it in parallel. - private final ConcurrentMap errorsByServer = - new ConcurrentHashMap(); + private final ConcurrentMap errorsByServer = + new ConcurrentHashMap(); private final long canRetryUntil; private final int maxRetries; private final String startTrackingTime; @@ -2709,7 +2713,7 @@ public class HConnectionManager { * @param basePause The default hci pause. * @return The time to wait before sending next request. */ - long calculateBackoffTime(HRegionLocation server, long basePause) { + long calculateBackoffTime(ServerName server, long basePause) { long result; ServerErrors errorStats = errorsByServer.get(server); if (errorStats != null) { @@ -2725,7 +2729,7 @@ public class HConnectionManager { * * @param server The server in question. */ - void reportServerError(HRegionLocation server) { + void reportServerError(ServerName server) { ServerErrors errors = errorsByServer.get(server); if (errors != null) { errors.addError(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index bb9d40652b3..94c8e286ea7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -52,13 +54,26 @@ class MultiServerCallable extends RegionServerCallable { private final boolean cellBlock; MultiServerCallable(final HConnection connection, final TableName tableName, - final HRegionLocation location, final MultiAction multi) { + final ServerName location, final MultiAction multi) { super(connection, tableName, null); this.multiAction = multi; - setLocation(location); + // RegionServerCallable has HRegionLocation field, but this is a multi-region request. + // Using region info from parent HRegionLocation would be a mistake for this class; so + // we will store the server here, and throw if someone tries to obtain location/regioninfo. + this.location = new HRegionLocation(null, location); this.cellBlock = isCellBlock(); } + @Override + protected HRegionLocation getLocation() { + throw new RuntimeException("Cannot get region location for multi-region request"); + } + + @Override + public HRegionInfo getHRegionInfo() { + throw new RuntimeException("Cannot get region info for multi-region request"); + }; + MultiAction getMulti() { return this.multiAction; } @@ -133,6 +148,6 @@ class MultiServerCallable extends RegionServerCallable { @Override public void prepare(boolean reload) throws IOException { // Use the location we were given in the constructor rather than go look it up. - setStub(getConnection().getClient(getLocation().getServerName())); + setStub(getConnection().getClient(this.location.getServerName())); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 5abc8d0b933..74c1e4a8993 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -37,6 +37,11 @@ import org.apache.hadoop.hbase.util.Bytes; /** * Implementations call a RegionServer and implement {@link #call()}. * Passed to a {@link RpcRetryingCaller} so we retry on fail. + * TODO: this class is actually tied to one region, because most of the paths make use of + * the regioninfo part of location when building requests. The only reason it works for + * multi-region requests (e.g. batch) is that they happen to not use the region parts. + * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, + * RegionCallable and actual RegionServerCallable with ServerName. * @param the class that the ServerCallable handles */ @InterfaceAudience.Private @@ -74,7 +79,7 @@ public abstract class RegionServerCallable implements RetryingCallable { throw new IOException("Failed to find location, tableName=" + tableName + ", row=" + Bytes.toString(row) + ", reload=" + reload); } - setStub(getConnection().getClient(getLocation().getServerName())); + setStub(getConnection().getClient(this.location.getServerName())); } /** @@ -119,7 +124,7 @@ public abstract class RegionServerCallable implements RetryingCallable { // hbase:meta again to find the new location if (this.location != null) getConnection().clearCaches(location.getServerName()); } else if (t instanceof RegionMovedException) { - getConnection().updateCachedLocations(tableName, row, t, location); + getConnection().updateCachedLocations(tableName, row, t, location.getServerName()); } else if (t instanceof NotServingRegionException && !retrying) { // Purge cache entries for this specific region from hbase:meta cache // since we don't call connect(true) when number of retries is 1. diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 17fb92afadd..11e602f02c9 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -104,8 +104,8 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller createCaller(MultiServerCallable callable) { - final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti(), - nbMultiResponse, nbActions); + final MultiResponse mr = createMultiResponse( + callable.getMulti(), nbMultiResponse, nbActions); return new RpcRetryingCaller(conf) { @Override public MultiResponse callWithoutRetries( RetryingCallable callable) @@ -123,17 +123,18 @@ public class TestAsyncProcess { } } - static MultiResponse createMultiResponse(final HRegionLocation loc, + static MultiResponse createMultiResponse( final MultiAction multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) { final MultiResponse mr = new MultiResponse(); nbMultiResponse.incrementAndGet(); for (Map.Entry>> entry : multi.actions.entrySet()) { - for (Action a : entry.getValue()) { + byte[] regionName = entry.getKey(); + for (Action a : entry.getValue()) { nbActions.incrementAndGet(); if (Arrays.equals(FAILS, a.getAction().getRow())) { - mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), failure); + mr.add(regionName, a.getOriginalIndex(), failure); } else { - mr.add(loc.getRegionInfo().getRegionName(), a.getOriginalIndex(), success); + mr.add(regionName, a.getOriginalIndex(), success); } } } @@ -387,9 +388,11 @@ public class TestAsyncProcess { ap.submit(puts, false); Assert.assertTrue(puts.isEmpty()); - while (!ap.hasError()) { + long cutoff = System.currentTimeMillis() + 60000; + while (!ap.hasError() && System.currentTimeMillis() < cutoff) { Thread.sleep(1); } + Assert.assertTrue(ap.hasError()); ap.waitUntilDone(); Assert.assertEquals(mcb.successCalled.get(), 2); @@ -496,14 +499,13 @@ public class TestAsyncProcess { } @Override - public boolean failure(int originalIndex, byte[] region, Row row, Throwable t) { + public boolean failure(int originalIndex, Row row, Throwable t) { failureCalled.incrementAndGet(); return true; } @Override - public boolean retriableFailure(int originalIndex, Row row, byte[] region, - Throwable exception) { + public boolean retriableFailure(int originalIndex, Row row, Throwable exception) { // We retry once only. return (retriableFailure.incrementAndGet() < 2); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java index c3e32373567..9d8724da3d7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java @@ -248,6 +248,11 @@ public class CoprocessorHConnection implements HConnection { return delegate.relocateRegion(tableName, row); } + public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception, + ServerName source) { + delegate.updateCachedLocations(tableName, rowkey, exception, source); + } + public void updateCachedLocations(TableName tableName, byte[] rowkey, Object exception, HRegionLocation source) { delegate.updateCachedLocations(tableName, rowkey, exception, source); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 37a59dbbf87..503d4d02a1e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -361,7 +361,8 @@ public class TestHCM { final int nextPort = conn.getCachedLocation(TABLE_NAME, ROW).getPort() + 1; HRegionLocation loc = conn.getCachedLocation(TABLE_NAME, ROW); - conn.updateCachedLocation(loc.getRegionInfo(), loc, ServerName.valueOf("127.0.0.1", nextPort, + conn.updateCachedLocation(loc.getRegionInfo(), loc.getServerName(), + ServerName.valueOf("127.0.0.1", nextPort, HConstants.LATEST_TIMESTAMP), HConstants.LATEST_TIMESTAMP); Assert.assertEquals(conn.getCachedLocation(TABLE_NAME, ROW).getPort(), nextPort); @@ -553,19 +554,18 @@ public class TestHCM { HRegionLocation location = conn.getCachedLocation(TABLE_NAME2, ROW); assertNotNull(location); - HRegionLocation anySource = new HRegionLocation(location.getRegionInfo(), ServerName.valueOf( - location.getHostname(), location.getPort() - 1, 0L)); + ServerName anySource = ServerName.valueOf(location.getHostname(), location.getPort() - 1, 0L); // Same server as already in cache reporting - overwrites any value despite seqNum. int nextPort = location.getPort() + 1; - conn.updateCachedLocation(location.getRegionInfo(), location, + conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); location = conn.getCachedLocation(TABLE_NAME2, ROW); Assert.assertEquals(nextPort, location.getPort()); // No source specified - same. nextPort = location.getPort() + 1; - conn.updateCachedLocation(location.getRegionInfo(), location, + conn.updateCachedLocation(location.getRegionInfo(), location.getServerName(), ServerName.valueOf("127.0.0.1", nextPort, 0), location.getSeqNum() - 1); location = conn.getCachedLocation(TABLE_NAME2, ROW); Assert.assertEquals(nextPort, location.getPort()); @@ -867,9 +867,8 @@ public class TestHCM { public void testErrorBackoffTimeCalculation() throws Exception { // TODO: This test would seem to presume hardcoded RETRY_BACKOFF which it should not. final long ANY_PAUSE = 100; - HRegionInfo ri = new HRegionInfo(TABLE_NAME); - HRegionLocation location = new HRegionLocation(ri, ServerName.valueOf("127.0.0.1", 1, 0)); - HRegionLocation diffLocation = new HRegionLocation(ri, ServerName.valueOf("127.0.0.1", 2, 0)); + ServerName location = ServerName.valueOf("127.0.0.1", 1, 0); + ServerName diffLocation = ServerName.valueOf("127.0.0.1", 2, 0); ManualEnvironmentEdge timeMachine = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(timeMachine); @@ -891,16 +890,10 @@ public class TestHCM { assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(location, ANY_PAUSE)); // All of this shouldn't affect backoff for different location. - assertEquals(0, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); tracker.reportServerError(diffLocation); assertEqualsWithJitter(ANY_PAUSE, tracker.calculateBackoffTime(diffLocation, ANY_PAUSE)); - // But should still work for a different region in the same location. - HRegionInfo ri2 = new HRegionInfo(TABLE_NAME2); - HRegionLocation diffRegion = new HRegionLocation(ri2, location.getServerName()); - assertEqualsWithJitter(ANY_PAUSE * 5, tracker.calculateBackoffTime(diffRegion, ANY_PAUSE)); - // Check with different base. assertEqualsWithJitter(ANY_PAUSE * 10, tracker.calculateBackoffTime(location, ANY_PAUSE * 2));