diff --git a/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java b/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
index 0477be8be1b..04e99b16116 100644
--- a/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
+++ b/src/main/java/org/apache/hadoop/hbase/HBaseConfiguration.java
@@ -61,6 +61,7 @@ public class HBaseConfiguration extends Configuration {
}
private static void checkDefaultsVersion(Configuration conf) {
+ if (true) return; // REMOVE
if (conf.getBoolean("hbase.defaults.for.version.skip", Boolean.FALSE)) return;
String defaultsVersion = conf.get("hbase.defaults.for.version");
String thisVersion = VersionInfo.getVersion();
diff --git a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
index 8ec5042ecff..4998c15998e 100644
--- a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
+++ b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
@@ -98,6 +98,10 @@ public class CatalogTracker {
// the other end). I made https://issues.apache.org/jira/browse/HBASE-4495 for
// doing CT fixup. St.Ack 09/30/2011.
//
+
+  // TODO: Timeouts have never been as advertised in here and it's worse now
+  // with retries; i.e. the HConnection retries and pause go ahead whatever
+ // the passed timeout is. Fix.
private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
private final HConnection connection;
private final ZooKeeperWatcher zookeeper;
@@ -620,6 +624,8 @@ public class CatalogTracker {
return hostingServer.getRegionInfo(regionName) != null;
} catch (ConnectException e) {
t = e;
+ } catch (RetriesExhaustedException e) {
+ t = e;
} catch (RemoteException e) {
IOException ioe = e.unwrapRemoteException();
t = ioe;
@@ -679,6 +685,9 @@ public class CatalogTracker {
// Pass
} catch (ServerNotRunningYetException e) {
// Pass -- remote server is not up so can't be carrying .META.
+ } catch (RetriesExhaustedException e) {
+ // Pass -- failed after bunch of retries.
+      LOG.debug("Failed to verify meta region location after retries", e);
}
return connection != null;
}
diff --git a/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 6cdeec1aab0..103e39b733f 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -166,7 +166,7 @@ public class ClientScanner extends AbstractClientScanner {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
- getConnection().getRegionServerWithRetries(callable);
+ callable.withRetries();
this.callable = null;
}
@@ -202,7 +202,7 @@ public class ClientScanner extends AbstractClientScanner {
callable = getScannerCallable(localStartKey, nbRows);
// Open a scanner on the region server starting at the
// beginning of the region
- getConnection().getRegionServerWithRetries(callable);
+ callable.withRetries();
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.inc();
@@ -269,14 +269,14 @@ public class ClientScanner extends AbstractClientScanner {
// Skip only the first row (which was the last row of the last
// already-processed batch).
callable.setCaching(1);
- values = getConnection().getRegionServerWithRetries(callable);
+ values = callable.withRetries();
callable.setCaching(this.caching);
skipFirst = false;
}
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
- values = getConnection().getRegionServerWithRetries(callable);
+ values = callable.withRetries();
} catch (DoNotRetryIOException e) {
if (e instanceof UnknownScannerException) {
long timeout = lastNext + scannerTimeout;
@@ -364,7 +364,7 @@ public class ClientScanner extends AbstractClientScanner {
if (callable != null) {
callable.setClose();
try {
- getConnection().getRegionServerWithRetries(callable);
+ callable.withRetries();
} catch (IOException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to
diff --git a/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
new file mode 100644
index 00000000000..f5d774e1b20
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HConstants;
+
+
+/**
+ * Utility used by client connections such as {@link HConnection} and
+ * {@link ServerCallable}
+ */
+public class ConnectionUtils {
+ /**
+ * Calculate pause time.
+ * Built on {@link HConstants#RETRY_BACKOFF}.
+ * @param pause
+ * @param tries
+   * @return How long to wait after <code>tries</code> retries
+ */
+ public static long getPauseTime(final long pause, final int tries) {
+ int ntries = tries;
+ if (ntries >= HConstants.RETRY_BACKOFF.length) {
+ ntries = HConstants.RETRY_BACKOFF.length - 1;
+ }
+ return pause * HConstants.RETRY_BACKOFF[ntries];
+ }
+}
\ No newline at end of file
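
For a feel of the schedule getPauseTime produces, a small illustrative sketch follows. It is not part of the patch; the RETRY_BACKOFF multipliers and the 1000 ms default pause are assumptions about the surrounding HConstants, not something this diff defines.

    // Illustrative only: assumes HConstants.RETRY_BACKOFF is a short array of
    // multipliers (e.g. {1, 1, 1, 2, 2, 4, 4, 8, 16, 32}) and that
    // hbase.client.pause defaults to 1000 ms.
    public class PauseTimeDemo {
      public static void main(String[] args) {
        final long pause = 1000L; // assumed default of hbase.client.pause
        for (int tries = 0; tries < 12; tries++) {
          // getPauseTime clamps the index once tries runs past the end of
          // RETRY_BACKOFF, so the sleep stops growing.
          System.out.println("attempt " + tries + " -> sleep " +
            org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime(pause, tries) + " ms");
        }
      }
    }
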
diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnection.java b/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
index 0e78d96c3ba..c7b730b062d 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
@@ -253,6 +253,7 @@ public interface HConnection extends Abortable, Closeable {
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
+   * @deprecated Use {@link ServerCallable#withRetries()}
*/
 public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
@@ -265,6 +266,7 @@ public interface HConnection extends Abortable, Closeable {
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
+   * @deprecated Use {@link ServerCallable#withoutRetries()}
*/
 public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
throws IOException, RuntimeException;
@@ -373,4 +375,10 @@ public interface HConnection extends Abortable, Closeable {
* @return true if this connection is closed
*/
public boolean isClosed();
+
+ /**
+ * Clear any caches that pertain to server name sn
+ * @param sn A server name as hostname:port
+ */
+ public void clearCaches(final String sn);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
index 897f25cdd4c..99f90b233a4 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
@@ -23,9 +23,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
-import java.net.ConnectException;
import java.net.InetSocketAddress;
-import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -562,7 +560,6 @@ public class HConnectionManager {
throw new UnsupportedOperationException(
"Unable to find region server interface " + serverClassName, e);
}
-
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
@@ -612,16 +609,9 @@ public class HConnectionManager {
return this.conf;
}
- private long getPauseTime(int tries) {
- int ntries = tries;
- if (ntries >= HConstants.RETRY_BACKOFF.length) {
- ntries = HConstants.RETRY_BACKOFF.length - 1;
- }
- return this.pause * HConstants.RETRY_BACKOFF[ntries];
- }
-
public HMasterInterface getMaster()
throws MasterNotRunningException, ZooKeeperConnectionException {
+ // TODO: REMOVE. MOVE TO HBaseAdmin and redo as a Callable!!!
// Check if we already have a good master connection
try {
@@ -669,18 +659,18 @@ public class HConnectionManager {
} catch (IOException e) {
if (tries == numRetries - 1) {
// This was our last chance - don't bother sleeping
- LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
+ LOG.info("getMaster attempt " + tries + " of " + numRetries +
" failed; no more retrying.", e);
break;
}
- LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
+ LOG.info("getMaster attempt " + tries + " of " + numRetries +
" failed; retrying after sleep of " +
- getPauseTime(tries), e);
+ ConnectionUtils.getPauseTime(this.pause, tries), e);
}
// Cannot connect to master or it is not running. Sleep & retry
try {
- this.masterLock.wait(getPauseTime(tries));
+ this.masterLock.wait(ConnectionUtils.getPauseTime(this.pause, tries));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread was interrupted while trying to connect to master.");
@@ -1025,8 +1015,7 @@ public class HConnectionManager {
throw e;
} catch (IOException e) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
if (tries < numRetries - 1) {
if (LOG.isDebugEnabled()) {
@@ -1035,7 +1024,7 @@ public class HConnectionManager {
((metaLocation == null)? "null": "{" + metaLocation + "}") +
", attempt=" + tries + " of " +
this.numRetries + " failed; retrying after sleep of " +
- getPauseTime(tries) + " because: " + e.getMessage());
+ ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
}
} else {
throw e;
@@ -1047,7 +1036,7 @@ public class HConnectionManager {
}
}
try{
- Thread.sleep(getPauseTime(tries));
+ Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
 throw new IOException("Giving up trying to locate region in " +
@@ -1147,14 +1136,18 @@ public class HConnectionManager {
}
}
+ @Override
+ public void clearCaches(String sn) {
+ clearCachedLocationForServer(sn);
+ }
+
/*
* Delete all cached entries of a table that maps to a specific location.
*
* @param tablename
* @param server
*/
- private void clearCachedLocationForServer(
- final String server) {
+ private void clearCachedLocationForServer(final String server) {
boolean deletedSomething = false;
synchronized (this.cachedRegionLocations) {
if (!cachedServers.contains(server)) {
@@ -1338,82 +1331,33 @@ public class HConnectionManager {
 public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
 throws IOException, RuntimeException {
-      List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
-        new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
- for(int tries = 0; tries < numRetries; tries++) {
- try {
- callable.beforeCall();
- callable.connect(tries != 0);
- return callable.call();
- } catch (Throwable t) {
- callable.shouldRetry(t);
- t = translateException(t);
- if (t instanceof SocketTimeoutException ||
- t instanceof ConnectException ||
- t instanceof RetriesExhaustedException) {
- // if thrown these exceptions, we clear all the cache entries that
- // map to that slow/dead server; otherwise, let cache miss and ask
- // .META. again to find the new location
- HRegionLocation hrl = callable.location;
- if (hrl != null) {
- clearCachedLocationForServer(hrl.getServerAddress().toString());
- }
- }
- RetriesExhaustedException.ThrowableWithExtraContext qt =
- new RetriesExhaustedException.ThrowableWithExtraContext(t,
- System.currentTimeMillis(), callable.toString());
- exceptions.add(qt);
- if (tries == numRetries - 1) {
- throw new RetriesExhaustedException(tries, exceptions);
- }
- } finally {
- callable.afterCall();
- }
- try {
- Thread.sleep(getPauseTime(tries));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Giving up after tries=" + tries, e);
- }
- }
- return null;
+ return callable.withRetries();
}
 public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
- throws IOException, RuntimeException {
- try {
- callable.beforeCall();
- callable.connect(false);
- return callable.call();
- } catch (Throwable t) {
- Throwable t2 = translateException(t);
- if (t2 instanceof IOException) {
- throw (IOException)t2;
- } else {
- throw new RuntimeException(t2);
- }
- } finally {
- callable.afterCall();
- }
+ throws IOException, RuntimeException {
+ return callable.withoutRetries();
}
 private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
 final MultiAction<R> multi, final byte [] tableName) {
+ // TODO: This does not belong in here!!! St.Ack HConnections should
+ // not be dealing in Callables; Callables have HConnections, not other
+ // way around.
final HConnection connection = this;
 return new Callable<MultiResponse>() {
public MultiResponse call() throws IOException {
- return getRegionServerWithoutRetries(
-          new ServerCallable<MultiResponse>(connection, tableName, null) {
- public MultiResponse call() throws IOException {
- return server.multi(multi);
- }
- @Override
- public void connect(boolean reload) throws IOException {
- server =
- connection.getHRegionConnection(loc.getHostname(), loc.getPort());
- }
+        ServerCallable<MultiResponse> callable =
+          new ServerCallable<MultiResponse>(connection, tableName, null) {
+ public MultiResponse call() throws IOException {
+ return server.multi(multi);
}
- );
+ @Override
+ public void connect(boolean reload) throws IOException {
+ server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
+ }
+ };
+ return callable.withoutRetries();
}
};
}
@@ -1422,6 +1366,7 @@ public class HConnectionManager {
final byte[] tableName,
ExecutorService pool,
Object[] results) throws IOException, InterruptedException {
+ // This belongs in HTable!!! Not in here. St.Ack
// results must be the same size as list
if (results.length != list.size()) {
@@ -1508,6 +1453,7 @@ public class HConnectionManager {
Object[] results,
Batch.Callback callback)
throws IOException, InterruptedException {
+ // This belongs in HTable!!! Not in here. St.Ack
// results must be the same size as list
if (results.length != list.size()) {
@@ -1527,13 +1473,12 @@ public class HConnectionManager {
boolean retry = true;
// count that helps presize actions array
int actionCount = 0;
- Throwable singleRowCause = null;
for (int tries = 0; tries < numRetries && retry; ++tries) {
// sleep first, if this is a retry
if (tries >= 1) {
- long sleepTime = getPauseTime(tries);
+ long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries);
LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
Thread.sleep(sleepTime);
}
@@ -1639,14 +1584,6 @@ public class HConnectionManager {
}
}
- if (retry) {
- // Simple little check for 1 item failures.
- if (singleRowCause != null) {
- throw new IOException(singleRowCause);
- }
- }
-
-
 List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
 List<Row> actions = new ArrayList<Row>(actionCount);
 List<String> addresses = new ArrayList<String>(actionCount);
@@ -1666,19 +1603,6 @@ public class HConnectionManager {
}
}
- private Throwable translateException(Throwable t) throws IOException {
- if (t instanceof UndeclaredThrowableException) {
- t = t.getCause();
- }
- if (t instanceof RemoteException) {
- t = RemoteExceptionHandler.decodeRemoteException((RemoteException)t);
- }
- if (t instanceof DoNotRetryIOException) {
- throw (DoNotRetryIOException)t;
- }
- return t;
- }
-
/*
* Return the number of cached region for a table. It will only be called
* from a unit test.
@@ -1925,4 +1849,4 @@ public class HConnectionManager {
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, retries);
log.debug("Set serverside HConnection retries=" + retries);
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 839d79bd07a..b4f45953851 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -395,7 +395,6 @@ public class HTable implements HTableInterface, Closeable {
* @return Pair of arrays of region starting and ending row keys
* @throws IOException if a remote or network exception occurs
*/
- @SuppressWarnings("unchecked")
 public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
 final List<byte[]> startKeyList = new ArrayList<byte[]>();
 final List<byte[]> endKeyList = new ArrayList<byte[]>();
@@ -569,13 +568,12 @@ public class HTable implements HTableInterface, Closeable {
@Override
public Result getRowOrBefore(final byte[] row, final byte[] family)
throws IOException {
- return connection.getRegionServerWithRetries(
-        new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
+    return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
public Result call() throws IOException {
return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
row, family);
}
- });
+ }.withRetries();
}
/**
@@ -616,13 +614,11 @@ public class HTable implements HTableInterface, Closeable {
*/
@Override
public Result get(final Get get) throws IOException {
- return connection.getRegionServerWithRetries(
-        new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
+    return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
public Result call() throws IOException {
return server.get(location.getRegionInfo().getRegionName(), get);
}
- }
- );
+ }.withRetries();
}
/**
@@ -672,14 +668,12 @@ public class HTable implements HTableInterface, Closeable {
@Override
public void delete(final Delete delete)
throws IOException {
- connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
+    new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
public Boolean call() throws IOException {
server.delete(location.getRegionInfo().getRegionName(), delete);
return null; // FindBugs NP_BOOLEAN_RETURN_NULL
}
- }
- );
+ }.withRetries();
}
/**
@@ -749,14 +743,12 @@ public class HTable implements HTableInterface, Closeable {
throw new IOException(
"Invalid arguments to append, no columns specified");
}
- return connection.getRegionServerWithRetries(
-        new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
+    return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
public Result call() throws IOException {
return server.append(
location.getRegionInfo().getRegionName(), append);
}
- }
- );
+ }.withRetries();
}
/**
@@ -768,14 +760,12 @@ public class HTable implements HTableInterface, Closeable {
throw new IOException(
"Invalid arguments to increment, no columns specified");
}
- return connection.getRegionServerWithRetries(
-        new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
+    return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
public Result call() throws IOException {
return server.increment(
location.getRegionInfo().getRegionName(), increment);
}
- }
- );
+ }.withRetries();
}
/**
@@ -805,15 +795,13 @@ public class HTable implements HTableInterface, Closeable {
throw new IOException(
"Invalid arguments to incrementColumnValue", npe);
}
- return connection.getRegionServerWithRetries(
-        new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
+    return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
public Long call() throws IOException {
return server.incrementColumnValue(
location.getRegionInfo().getRegionName(), row, family,
qualifier, amount, writeToWAL);
}
- }
- );
+ }.withRetries();
}
/**
@@ -824,14 +812,12 @@ public class HTable implements HTableInterface, Closeable {
final byte [] family, final byte [] qualifier, final byte [] value,
final Put put)
throws IOException {
- return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
+    return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
public Boolean call() throws IOException {
return server.checkAndPut(location.getRegionInfo().getRegionName(),
row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
}
- }
- );
+ }.withRetries();
}
@@ -843,16 +829,14 @@ public class HTable implements HTableInterface, Closeable {
final byte [] family, final byte [] qualifier, final byte [] value,
final Delete delete)
throws IOException {
- return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
+    return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
public Boolean call() throws IOException {
return server.checkAndDelete(
location.getRegionInfo().getRegionName(),
row, family, qualifier, value, delete)
? Boolean.TRUE : Boolean.FALSE;
}
- }
- );
+ }.withRetries();
}
/**
@@ -860,14 +844,12 @@ public class HTable implements HTableInterface, Closeable {
*/
@Override
public boolean exists(final Get get) throws IOException {
- return connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
+    return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
public Boolean call() throws IOException {
return server.
exists(location.getRegionInfo().getRegionName(), get);
}
- }
- );
+ }.withRetries();
}
/**
@@ -947,15 +929,13 @@ public class HTable implements HTableInterface, Closeable {
@Override
public RowLock lockRow(final byte [] row)
throws IOException {
- return connection.getRegionServerWithRetries(
-        new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
+    return new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
public RowLock call() throws IOException {
long lockId =
server.lockRow(location.getRegionInfo().getRegionName(), row);
return new RowLock(row,lockId);
}
- }
- );
+ }.withRetries();
}
/**
@@ -964,15 +944,13 @@ public class HTable implements HTableInterface, Closeable {
@Override
public void unlockRow(final RowLock rl)
throws IOException {
- connection.getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
+    new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
public Boolean call() throws IOException {
server.unlockRow(location.getRegionInfo().getRegionName(),
rl.getLockId());
return null; // FindBugs NP_BOOLEAN_RETURN_NULL
}
- }
- );
+ }.withRetries();
}
/**
diff --git a/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java b/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
index 4135e55d53a..f0c6828937a 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
@@ -23,21 +23,17 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
-import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
-import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
@@ -191,7 +187,7 @@ public class MetaScanner {
}
callable = new ScannerCallable(connection, metaTableName, scan, null);
// Open scanner
- connection.getRegionServerWithRetries(callable);
+ callable.withRetries();
int processedRows = 0;
try {
@@ -201,7 +197,7 @@ public class MetaScanner {
break;
}
//we have all the rows here
- Result [] rrs = connection.getRegionServerWithRetries(callable);
+ Result [] rrs = callable.withRetries();
if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
break; //exit completely
}
@@ -220,7 +216,7 @@ public class MetaScanner {
} finally {
// Close scanner
callable.setClose();
- connection.getRegionServerWithRetries(callable);
+ callable.withRetries();
}
} while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0);
}
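
ClientScanner and MetaScanner above now both drive the scanner through the callable itself rather than through the connection. A minimal sketch of that open/next/close cycle, assuming connection, tableName and scan are already set up as in the surrounding code:

    // Hedged fragment; mirrors the MetaScanner loop above.
    ScannerCallable s = new ScannerCallable(connection, tableName, scan, null);
    s.withRetries();                 // open the scanner on the hosting region server
    try {
      Result [] rows;
      // Each subsequent withRetries() fetches the next batch until the server
      // signals there is nothing left.
      while ((rows = s.withRetries()) != null && rows.length > 0) {
        for (Result r : rows) {
          // process r
        }
      }
    } finally {
      s.setClose();
      s.withRetries();               // close the scanner, again with retry handling
    }
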
diff --git a/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java b/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
index 9b568e3a68f..a8adf7221e6 100644
--- a/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
+++ b/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
@@ -21,14 +21,21 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.Callable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.ipc.RemoteException;
/**
* Abstract class that implements {@link Callable}. Implementation stipulates
@@ -123,4 +130,106 @@ public abstract class ServerCallable<T> implements Callable<T> {
this.callTimeout = ((int) (this.endTime - this.startTime));
}
}
-}
\ No newline at end of file
+
+ /**
+ * @return {@link HConnection} instance used by this Callable.
+ */
+ HConnection getConnection() {
+ return this.connection;
+ }
+
+ /**
+ * Run this instance with retries, timed waits,
+ * and refinds of missing regions.
+ *
+   * @param <T> the type of the return value
+ * @return an object of type T
+ * @throws IOException if a remote or network exception occurs
+ * @throws RuntimeException other unspecified error
+ */
+ public T withRetries()
+ throws IOException, RuntimeException {
+ Configuration c = getConnection().getConfiguration();
+ final long pause = c.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ final int numRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
+      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
+ for (int tries = 0; tries < numRetries; tries++) {
+ try {
+ beforeCall();
+ connect(tries != 0);
+ return call();
+ } catch (Throwable t) {
+ shouldRetry(t);
+ t = translateException(t);
+ if (t instanceof SocketTimeoutException ||
+ t instanceof ConnectException ||
+ t instanceof RetriesExhaustedException) {
+ // if thrown these exceptions, we clear all the cache entries that
+ // map to that slow/dead server; otherwise, let cache miss and ask
+ // .META. again to find the new location
+ HRegionLocation hrl = location;
+ if (hrl != null) {
+ getConnection().clearCaches(hrl.getServerAddress().toString());
+ }
+ }
+ RetriesExhaustedException.ThrowableWithExtraContext qt =
+ new RetriesExhaustedException.ThrowableWithExtraContext(t,
+ System.currentTimeMillis(), toString());
+ exceptions.add(qt);
+ if (tries == numRetries - 1) {
+ throw new RetriesExhaustedException(tries, exceptions);
+ }
+ } finally {
+ afterCall();
+ }
+ try {
+ Thread.sleep(ConnectionUtils.getPauseTime(pause, tries));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Giving up after tries=" + tries, e);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Run this instance against the server once.
+   * @param <T> the type of the return value
+ * @return an object of type T
+ * @throws IOException if a remote or network exception occurs
+ * @throws RuntimeException other unspecified error
+ */
+ public T withoutRetries()
+ throws IOException, RuntimeException {
+ try {
+ beforeCall();
+ connect(false);
+ return call();
+ } catch (Throwable t) {
+ Throwable t2 = translateException(t);
+ if (t2 instanceof IOException) {
+ throw (IOException)t2;
+ } else {
+ throw new RuntimeException(t2);
+ }
+ } finally {
+ afterCall();
+ }
+ }
+
+ private static Throwable translateException(Throwable t) throws IOException {
+ if (t instanceof UndeclaredThrowableException) {
+ t = t.getCause();
+ }
+ if (t instanceof RemoteException) {
+ t = ((RemoteException)t).unwrapRemoteException();
+ }
+ if (t instanceof DoNotRetryIOException) {
+ throw (DoNotRetryIOException)t;
+ }
+ return t;
+ }
+}
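
The HTable and scanner changes above all reduce to the same calling convention. A hedged sketch of a caller of the new API, mirroring HTable.get(Get) earlier in this patch; the method name and the assumption that connection, tableName, row and operationTimeout are in scope are illustrative only:

    // Fragment only; imports (HConnection, Result, Get, IOException) assumed.
    Result getWithRetries(final HConnection connection, final byte [] tableName,
        final byte [] row, final int operationTimeout) throws IOException {
      return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
        public Result call() throws IOException {
          // connect()/beforeCall() have populated 'server' and 'location' by the
          // time call() runs; withRetries() adds backoff and cache clearing.
          return server.get(location.getRegionInfo().getRegionName(), new Get(row));
        }
      }.withRetries();
    }
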
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java b/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
index 3ad6cd552b8..b8b290c89ca 100644
--- a/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
+++ b/src/main/java/org/apache/hadoop/hbase/ipc/ExecRPCInvoker.java
@@ -76,7 +76,7 @@ public class ExecRPCInvoker implements InvocationHandler {
exec);
}
};
- ExecResult result = connection.getRegionServerWithRetries(callable);
+ ExecResult result = callable.withRetries();
this.regionName = result.getRegionName();
LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
", value="+result.getValue());
@@ -89,4 +89,4 @@ public class ExecRPCInvoker implements InvocationHandler {
public byte[] getRegionName() {
return regionName;
}
-}
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index bd574b2adda..2ef1cb2784c 100644
--- a/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -484,7 +484,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try {
 List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
- boolean success = conn.getRegionServerWithRetries(svrCallable);
+ boolean success = svrCallable.withRetries();
if (!success) {
LOG.warn("Attempt to bulk load region containing "
+ Bytes.toStringBinary(first) + " into table "
diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index d13daf021d0..caf5abc80f1 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -1393,9 +1393,11 @@ public class AssignmentManager extends ZooKeeperListener {
this.regionsInTransition.put(encodedName, state);
} else {
// If we are reassigning the node do not force in-memory state to OFFLINE.
- // Based on the znode state we will decide if to change
- // in-memory state to OFFLINE or not. It will
- // be done before setting the znode to OFFLINE state.
+ // Based on the znode state we will decide if to change in-memory state to
+ // OFFLINE or not. It will be done before setting znode to OFFLINE state.
+
+ // We often get here with state == CLOSED because ClosedRegionHandler will
+ // assign on its tail as part of the handling of a region close.
if (!hijack) {
LOG.debug("Forcing OFFLINE; was=" + state);
state.update(RegionState.State.OFFLINE);
diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
index 2dfc3e74fa6..88f207a1af4 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
@@ -97,7 +97,10 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
return;
}
// ZK Node is in CLOSED state, assign it.
+ // TODO: Should we remove the region from RIT too? We don't? Makes for
+ // a 'forcing' log message when we go to update state from CLOSED to OFFLINE
assignmentManager.setOffline(regionInfo);
+ // This below has to do w/ online enable/disable of a table
assignmentManager.removeClosedRegion(regionInfo);
assignmentManager.assign(regionInfo, true);
}
diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
index 2dd497b25a6..8f4f4b8f8b9 100644
--- a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
+++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
@@ -168,12 +168,14 @@ public class ServerShutdownHandler extends EventHandler {
@Override
public void process() throws IOException {
final ServerName serverName = this.serverName;
-
try {
-
try {
- LOG.info("Splitting logs for " + serverName);
- this.services.getMasterFileSystem().splitLog(serverName);
+ if (this.shouldSplitHlog) {
+ LOG.info("Splitting logs for " + serverName);
+ this.services.getMasterFileSystem().splitLog(serverName);
+ } else {
+ LOG.info("Skipping log splitting for " + serverName);
+ }
} catch (IOException ioe) {
this.services.getExecutorService().submit(this);
this.deadServers.add(serverName);
@@ -186,7 +188,7 @@ public class ServerShutdownHandler extends EventHandler {
LOG.info("Server " + serverName +
" was carrying ROOT. Trying to assign.");
this.services.getAssignmentManager().
- regionOffline(HRegionInfo.ROOT_REGIONINFO);
+ regionOffline(HRegionInfo.ROOT_REGIONINFO);
verifyAndAssignRootWithRetries();
}
@@ -195,7 +197,7 @@ public class ServerShutdownHandler extends EventHandler {
LOG.info("Server " + serverName +
" was carrying META. Trying to assign.");
this.services.getAssignmentManager().
- regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
+ regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
this.services.getAssignmentManager().assignMeta();
}
@@ -228,9 +230,8 @@ public class ServerShutdownHandler extends EventHandler {
// OFFLINE? -- and then others after like CLOSING that depend on log
// splitting.
 List<RegionState> regionsInTransition =
- this.services.getAssignmentManager()
- .processServerShutdown(this.serverName);
-
+ this.services.getAssignmentManager().
+ processServerShutdown(this.serverName);
// Wait on meta to come online; we need it to progress.
// TODO: Best way to hold strictly here? We should build this retry logic
@@ -252,7 +253,7 @@ public class ServerShutdownHandler extends EventHandler {
try {
this.server.getCatalogTracker().waitForMeta();
hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
- this.serverName);
+ this.serverName);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -267,8 +268,8 @@ public class ServerShutdownHandler extends EventHandler {
for (RegionState rit : regionsInTransition) {
if (!rit.isClosing() && !rit.isPendingClose()) {
LOG.debug("Removed " + rit.getRegion().getRegionNameAsString() +
- " from list of regions to assign because in RIT" + " region state: "
- + rit.getState());
+ " from list of regions to assign because in RIT; region state: " +
+ rit.getState());
if (hris != null) hris.remove(rit.getRegion());
}
}
diff --git a/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java b/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
index dada051ba5a..90fa45ac68f 100644
--- a/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
+++ b/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
@@ -68,6 +70,8 @@ public class TestCatalogTracker {
private Abortable abortable;
@BeforeClass public static void beforeClass() throws Exception {
+ // Set this down so tests run quicker
+ UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
UTIL.startMiniZKCluster();
}
@@ -130,7 +134,8 @@ public class TestCatalogTracker {
*/
@Test public void testInterruptWaitOnMetaAndRoot()
throws IOException, InterruptedException {
- HConnection connection = mockConnection(null);
+ HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
+ HConnection connection = mockConnection(implementation);
try {
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
ServerName hsa = ct.getRootLocation();
@@ -180,21 +185,48 @@ public class TestCatalogTracker {
Mockito.when(implementation.getRegionInfo((byte[]) Mockito.any())).
thenThrow(new IOException("Server not running, aborting")).
thenReturn(new HRegionInfo());
+
// After we encounter the above 'Server not running', we should catch the
// IOE and go into retrying for the meta mode. We'll do gets on -ROOT- to
// get new meta location. Return something so this 'get' succeeds
// (here we mock up getRegionServerWithRetries, the wrapper around
// the actual get).
+
+ // TODO: Refactor. This method has been moved out of HConnection.
+ // It works for now but has been deprecated.
Mockito.when(connection.getRegionServerWithRetries((ServerCallable)Mockito.any())).
thenReturn(getMetaTableRowResult());
+
// Now start up the catalogtracker with our doctored Connection.
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
try {
// Set a location for root and meta.
RootLocationEditor.setRootLocation(this.watcher, SN);
ct.setMetaLocation(SN);
- // Call the method that HBASE-4288 calls.
- Assert.assertFalse(ct.waitForMetaServerConnectionDefault() == null);
+ // Call the method that HBASE-4288 calls. It will try and verify the
+ // meta location and will fail on first attempt then go into a long wait.
+ // So, do this in a thread and then reset meta location to break it out
+ // of its wait after a bit of time.
+ final AtomicBoolean metaSet = new AtomicBoolean(false);
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ metaSet.set(ct.waitForMetaServerConnectionDefault() != null);
+ } catch (NotAllMetaRegionsOnlineException e) {
+ throw new RuntimeException(e);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ t.start();
+ while(!t.isAlive()) Threads.sleep(1);
+ Threads.sleep(1);
+ // Now reset the meta as though it were redeployed.
+ ct.setMetaLocation(SN);
+ t.join();
+ Assert.assertTrue(metaSet.get());
} finally {
// Clean out root and meta locations or later tests will be confused...
// they presume start fresh in zk.
@@ -280,7 +312,7 @@ public class TestCatalogTracker {
ct.waitForRoot(100);
}
- @Test (expected = NotAllMetaRegionsOnlineException.class)
+ @Test (expected = RetriesExhaustedException.class)
public void testTimeoutWaitForMeta()
throws IOException, InterruptedException {
HConnection connection =
@@ -345,6 +377,9 @@ public class TestCatalogTracker {
// location (no matter what the Get is). Same for getHRegionInfo -- always
// just return the meta region.
final Result result = getMetaTableRowResult();
+
+ // TODO: Refactor. This method has been moved out of HConnection.
+ // It works for now but has been deprecated.
Mockito.when(connection.getRegionServerWithRetries((ServerCallable)Mockito.any())).
thenReturn(result);
Mockito.when(implementation.getRegionInfo((byte[]) Mockito.any())).
diff --git a/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index c1a077f1d17..e34d8bcdd4f 100644
--- a/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
-import org.junit.experimental.categories.Category;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.mockito.Mockito;
/**
@@ -60,6 +64,53 @@ public class HConnectionTestingUtility {
}
}
+ /**
+ * Calls {@link #getMockedConnection(Configuration)} and then mocks a few
+ * more of the popular {@link HConnection} methods so they do 'normal'
+ * operation (see return doc below for list). Be sure to shutdown the
+ * connection when done by calling
+ * {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
+ * will stick around; this is probably not what you want.
+   * @param conf Configuration to use
+   * @param implementation An {@link HRegionInterface} instance; you'll likely
+   * want to pass a mocked HRS; can be null but is usually itself a mock.
+ * @param sn ServerName to include in the region location returned by this
+ * implementation
+ * @param hri HRegionInfo to include in the location returned when
+ * getRegionLocation is called on the mocked connection
+ * @return Mock up a connection that returns a {@link Configuration} when
+ * {@link HConnection#getConfiguration()} is called, a 'location' when
+ * {@link HConnection#getRegionLocation(byte[], byte[], boolean)} is called,
+ * and that returns the passed {@link HRegionInterface} instance when
+ * {@link HConnection#getHRegionConnection(String, int)}
+   * is called.  (Be sure to call
+   * {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration, boolean)}
+   * when done with this mocked Connection.)
+ * @throws IOException
+ */
+ public static HConnection getMockedConnectionAndDecorate(final Configuration conf,
+ final HRegionInterface implementation, final ServerName sn, final HRegionInfo hri)
+ throws IOException {
+ HConnection c = HConnectionTestingUtility.getMockedConnection(conf);
+ Mockito.doNothing().when(c).close();
+ // Make it so we return a particular location when asked.
+ final HRegionLocation loc = new HRegionLocation(hri, sn.getHostname(), sn.getPort());
+ Mockito.when(c.getRegionLocation((byte[]) Mockito.any(),
+ (byte[]) Mockito.any(), Mockito.anyBoolean())).
+ thenReturn(loc);
+ Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())).
+ thenReturn(loc);
+ if (implementation != null) {
+ // If a call to getHRegionConnection, return this implementation.
+ Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
+ thenReturn(implementation);
+ }
+ return c;
+ }
+
/**
* Get a Mockito spied-upon {@link HConnection} that goes with the passed
 * <code>conf</code> configuration instance.
@@ -85,7 +136,6 @@ public class HConnectionTestingUtility {
}
}
-
/**
* @return Count of extant connection instances
*/
@@ -94,4 +144,4 @@ public class HConnectionTestingUtility {
return HConnectionManager.HBASE_INSTANCES.size();
}
}
-}
+}
\ No newline at end of file
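
A short usage sketch for the new test helper; the configuration variable and the chosen region are placeholders, and the testShutdownHandler test further down uses the same pattern for real.

    // Hypothetical test fragment; 'conf' is assumed to be a test Configuration.
    HRegionInterface ri = Mockito.mock(HRegionInterface.class);
    ServerName sn = new ServerName("example.org", 1234, 5678);
    HConnection conn = HConnectionTestingUtility.getMockedConnectionAndDecorate(
        conf, ri, sn, HRegionInfo.FIRST_META_REGIONINFO);
    try {
      // getRegionLocation()/locateRegion() now return a location on 'sn' and
      // getHRegionConnection() hands back 'ri', so client-side code can be
      // exercised without a running cluster.
    } finally {
      HConnectionManager.deleteConnection(conf, true);
    }
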
diff --git a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 5e3e994b655..301ee27f1a9 100644
--- a/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -20,9 +20,6 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -35,16 +32,23 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.util.Bytes;
@@ -52,9 +56,10 @@ import org.apache.hadoop.hbase.util.Pair;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
import com.google.common.collect.Multimap;
-import org.junit.experimental.categories.Category;
/**
* Test cases for the atomic load error handling of the bulk load functionality.
@@ -226,13 +231,13 @@ public class TestLoadIncrementalHFilesSplitRecovery {
util.getConfiguration()) {
 protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
-        byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) throws IOException {
+        byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
+      throws IOException {
int i = attmptedCalls.incrementAndGet();
if (i == 1) {
- HConnection errConn = mock(HConnection.class);
+ HConnection errConn = null;
try {
- doThrow(new IOException("injecting bulk load error")).when(errConn)
- .getRegionServerWithRetries((ServerCallable) anyObject());
+ errConn = getMockedConnection(util.getConfiguration());
} catch (Exception e) {
LOG.fatal("mocking cruft, should never happen", e);
throw new RuntimeException("mocking cruft, should never happen");
@@ -253,6 +258,27 @@ public class TestLoadIncrementalHFilesSplitRecovery {
fail("doBulkLoad should have thrown an exception");
}
+ private HConnection getMockedConnection(final Configuration conf)
+ throws IOException {
+ HConnection c = Mockito.mock(HConnection.class);
+ Mockito.when(c.getConfiguration()).thenReturn(conf);
+ Mockito.doNothing().when(c).close();
+ // Make it so we return a particular location when asked.
+ final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
+ "example.org", 1234);
+ Mockito.when(c.getRegionLocation((byte[]) Mockito.any(),
+ (byte[]) Mockito.any(), Mockito.anyBoolean())).
+ thenReturn(loc);
+ Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())).
+ thenReturn(loc);
+ HRegionInterface hri = Mockito.mock(HRegionInterface.class);
+ Mockito.when(hri.bulkLoadHFiles(Mockito.anyList(), (byte [])Mockito.any())).
+ thenThrow(new IOException("injecting bulk load error"));
+ Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
+ thenReturn(hri);
+ return c;
+ }
+
/**
* This test exercises the path where there is a split after initial
* validation but before the atomic bulk load call. We cannot use presplitting
diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java b/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
index 82b32aa9054..a067bed195b 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
@@ -17,20 +17,36 @@
*/
package org.apache.hadoop.hbase.master;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -51,12 +67,17 @@ import org.mockito.Mockito;
@Category(SmallTests.class)
public class TestAssignmentManager {
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
- private static final ServerName RANDOM_SERVERNAME =
+ private static final ServerName SERVERNAME_A =
new ServerName("example.org", 1234, 5678);
+ private static final ServerName SERVERNAME_B =
+ new ServerName("example.org", 0, 5678);
+ private static final HRegionInfo REGIONINFO =
+ new HRegionInfo(Bytes.toBytes("t"),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
+
+  // Mocked objects; these get redone for each test.
private Server server;
private ServerManager serverManager;
- private CatalogTracker ct;
- private ExecutorService executor;
private ZooKeeperWatcher watcher;
@BeforeClass
@@ -71,22 +92,38 @@ public class TestAssignmentManager {
@Before
public void before() throws ZooKeeperConnectionException, IOException {
+ // TODO: Make generic versions of what we do below and put up in a mocking
+ // utility class or move up into HBaseTestingUtility.
+
// Mock a Server. Have it return a legit Configuration and ZooKeeperWatcher.
// If abort is called, be sure to fail the test (don't just swallow it
// silently as is mockito default).
this.server = Mockito.mock(Server.class);
Mockito.when(server.getConfiguration()).thenReturn(HTU.getConfiguration());
this.watcher =
- new ZooKeeperWatcher(HTU.getConfiguration(), "mocked server", this.server, true);
+ new ZooKeeperWatcher(HTU.getConfiguration(), "mockedServer", this.server, true);
Mockito.when(server.getZooKeeper()).thenReturn(this.watcher);
Mockito.doThrow(new RuntimeException("Aborted")).
when(server).abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
- // Mock a ServerManager. Say the server RANDOME_SERVERNAME is online.
- // Also, when someone sends sendRegionClose, say true it succeeded.
+
+ // Mock a ServerManager. Say server SERVERNAME_{A,B} are online. Also
+ // make it so if close or open, we return 'success'.
this.serverManager = Mockito.mock(ServerManager.class);
- Mockito.when(this.serverManager.isServerOnline(RANDOM_SERVERNAME)).thenReturn(true);
- this.ct = Mockito.mock(CatalogTracker.class);
- this.executor = Mockito.mock(ExecutorService.class);
+ Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
+ Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B)).thenReturn(true);
+    final List<ServerName> onlineServers = new ArrayList<ServerName>();
+ onlineServers.add(SERVERNAME_B);
+ onlineServers.add(SERVERNAME_A);
+ Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(onlineServers);
+ Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, REGIONINFO, -1)).
+ thenReturn(true);
+ Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_B, REGIONINFO, -1)).
+ thenReturn(true);
+ // Ditto on open.
+ Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_A, REGIONINFO, -1)).
+ thenReturn(RegionOpeningState.OPENED);
+ Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_B, REGIONINFO, -1)).
+ thenReturn(RegionOpeningState.OPENED);
}
@After
@@ -94,34 +131,204 @@ public class TestAssignmentManager {
if (this.watcher != null) this.watcher.close();
}
+ /**
+ * Tests AssignmentManager balance function. Runs a balance moving a region
+ * from one server to another mocking regionserver responding over zk.
+ * @throws IOException
+ * @throws KeeperException
+ */
+ @Test
+ public void testBalance()
+ throws IOException, KeeperException {
+ // Create and startup an executor. This is used by AssignmentManager
+ // handling zk callbacks.
+ ExecutorService executor = startupMasterExecutor("testBalanceExecutor");
+
+ // We need a mocked catalog tracker.
+ CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+ // Create an AM.
+ AssignmentManager am =
+ new AssignmentManager(this.server, this.serverManager, ct, executor);
+ try {
+ // Make sure our new AM gets callbacks; once registered, can't unregister.
+      // That's ok because we make a new zk watcher for each test.
+ this.watcher.registerListenerFirst(am);
+ // Call the balance function but fake the region being online first at
+ // SERVERNAME_A. Create a balance plan.
+ am.regionOnline(REGIONINFO, SERVERNAME_A);
+ // Balance region from A to B.
+ RegionPlan plan = new RegionPlan(REGIONINFO, SERVERNAME_A, SERVERNAME_B);
+ am.balance(plan);
+
+ // Now fake the region closing successfully over on the regionserver; the
+ // regionserver will have set the region in CLOSED state. This will
+ // trigger callback into AM. The below zk close call is from the RS close
+ // region handler duplicated here because its down deep in a private
+ // method hard to expose.
+ int versionid =
+ ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
+ assertNotSame(versionid, -1);
+ // AM is going to notice above CLOSED and queue up a new assign. The
+ // assign will go to open the region in the new location set by the
+ // balancer. The zk node will be OFFLINE waiting for regionserver to
+ // transition it through OPENING, OPENED. Wait till we see the OFFLINE
+ // zk node before we proceed.
+ while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) {
+ Threads.sleep(1);
+ }
+ // Get current versionid else will fail on transition from OFFLINE to OPENING below
+ versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
+ assertNotSame(-1, versionid);
+      // This ugliness below is what the OpenRegionHandler on the RS side does.
+ versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
+ SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
+ EventType.RS_ZK_REGION_OPENING, versionid);
+ assertNotSame(-1, versionid);
+ // Move znode from OPENING to OPENED as RS does on successful open.
+ versionid =
+ ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO, SERVERNAME_B, versionid);
+ assertNotSame(-1, versionid);
+ // Wait on the handler removing the OPENED znode.
+ while(am.isRegionInTransition(REGIONINFO) != null) Threads.sleep(1);
+ } finally {
+ executor.shutdown();
+ am.shutdown();
+ // Clean up all znodes
+ ZKAssign.deleteAllNodes(this.watcher);
+ }
+ }
+
+ /**
+ * Run a simple server shutdown handler.
+ * @throws KeeperException
+ * @throws IOException
+ */
+ @Test
+ public void testShutdownHandler() throws KeeperException, IOException {
+ // Create and start up an executor. It is used by the AssignmentManager
+ // for handling zk callbacks.
+ ExecutorService executor = startupMasterExecutor("testShutdownHandler");
+
+ // We need a mocked catalog tracker.
+ CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+ // Create an AM.
+ AssignmentManager am =
+ new AssignmentManager(this.server, this.serverManager, ct, executor);
+ try {
+ // Make sure our new AM gets callbacks; once registered, we can't unregister.
+ // That's ok because we make a new zk watcher for each test.
+ this.watcher.registerListenerFirst(am);
+
+ // Need to set up a fake scan of meta for the server shutdown handler.
+ // Make an HRegionInterface implementation and make it so a scanner can go against it.
+ HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
+ // Get a meta row result that has region up on SERVERNAME_A
+ Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
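+ // openScanner just needs to hand back some scanner id; any long will do.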
+ Mockito.when(implementation.openScanner((byte [])Mockito.any(), (Scan)Mockito.any())).
+ thenReturn(System.currentTimeMillis());
+ // Return a good result first and then return null to indicate end of scan
+ Mockito.when(implementation.next(Mockito.anyLong(), Mockito.anyInt())).
+ thenReturn(new Result [] {r}, (Result [])null);
+
+ // Get a connection w/ mocked up common methods.
+ HConnection connection =
+ HConnectionTestingUtility.getMockedConnectionAndDecorate(HTU.getConfiguration(),
+ implementation, SERVERNAME_B, REGIONINFO);
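+ // The connection is rigged so region lookups resolve to the mocked server and
+ // RPCs go to the HRegionInterface above; the handler's scan of .META. will
+ // therefore return r and then hit the end of the scan.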
+
+ // Make it so we can get the catalogtracker from the mocked server; it's needed
+ // down in the guts of the server shutdown handler.
+ Mockito.when(ct.getConnection()).thenReturn(connection);
+ Mockito.when(this.server.getCatalogTracker()).thenReturn(ct);
+
+ // Now make a server shutdown handler instance and invoke process.
+ // Have it that SERVERNAME_A died.
+ DeadServer deadServers = new DeadServer();
+ deadServers.add(SERVERNAME_A);
+ // We need a MasterServices instance that will return the AM.
+ MasterServices services = Mockito.mock(MasterServices.class);
+ Mockito.when(services.getAssignmentManager()).thenReturn(am);
+ ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
+ services, deadServers, SERVERNAME_A, false);
+ handler.process();
+ // The region in r will have been reassigned; it will be up in zk as an unassigned node.
+ } finally {
+ executor.shutdown();
+ am.shutdown();
+ // Clean up all znodes
+ ZKAssign.deleteAllNodes(this.watcher);
+ }
+ }
+
+ /**
+ * @param hri Region to serialize into the regioninfo column
+ * @param sn ServerName to use for the server and startcode columns
+ * @return A mocked up Result that fakes a Get on a row in the
+ * .META. table.
+ * @throws IOException
+ */
+ private Result getMetaTableRowResult(final HRegionInfo hri,
+ final ServerName sn)
+ throws IOException {
+ // TODO: Move to a utilities class. More than one test case can make use
+ // of this facility.
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
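+ // Build the regioninfo, server and startcode catalog columns for the row.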
+ kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+ Writables.getBytes(hri)));
+ kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+ Bytes.toBytes(sn.getHostAndPort())));
+ kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
+ HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+ Bytes.toBytes(sn.getStartcode())));
+ return new Result(kvs);
+ }
+
+ /**
+ * Create and start up executor pools. Start the same set as the master does
+ * (just run a few less threads).
+ * @param name Name to give our executor
+ * @return Created executor (be sure to call shutdown when done).
+ */
+ private ExecutorService startupMasterExecutor(final String name) {
+ // TODO: Move up into HBaseTestingUtility? Generally useful.
+ ExecutorService executor = new ExecutorService(name);
+ executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3);
+ executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3);
+ executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3);
+ executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3);
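+ // These are the pools the AM open/close handlers and the server shutdown
+ // handlers get submitted to.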
+ return executor;
+ }
+
@Test
public void testUnassignWithSplitAtSameTime() throws KeeperException, IOException {
// Region to use in test.
final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
// First amend the servermanager mock so that when we do send close of the
- // first meta region on RANDOM_SERVERNAME, it will return true rather than
+ // first meta region on SERVERNAME_A, it will return true rather than
// default null.
- Mockito.when(this.serverManager.sendRegionClose(RANDOM_SERVERNAME, hri, -1))
- .thenReturn(true);
+ Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, hri, -1)).thenReturn(true);
+ // Need a mocked catalog tracker.
+ CatalogTracker ct = Mockito.mock(CatalogTracker.class);
// Create an AM.
AssignmentManager am =
- new AssignmentManager(this.server, this.serverManager, this.ct, this.executor);
+ new AssignmentManager(this.server, this.serverManager, ct, null);
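+ // Pass a null executor; this test does not exercise the executor-backed
+ // zk callback handlers.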
try {
// First make sure my mock up basically works. Unassign a region.
- unassign(am, RANDOM_SERVERNAME, hri);
+ unassign(am, SERVERNAME_A, hri);
// This delete will fail if the previous unassign did wrong thing.
ZKAssign.deleteClosingNode(this.watcher, hri);
// Now put a SPLITTING region in the way. I don't have to assert it
// got put in place. This method puts it in place then asserts it still
// owns it by moving state from SPLITTING to SPLITTING.
- int version = createNodeSplitting(this.watcher, hri, RANDOM_SERVERNAME);
+ int version = createNodeSplitting(this.watcher, hri, SERVERNAME_A);
// Now, retry the unassign with the SPLITTING in place. It should just
// complete without fail; a sort of 'silent' recognition that the
// region to unassign has been split and no longer exists: TODO: what if
// the split fails and the parent region comes back to life?
- unassign(am, RANDOM_SERVERNAME, hri);
+ unassign(am, SERVERNAME_A, hri);
// This transition should fail if the znode has been messed with.
- ZKAssign.transitionNode(this.watcher, hri, RANDOM_SERVERNAME,
+ ZKAssign.transitionNode(this.watcher, hri, SERVERNAME_A,
EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
assertTrue(am.isRegionInTransition(hri) == null);
} finally {
diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index c359f4b8da2..c4fa1d49e7c 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -72,7 +72,11 @@ public class TestCatalogJanitor {
// Mock an HConnection and a HRegionInterface implementation. Have the
// HConnection return the HRI. Have the HRI return a few mocked up responses
// to make our test work.
- this.connection = HConnectionTestingUtility.getMockedConnection(this.c);
+ this.connection =
+ HConnectionTestingUtility.getMockedConnectionAndDecorate(this.c,
+ Mockito.mock(HRegionInterface.class),
+ new ServerName("example.org,12345,6789"),
+ HRegionInfo.FIRST_META_REGIONINFO);
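+ // The decorated connection also hands back a location for
+ // FIRST_META_REGIONINFO on the fake server name above.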
// Set hbase.rootdir into test dir.
FileSystem fs = FileSystem.get(this.c);
Path rootdir = fs.makeQualified(new Path(this.c.get(HConstants.HBASE_DIR)));
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index 0a343710a46..322deeef03c 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
@@ -135,7 +134,7 @@ public class TestHRegionServerBulkLoad {
// bulk load HFiles
HConnection conn = UTIL.getHBaseAdmin().getConnection();
byte[] tbl = Bytes.toBytes(tableName);
- conn.getRegionServerWithRetries(new ServerCallable(conn, tbl, Bytes
+ new ServerCallable(conn, tbl, Bytes
.toBytes("aaa")) {
@Override
public Void call() throws Exception {
@@ -145,12 +144,12 @@ public class TestHRegionServerBulkLoad {
server.bulkLoadHFiles(famPaths, regionName);
return null;
}
- });
+ }.withRetries();
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 10 == 0) {
// 10 * 50 = 500 open file handles!
- conn.getRegionServerWithRetries(new ServerCallable(conn, tbl,
+ new ServerCallable(conn, tbl,
Bytes.toBytes("aaa")) {
@Override
public Void call() throws Exception {
@@ -160,7 +159,7 @@ public class TestHRegionServerBulkLoad {
numCompactions.incrementAndGet();
return null;
}
- });
+ }.withRetries();
}
}
}