HBASE-5134 Remove getRegionServerWithoutRetries and getRegionServerWithRetries from HConnection Interface
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1229683 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e06aaf4ecc
commit
94342eea88
|
@ -61,6 +61,7 @@ public class HBaseConfiguration extends Configuration {
|
|||
}
|
||||
|
||||
private static void checkDefaultsVersion(Configuration conf) {
|
||||
if (true) return; // REMOVE
|
||||
if (conf.getBoolean("hbase.defaults.for.version.skip", Boolean.FALSE)) return;
|
||||
String defaultsVersion = conf.get("hbase.defaults.for.version");
|
||||
String thisVersion = VersionInfo.getVersion();
|
||||
|
|
|
@ -98,6 +98,10 @@ public class CatalogTracker {
|
|||
// the other end). I made https://issues.apache.org/jira/browse/HBASE-4495 for
|
||||
// doing CT fixup. St.Ack 09/30/2011.
|
||||
//
|
||||
|
||||
// TODO: Timeouts have never been as advertised in here and its worse now
|
||||
// with retries; i.e. the HConnection retries and pause goes ahead whatever
|
||||
// the passed timeout is. Fix.
|
||||
private static final Log LOG = LogFactory.getLog(CatalogTracker.class);
|
||||
private final HConnection connection;
|
||||
private final ZooKeeperWatcher zookeeper;
|
||||
|
@ -620,6 +624,8 @@ public class CatalogTracker {
|
|||
return hostingServer.getRegionInfo(regionName) != null;
|
||||
} catch (ConnectException e) {
|
||||
t = e;
|
||||
} catch (RetriesExhaustedException e) {
|
||||
t = e;
|
||||
} catch (RemoteException e) {
|
||||
IOException ioe = e.unwrapRemoteException();
|
||||
t = ioe;
|
||||
|
@ -679,6 +685,9 @@ public class CatalogTracker {
|
|||
// Pass
|
||||
} catch (ServerNotRunningYetException e) {
|
||||
// Pass -- remote server is not up so can't be carrying .META.
|
||||
} catch (RetriesExhaustedException e) {
|
||||
// Pass -- failed after bunch of retries.
|
||||
LOG.debug("Failed verify meta region location after retries", e);
|
||||
}
|
||||
return connection != null;
|
||||
}
|
||||
|
|
|
@ -166,7 +166,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
// Close the previous scanner if it's open
|
||||
if (this.callable != null) {
|
||||
this.callable.setClose();
|
||||
getConnection().getRegionServerWithRetries(callable);
|
||||
callable.withRetries();
|
||||
this.callable = null;
|
||||
}
|
||||
|
||||
|
@ -202,7 +202,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
callable = getScannerCallable(localStartKey, nbRows);
|
||||
// Open a scanner on the region server starting at the
|
||||
// beginning of the region
|
||||
getConnection().getRegionServerWithRetries(callable);
|
||||
callable.withRetries();
|
||||
this.currentRegion = callable.getHRegionInfo();
|
||||
if (this.scanMetrics != null) {
|
||||
this.scanMetrics.countOfRegions.inc();
|
||||
|
@ -269,14 +269,14 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
// Skip only the first row (which was the last row of the last
|
||||
// already-processed batch).
|
||||
callable.setCaching(1);
|
||||
values = getConnection().getRegionServerWithRetries(callable);
|
||||
values = callable.withRetries();
|
||||
callable.setCaching(this.caching);
|
||||
skipFirst = false;
|
||||
}
|
||||
// Server returns a null values if scanning is to stop. Else,
|
||||
// returns an empty array if scanning is to go on and we've just
|
||||
// exhausted current region.
|
||||
values = getConnection().getRegionServerWithRetries(callable);
|
||||
values = callable.withRetries();
|
||||
} catch (DoNotRetryIOException e) {
|
||||
if (e instanceof UnknownScannerException) {
|
||||
long timeout = lastNext + scannerTimeout;
|
||||
|
@ -364,7 +364,7 @@ public class ClientScanner extends AbstractClientScanner {
|
|||
if (callable != null) {
|
||||
callable.setClose();
|
||||
try {
|
||||
getConnection().getRegionServerWithRetries(callable);
|
||||
callable.withRetries();
|
||||
} catch (IOException e) {
|
||||
// We used to catch this error, interpret, and rethrow. However, we
|
||||
// have since decided that it's not nice for a scanner's close to
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
|
||||
|
||||
/**
|
||||
* Utility used by client connections such as {@link HConnection} and
|
||||
* {@link ServerCallable}
|
||||
*/
|
||||
public class ConnectionUtils {
|
||||
/**
|
||||
* Calculate pause time.
|
||||
* Built on {@link HConstants#RETRY_BACKOFF}.
|
||||
* @param pause
|
||||
* @param tries
|
||||
* @return How long to wait after <code>tries</code> retries
|
||||
*/
|
||||
public static long getPauseTime(final long pause, final int tries) {
|
||||
int ntries = tries;
|
||||
if (ntries >= HConstants.RETRY_BACKOFF.length) {
|
||||
ntries = HConstants.RETRY_BACKOFF.length - 1;
|
||||
}
|
||||
return pause * HConstants.RETRY_BACKOFF[ntries];
|
||||
}
|
||||
}
|
|
@ -253,6 +253,7 @@ public interface HConnection extends Abortable, Closeable {
|
|||
* @return an object of type T
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws RuntimeException other unspecified error
|
||||
* @deprecated Use {@link HConnectionManager#withoutRetries(ServerCallable)}
|
||||
*/
|
||||
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
|
||||
throws IOException, RuntimeException;
|
||||
|
@ -265,6 +266,7 @@ public interface HConnection extends Abortable, Closeable {
|
|||
* @return an object of type T
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws RuntimeException other unspecified error
|
||||
* @deprecated Use {@link HConnectionManager#withoutRetries(ServerCallable)}
|
||||
*/
|
||||
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
|
||||
throws IOException, RuntimeException;
|
||||
|
@ -373,4 +375,10 @@ public interface HConnection extends Abortable, Closeable {
|
|||
* @return true if this connection is closed
|
||||
*/
|
||||
public boolean isClosed();
|
||||
|
||||
/**
|
||||
* Clear any caches that pertain to server name <code>sn</code>
|
||||
* @param sn A server name as hostname:port
|
||||
*/
|
||||
public void clearCaches(final String sn);
|
||||
}
|
||||
|
|
|
@ -23,9 +23,7 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
@ -562,7 +560,6 @@ public class HConnectionManager {
|
|||
throw new UnsupportedOperationException(
|
||||
"Unable to find region server interface " + serverClassName, e);
|
||||
}
|
||||
|
||||
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
this.numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
|
@ -612,16 +609,9 @@ public class HConnectionManager {
|
|||
return this.conf;
|
||||
}
|
||||
|
||||
private long getPauseTime(int tries) {
|
||||
int ntries = tries;
|
||||
if (ntries >= HConstants.RETRY_BACKOFF.length) {
|
||||
ntries = HConstants.RETRY_BACKOFF.length - 1;
|
||||
}
|
||||
return this.pause * HConstants.RETRY_BACKOFF[ntries];
|
||||
}
|
||||
|
||||
public HMasterInterface getMaster()
|
||||
throws MasterNotRunningException, ZooKeeperConnectionException {
|
||||
// TODO: REMOVE. MOVE TO HBaseAdmin and redo as a Callable!!!
|
||||
|
||||
// Check if we already have a good master connection
|
||||
try {
|
||||
|
@ -669,18 +659,18 @@ public class HConnectionManager {
|
|||
} catch (IOException e) {
|
||||
if (tries == numRetries - 1) {
|
||||
// This was our last chance - don't bother sleeping
|
||||
LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
|
||||
LOG.info("getMaster attempt " + tries + " of " + numRetries +
|
||||
" failed; no more retrying.", e);
|
||||
break;
|
||||
}
|
||||
LOG.info("getMaster attempt " + tries + " of " + this.numRetries +
|
||||
LOG.info("getMaster attempt " + tries + " of " + numRetries +
|
||||
" failed; retrying after sleep of " +
|
||||
getPauseTime(tries), e);
|
||||
ConnectionUtils.getPauseTime(this.pause, tries), e);
|
||||
}
|
||||
|
||||
// Cannot connect to master or it is not running. Sleep & retry
|
||||
try {
|
||||
this.masterLock.wait(getPauseTime(tries));
|
||||
this.masterLock.wait(ConnectionUtils.getPauseTime(this.pause, tries));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("Thread was interrupted while trying to connect to master.");
|
||||
|
@ -1025,8 +1015,7 @@ public class HConnectionManager {
|
|||
throw e;
|
||||
} catch (IOException e) {
|
||||
if (e instanceof RemoteException) {
|
||||
e = RemoteExceptionHandler.decodeRemoteException(
|
||||
(RemoteException) e);
|
||||
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
|
||||
}
|
||||
if (tries < numRetries - 1) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -1035,7 +1024,7 @@ public class HConnectionManager {
|
|||
((metaLocation == null)? "null": "{" + metaLocation + "}") +
|
||||
", attempt=" + tries + " of " +
|
||||
this.numRetries + " failed; retrying after sleep of " +
|
||||
getPauseTime(tries) + " because: " + e.getMessage());
|
||||
ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
|
||||
}
|
||||
} else {
|
||||
throw e;
|
||||
|
@ -1047,7 +1036,7 @@ public class HConnectionManager {
|
|||
}
|
||||
}
|
||||
try{
|
||||
Thread.sleep(getPauseTime(tries));
|
||||
Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Giving up trying to location region in " +
|
||||
|
@ -1147,14 +1136,18 @@ public class HConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clearCaches(String sn) {
|
||||
clearCachedLocationForServer(sn);
|
||||
}
|
||||
|
||||
/*
|
||||
* Delete all cached entries of a table that maps to a specific location.
|
||||
*
|
||||
* @param tablename
|
||||
* @param server
|
||||
*/
|
||||
private void clearCachedLocationForServer(
|
||||
final String server) {
|
||||
private void clearCachedLocationForServer(final String server) {
|
||||
boolean deletedSomething = false;
|
||||
synchronized (this.cachedRegionLocations) {
|
||||
if (!cachedServers.contains(server)) {
|
||||
|
@ -1338,82 +1331,33 @@ public class HConnectionManager {
|
|||
|
||||
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
|
||||
throws IOException, RuntimeException {
|
||||
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
|
||||
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
|
||||
for(int tries = 0; tries < numRetries; tries++) {
|
||||
try {
|
||||
callable.beforeCall();
|
||||
callable.connect(tries != 0);
|
||||
return callable.call();
|
||||
} catch (Throwable t) {
|
||||
callable.shouldRetry(t);
|
||||
t = translateException(t);
|
||||
if (t instanceof SocketTimeoutException ||
|
||||
t instanceof ConnectException ||
|
||||
t instanceof RetriesExhaustedException) {
|
||||
// if thrown these exceptions, we clear all the cache entries that
|
||||
// map to that slow/dead server; otherwise, let cache miss and ask
|
||||
// .META. again to find the new location
|
||||
HRegionLocation hrl = callable.location;
|
||||
if (hrl != null) {
|
||||
clearCachedLocationForServer(hrl.getServerAddress().toString());
|
||||
}
|
||||
}
|
||||
RetriesExhaustedException.ThrowableWithExtraContext qt =
|
||||
new RetriesExhaustedException.ThrowableWithExtraContext(t,
|
||||
System.currentTimeMillis(), callable.toString());
|
||||
exceptions.add(qt);
|
||||
if (tries == numRetries - 1) {
|
||||
throw new RetriesExhaustedException(tries, exceptions);
|
||||
}
|
||||
} finally {
|
||||
callable.afterCall();
|
||||
}
|
||||
try {
|
||||
Thread.sleep(getPauseTime(tries));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Giving up after tries=" + tries, e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
return callable.withRetries();
|
||||
}
|
||||
|
||||
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
|
||||
throws IOException, RuntimeException {
|
||||
try {
|
||||
callable.beforeCall();
|
||||
callable.connect(false);
|
||||
return callable.call();
|
||||
} catch (Throwable t) {
|
||||
Throwable t2 = translateException(t);
|
||||
if (t2 instanceof IOException) {
|
||||
throw (IOException)t2;
|
||||
} else {
|
||||
throw new RuntimeException(t2);
|
||||
}
|
||||
} finally {
|
||||
callable.afterCall();
|
||||
}
|
||||
throws IOException, RuntimeException {
|
||||
return callable.withoutRetries();
|
||||
}
|
||||
|
||||
private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
|
||||
final MultiAction<R> multi, final byte [] tableName) {
|
||||
// TODO: This does not belong in here!!! St.Ack HConnections should
|
||||
// not be dealing in Callables; Callables have HConnections, not other
|
||||
// way around.
|
||||
final HConnection connection = this;
|
||||
return new Callable<MultiResponse>() {
|
||||
public MultiResponse call() throws IOException {
|
||||
return getRegionServerWithoutRetries(
|
||||
new ServerCallable<MultiResponse>(connection, tableName, null) {
|
||||
public MultiResponse call() throws IOException {
|
||||
return server.multi(multi);
|
||||
}
|
||||
@Override
|
||||
public void connect(boolean reload) throws IOException {
|
||||
server =
|
||||
connection.getHRegionConnection(loc.getHostname(), loc.getPort());
|
||||
}
|
||||
ServerCallable<MultiResponse> callable =
|
||||
new ServerCallable<MultiResponse>(connection, tableName, null) {
|
||||
public MultiResponse call() throws IOException {
|
||||
return server.multi(multi);
|
||||
}
|
||||
);
|
||||
@Override
|
||||
public void connect(boolean reload) throws IOException {
|
||||
server = connection.getHRegionConnection(loc.getHostname(), loc.getPort());
|
||||
}
|
||||
};
|
||||
return callable.withoutRetries();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -1422,6 +1366,7 @@ public class HConnectionManager {
|
|||
final byte[] tableName,
|
||||
ExecutorService pool,
|
||||
Object[] results) throws IOException, InterruptedException {
|
||||
// This belongs in HTable!!! Not in here. St.Ack
|
||||
|
||||
// results must be the same size as list
|
||||
if (results.length != list.size()) {
|
||||
|
@ -1508,6 +1453,7 @@ public class HConnectionManager {
|
|||
Object[] results,
|
||||
Batch.Callback<R> callback)
|
||||
throws IOException, InterruptedException {
|
||||
// This belongs in HTable!!! Not in here. St.Ack
|
||||
|
||||
// results must be the same size as list
|
||||
if (results.length != list.size()) {
|
||||
|
@ -1527,13 +1473,12 @@ public class HConnectionManager {
|
|||
boolean retry = true;
|
||||
// count that helps presize actions array
|
||||
int actionCount = 0;
|
||||
Throwable singleRowCause = null;
|
||||
|
||||
for (int tries = 0; tries < numRetries && retry; ++tries) {
|
||||
|
||||
// sleep first, if this is a retry
|
||||
if (tries >= 1) {
|
||||
long sleepTime = getPauseTime(tries);
|
||||
long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries);
|
||||
LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
|
||||
Thread.sleep(sleepTime);
|
||||
}
|
||||
|
@ -1639,14 +1584,6 @@ public class HConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
if (retry) {
|
||||
// Simple little check for 1 item failures.
|
||||
if (singleRowCause != null) {
|
||||
throw new IOException(singleRowCause);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
|
||||
List<Row> actions = new ArrayList<Row>(actionCount);
|
||||
List<String> addresses = new ArrayList<String>(actionCount);
|
||||
|
@ -1666,19 +1603,6 @@ public class HConnectionManager {
|
|||
}
|
||||
}
|
||||
|
||||
private Throwable translateException(Throwable t) throws IOException {
|
||||
if (t instanceof UndeclaredThrowableException) {
|
||||
t = t.getCause();
|
||||
}
|
||||
if (t instanceof RemoteException) {
|
||||
t = RemoteExceptionHandler.decodeRemoteException((RemoteException)t);
|
||||
}
|
||||
if (t instanceof DoNotRetryIOException) {
|
||||
throw (DoNotRetryIOException)t;
|
||||
}
|
||||
return t;
|
||||
}
|
||||
|
||||
/*
|
||||
* Return the number of cached region for a table. It will only be called
|
||||
* from a unit test.
|
||||
|
|
|
@ -395,7 +395,6 @@ public class HTable implements HTableInterface, Closeable {
|
|||
* @return Pair of arrays of region starting and ending row keys
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
|
||||
final List<byte[]> startKeyList = new ArrayList<byte[]>();
|
||||
final List<byte[]> endKeyList = new ArrayList<byte[]>();
|
||||
|
@ -569,13 +568,12 @@ public class HTable implements HTableInterface, Closeable {
|
|||
@Override
|
||||
public Result getRowOrBefore(final byte[] row, final byte[] family)
|
||||
throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
|
||||
return new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
|
||||
public Result call() throws IOException {
|
||||
return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
|
||||
row, family);
|
||||
}
|
||||
});
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -616,13 +614,11 @@ public class HTable implements HTableInterface, Closeable {
|
|||
*/
|
||||
@Override
|
||||
public Result get(final Get get) throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
|
||||
return new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
|
||||
public Result call() throws IOException {
|
||||
return server.get(location.getRegionInfo().getRegionName(), get);
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -672,14 +668,12 @@ public class HTable implements HTableInterface, Closeable {
|
|||
@Override
|
||||
public void delete(final Delete delete)
|
||||
throws IOException {
|
||||
connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
|
||||
new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
|
||||
public Boolean call() throws IOException {
|
||||
server.delete(location.getRegionInfo().getRegionName(), delete);
|
||||
return null; // FindBugs NP_BOOLEAN_RETURN_NULL
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -749,14 +743,12 @@ public class HTable implements HTableInterface, Closeable {
|
|||
throw new IOException(
|
||||
"Invalid arguments to append, no columns specified");
|
||||
}
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
|
||||
return new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
|
||||
public Result call() throws IOException {
|
||||
return server.append(
|
||||
location.getRegionInfo().getRegionName(), append);
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -768,14 +760,12 @@ public class HTable implements HTableInterface, Closeable {
|
|||
throw new IOException(
|
||||
"Invalid arguments to increment, no columns specified");
|
||||
}
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
|
||||
return new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
|
||||
public Result call() throws IOException {
|
||||
return server.increment(
|
||||
location.getRegionInfo().getRegionName(), increment);
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -805,15 +795,13 @@ public class HTable implements HTableInterface, Closeable {
|
|||
throw new IOException(
|
||||
"Invalid arguments to incrementColumnValue", npe);
|
||||
}
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
|
||||
return new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
|
||||
public Long call() throws IOException {
|
||||
return server.incrementColumnValue(
|
||||
location.getRegionInfo().getRegionName(), row, family,
|
||||
qualifier, amount, writeToWAL);
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -824,14 +812,12 @@ public class HTable implements HTableInterface, Closeable {
|
|||
final byte [] family, final byte [] qualifier, final byte [] value,
|
||||
final Put put)
|
||||
throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
|
||||
return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
|
||||
public Boolean call() throws IOException {
|
||||
return server.checkAndPut(location.getRegionInfo().getRegionName(),
|
||||
row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
|
||||
|
@ -843,16 +829,14 @@ public class HTable implements HTableInterface, Closeable {
|
|||
final byte [] family, final byte [] qualifier, final byte [] value,
|
||||
final Delete delete)
|
||||
throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
|
||||
return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
|
||||
public Boolean call() throws IOException {
|
||||
return server.checkAndDelete(
|
||||
location.getRegionInfo().getRegionName(),
|
||||
row, family, qualifier, value, delete)
|
||||
? Boolean.TRUE : Boolean.FALSE;
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -860,14 +844,12 @@ public class HTable implements HTableInterface, Closeable {
|
|||
*/
|
||||
@Override
|
||||
public boolean exists(final Get get) throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
|
||||
return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
|
||||
public Boolean call() throws IOException {
|
||||
return server.
|
||||
exists(location.getRegionInfo().getRegionName(), get);
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -947,15 +929,13 @@ public class HTable implements HTableInterface, Closeable {
|
|||
@Override
|
||||
public RowLock lockRow(final byte [] row)
|
||||
throws IOException {
|
||||
return connection.getRegionServerWithRetries(
|
||||
new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
|
||||
return new ServerCallable<RowLock>(connection, tableName, row, operationTimeout) {
|
||||
public RowLock call() throws IOException {
|
||||
long lockId =
|
||||
server.lockRow(location.getRegionInfo().getRegionName(), row);
|
||||
return new RowLock(row,lockId);
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -964,15 +944,13 @@ public class HTable implements HTableInterface, Closeable {
|
|||
@Override
|
||||
public void unlockRow(final RowLock rl)
|
||||
throws IOException {
|
||||
connection.getRegionServerWithRetries(
|
||||
new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
|
||||
new ServerCallable<Boolean>(connection, tableName, rl.getRow(), operationTimeout) {
|
||||
public Boolean call() throws IOException {
|
||||
server.unlockRow(location.getRegionInfo().getRegionName(),
|
||||
rl.getLockId());
|
||||
return null; // FindBugs NP_BOOLEAN_RETURN_NULL
|
||||
}
|
||||
}
|
||||
);
|
||||
}.withRetries();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -23,21 +23,17 @@ package org.apache.hadoop.hbase.client;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
|
||||
import org.apache.hadoop.hbase.util.Addressing;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
|
||||
|
@ -191,7 +187,7 @@ public class MetaScanner {
|
|||
}
|
||||
callable = new ScannerCallable(connection, metaTableName, scan, null);
|
||||
// Open scanner
|
||||
connection.getRegionServerWithRetries(callable);
|
||||
callable.withRetries();
|
||||
|
||||
int processedRows = 0;
|
||||
try {
|
||||
|
@ -201,7 +197,7 @@ public class MetaScanner {
|
|||
break;
|
||||
}
|
||||
//we have all the rows here
|
||||
Result [] rrs = connection.getRegionServerWithRetries(callable);
|
||||
Result [] rrs = callable.withRetries();
|
||||
if (rrs == null || rrs.length == 0 || rrs[0].size() == 0) {
|
||||
break; //exit completely
|
||||
}
|
||||
|
@ -220,7 +216,7 @@ public class MetaScanner {
|
|||
} finally {
|
||||
// Close scanner
|
||||
callable.setClose();
|
||||
connection.getRegionServerWithRetries(callable);
|
||||
callable.withRetries();
|
||||
}
|
||||
} while (Bytes.compareTo(startRow, HConstants.LAST_ROW) != 0);
|
||||
}
|
||||
|
|
|
@ -21,14 +21,21 @@
|
|||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRPC;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
/**
|
||||
* Abstract class that implements {@link Callable}. Implementation stipulates
|
||||
|
@ -123,4 +130,106 @@ public abstract class ServerCallable<T> implements Callable<T> {
|
|||
this.callTimeout = ((int) (this.endTime - this.startTime));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return {@link HConnection} instance used by this Callable.
|
||||
*/
|
||||
HConnection getConnection() {
|
||||
return this.connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run this instance with retries, timed waits,
|
||||
* and refinds of missing regions.
|
||||
*
|
||||
* @param <T> the type of the return value
|
||||
* @return an object of type T
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws RuntimeException other unspecified error
|
||||
*/
|
||||
public T withRetries()
|
||||
throws IOException, RuntimeException {
|
||||
Configuration c = getConnection().getConfiguration();
|
||||
final long pause = c.getLong(HConstants.HBASE_CLIENT_PAUSE,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
|
||||
final int numRetries = c.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
|
||||
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
|
||||
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
|
||||
new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
|
||||
for (int tries = 0; tries < numRetries; tries++) {
|
||||
try {
|
||||
beforeCall();
|
||||
connect(tries != 0);
|
||||
return call();
|
||||
} catch (Throwable t) {
|
||||
shouldRetry(t);
|
||||
t = translateException(t);
|
||||
if (t instanceof SocketTimeoutException ||
|
||||
t instanceof ConnectException ||
|
||||
t instanceof RetriesExhaustedException) {
|
||||
// if thrown these exceptions, we clear all the cache entries that
|
||||
// map to that slow/dead server; otherwise, let cache miss and ask
|
||||
// .META. again to find the new location
|
||||
HRegionLocation hrl = location;
|
||||
if (hrl != null) {
|
||||
getConnection().clearCaches(hrl.getServerAddress().toString());
|
||||
}
|
||||
}
|
||||
RetriesExhaustedException.ThrowableWithExtraContext qt =
|
||||
new RetriesExhaustedException.ThrowableWithExtraContext(t,
|
||||
System.currentTimeMillis(), toString());
|
||||
exceptions.add(qt);
|
||||
if (tries == numRetries - 1) {
|
||||
throw new RetriesExhaustedException(tries, exceptions);
|
||||
}
|
||||
} finally {
|
||||
afterCall();
|
||||
}
|
||||
try {
|
||||
Thread.sleep(ConnectionUtils.getPauseTime(pause, tries));
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Giving up after tries=" + tries, e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run this instance against the server once.
|
||||
* @param <T> the type of the return value
|
||||
* @return an object of type T
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
* @throws RuntimeException other unspecified error
|
||||
*/
|
||||
public T withoutRetries()
|
||||
throws IOException, RuntimeException {
|
||||
try {
|
||||
beforeCall();
|
||||
connect(false);
|
||||
return call();
|
||||
} catch (Throwable t) {
|
||||
Throwable t2 = translateException(t);
|
||||
if (t2 instanceof IOException) {
|
||||
throw (IOException)t2;
|
||||
} else {
|
||||
throw new RuntimeException(t2);
|
||||
}
|
||||
} finally {
|
||||
afterCall();
|
||||
}
|
||||
}
|
||||
|
||||
private static Throwable translateException(Throwable t) throws IOException {
|
||||
if (t instanceof UndeclaredThrowableException) {
|
||||
t = t.getCause();
|
||||
}
|
||||
if (t instanceof RemoteException) {
|
||||
t = ((RemoteException)t).unwrapRemoteException();
|
||||
}
|
||||
if (t instanceof DoNotRetryIOException) {
|
||||
throw (DoNotRetryIOException)t;
|
||||
}
|
||||
return t;
|
||||
}
|
||||
}
|
|
@ -76,7 +76,7 @@ public class ExecRPCInvoker implements InvocationHandler {
|
|||
exec);
|
||||
}
|
||||
};
|
||||
ExecResult result = connection.getRegionServerWithRetries(callable);
|
||||
ExecResult result = callable.withRetries();
|
||||
this.regionName = result.getRegionName();
|
||||
LOG.debug("Result is region="+ Bytes.toStringBinary(regionName) +
|
||||
", value="+result.getValue());
|
||||
|
|
|
@ -484,7 +484,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
|
|||
|
||||
try {
|
||||
List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
|
||||
boolean success = conn.getRegionServerWithRetries(svrCallable);
|
||||
boolean success = svrCallable.withRetries();
|
||||
if (!success) {
|
||||
LOG.warn("Attempt to bulk load region containing "
|
||||
+ Bytes.toStringBinary(first) + " into table "
|
||||
|
|
|
@ -1393,9 +1393,11 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
this.regionsInTransition.put(encodedName, state);
|
||||
} else {
|
||||
// If we are reassigning the node do not force in-memory state to OFFLINE.
|
||||
// Based on the znode state we will decide if to change
|
||||
// in-memory state to OFFLINE or not. It will
|
||||
// be done before setting the znode to OFFLINE state.
|
||||
// Based on the znode state we will decide if to change in-memory state to
|
||||
// OFFLINE or not. It will be done before setting znode to OFFLINE state.
|
||||
|
||||
// We often get here with state == CLOSED because ClosedRegionHandler will
|
||||
// assign on its tail as part of the handling of a region close.
|
||||
if (!hijack) {
|
||||
LOG.debug("Forcing OFFLINE; was=" + state);
|
||||
state.update(RegionState.State.OFFLINE);
|
||||
|
|
|
@ -97,7 +97,10 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
|
|||
return;
|
||||
}
|
||||
// ZK Node is in CLOSED state, assign it.
|
||||
// TODO: Should we remove the region from RIT too? We don't? Makes for
|
||||
// a 'forcing' log message when we go to update state from CLOSED to OFFLINE
|
||||
assignmentManager.setOffline(regionInfo);
|
||||
// This below has to do w/ online enable/disable of a table
|
||||
assignmentManager.removeClosedRegion(regionInfo);
|
||||
assignmentManager.assign(regionInfo, true);
|
||||
}
|
||||
|
|
|
@ -168,12 +168,14 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
@Override
|
||||
public void process() throws IOException {
|
||||
final ServerName serverName = this.serverName;
|
||||
|
||||
try {
|
||||
|
||||
try {
|
||||
LOG.info("Splitting logs for " + serverName);
|
||||
this.services.getMasterFileSystem().splitLog(serverName);
|
||||
if (this.shouldSplitHlog) {
|
||||
LOG.info("Splitting logs for " + serverName);
|
||||
this.services.getMasterFileSystem().splitLog(serverName);
|
||||
} else {
|
||||
LOG.info("Skipping log splitting for " + serverName);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
this.services.getExecutorService().submit(this);
|
||||
this.deadServers.add(serverName);
|
||||
|
@ -186,7 +188,7 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
LOG.info("Server " + serverName +
|
||||
" was carrying ROOT. Trying to assign.");
|
||||
this.services.getAssignmentManager().
|
||||
regionOffline(HRegionInfo.ROOT_REGIONINFO);
|
||||
regionOffline(HRegionInfo.ROOT_REGIONINFO);
|
||||
verifyAndAssignRootWithRetries();
|
||||
}
|
||||
|
||||
|
@ -195,7 +197,7 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
LOG.info("Server " + serverName +
|
||||
" was carrying META. Trying to assign.");
|
||||
this.services.getAssignmentManager().
|
||||
regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
regionOffline(HRegionInfo.FIRST_META_REGIONINFO);
|
||||
this.services.getAssignmentManager().assignMeta();
|
||||
}
|
||||
|
||||
|
@ -228,9 +230,8 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
// OFFLINE? -- and then others after like CLOSING that depend on log
|
||||
// splitting.
|
||||
List<RegionState> regionsInTransition =
|
||||
this.services.getAssignmentManager()
|
||||
.processServerShutdown(this.serverName);
|
||||
|
||||
this.services.getAssignmentManager().
|
||||
processServerShutdown(this.serverName);
|
||||
|
||||
// Wait on meta to come online; we need it to progress.
|
||||
// TODO: Best way to hold strictly here? We should build this retry logic
|
||||
|
@ -252,7 +253,7 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
try {
|
||||
this.server.getCatalogTracker().waitForMeta();
|
||||
hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
|
||||
this.serverName);
|
||||
this.serverName);
|
||||
break;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -267,8 +268,8 @@ public class ServerShutdownHandler extends EventHandler {
|
|||
for (RegionState rit : regionsInTransition) {
|
||||
if (!rit.isClosing() && !rit.isPendingClose()) {
|
||||
LOG.debug("Removed " + rit.getRegion().getRegionNameAsString() +
|
||||
" from list of regions to assign because in RIT" + " region state: "
|
||||
+ rit.getState());
|
||||
" from list of regions to assign because in RIT; region state: " +
|
||||
rit.getState());
|
||||
if (hris != null) hris.remove(rit.getRegion());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
|||
import java.net.ConnectException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.client.HConnection;
|
|||
import org.apache.hadoop.hbase.client.HConnectionManager;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
|
||||
import org.apache.hadoop.hbase.client.ServerCallable;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -68,6 +70,8 @@ public class TestCatalogTracker {
|
|||
private Abortable abortable;
|
||||
|
||||
@BeforeClass public static void beforeClass() throws Exception {
|
||||
// Set this down so tests run quicker
|
||||
UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
|
||||
UTIL.startMiniZKCluster();
|
||||
}
|
||||
|
||||
|
@ -130,7 +134,8 @@ public class TestCatalogTracker {
|
|||
*/
|
||||
@Test public void testInterruptWaitOnMetaAndRoot()
|
||||
throws IOException, InterruptedException {
|
||||
HConnection connection = mockConnection(null);
|
||||
HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
|
||||
HConnection connection = mockConnection(implementation);
|
||||
try {
|
||||
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
|
||||
ServerName hsa = ct.getRootLocation();
|
||||
|
@ -180,21 +185,48 @@ public class TestCatalogTracker {
|
|||
Mockito.when(implementation.getRegionInfo((byte[]) Mockito.any())).
|
||||
thenThrow(new IOException("Server not running, aborting")).
|
||||
thenReturn(new HRegionInfo());
|
||||
|
||||
// After we encounter the above 'Server not running', we should catch the
|
||||
// IOE and go into retrying for the meta mode. We'll do gets on -ROOT- to
|
||||
// get new meta location. Return something so this 'get' succeeds
|
||||
// (here we mock up getRegionServerWithRetries, the wrapper around
|
||||
// the actual get).
|
||||
|
||||
// TODO: Refactor. This method has been moved out of HConnection.
|
||||
// It works for now but has been deprecated.
|
||||
Mockito.when(connection.getRegionServerWithRetries((ServerCallable<Result>)Mockito.any())).
|
||||
thenReturn(getMetaTableRowResult());
|
||||
|
||||
// Now start up the catalogtracker with our doctored Connection.
|
||||
final CatalogTracker ct = constructAndStartCatalogTracker(connection);
|
||||
try {
|
||||
// Set a location for root and meta.
|
||||
RootLocationEditor.setRootLocation(this.watcher, SN);
|
||||
ct.setMetaLocation(SN);
|
||||
// Call the method that HBASE-4288 calls.
|
||||
Assert.assertFalse(ct.waitForMetaServerConnectionDefault() == null);
|
||||
// Call the method that HBASE-4288 calls. It will try and verify the
|
||||
// meta location and will fail on first attempt then go into a long wait.
|
||||
// So, do this in a thread and then reset meta location to break it out
|
||||
// of its wait after a bit of time.
|
||||
final AtomicBoolean metaSet = new AtomicBoolean(false);
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
metaSet.set(ct.waitForMetaServerConnectionDefault() != null);
|
||||
} catch (NotAllMetaRegionsOnlineException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
t.start();
|
||||
while(!t.isAlive()) Threads.sleep(1);
|
||||
Threads.sleep(1);
|
||||
// Now reset the meta as though it were redeployed.
|
||||
ct.setMetaLocation(SN);
|
||||
t.join();
|
||||
Assert.assertTrue(metaSet.get());
|
||||
} finally {
|
||||
// Clean out root and meta locations or later tests will be confused...
|
||||
// they presume start fresh in zk.
|
||||
|
@ -280,7 +312,7 @@ public class TestCatalogTracker {
|
|||
ct.waitForRoot(100);
|
||||
}
|
||||
|
||||
@Test (expected = NotAllMetaRegionsOnlineException.class)
|
||||
@Test (expected = RetriesExhaustedException.class)
|
||||
public void testTimeoutWaitForMeta()
|
||||
throws IOException, InterruptedException {
|
||||
HConnection connection =
|
||||
|
@ -345,6 +377,9 @@ public class TestCatalogTracker {
|
|||
// location (no matter what the Get is). Same for getHRegionInfo -- always
|
||||
// just return the meta region.
|
||||
final Result result = getMetaTableRowResult();
|
||||
|
||||
// TODO: Refactor. This method has been moved out of HConnection.
|
||||
// It works for now but has been deprecated.
|
||||
Mockito.when(connection.getRegionServerWithRetries((ServerCallable<Result>)Mockito.any())).
|
||||
thenReturn(result);
|
||||
Mockito.when(implementation.getRegionInfo((byte[]) Mockito.any())).
|
||||
|
|
|
@ -17,12 +17,16 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation;
|
||||
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionKey;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
|
@ -60,6 +64,53 @@ public class HConnectionTestingUtility {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link #getMockedConnection(Configuration)} and then mocks a few
|
||||
* more of the popular {@link HConnection} methods so they do 'normal'
|
||||
* operation (see return doc below for list). Be sure to shutdown the
|
||||
* connection when done by calling
|
||||
* {@link HConnectionManager#deleteConnection(Configuration, boolean)} else it
|
||||
* will stick around; this is probably not what you want.
|
||||
* @param implementation An {@link HRegionInterface} instance; you'll likely
|
||||
* want to pass a mocked HRS; can be null.
|
||||
*
|
||||
* @param conf Configuration to use
|
||||
* @param implementation An HRegionInterface; can be null but is usually
|
||||
* itself a mock.
|
||||
* @param sn ServerName to include in the region location returned by this
|
||||
* <code>implementation</code>
|
||||
* @param hri HRegionInfo to include in the location returned when
|
||||
* getRegionLocation is called on the mocked connection
|
||||
* @return Mock up a connection that returns a {@link Configuration} when
|
||||
* {@link HConnection#getConfiguration()} is called, a 'location' when
|
||||
* {@link HConnection#getRegionLocation(byte[], byte[], boolean)} is called,
|
||||
* and that returns the passed {@link HRegionInterface} instance when
|
||||
* {@link HConnection#getHRegionConnection(String, int)}
|
||||
* is called (Be sure call
|
||||
* {@link HConnectionManager#deleteConnection(org.apache.hadoop.conf.Configuration, boolean)}
|
||||
* when done with this mocked Connection.
|
||||
* @throws IOException
|
||||
*/
|
||||
public static HConnection getMockedConnectionAndDecorate(final Configuration conf,
|
||||
final HRegionInterface implementation, final ServerName sn, final HRegionInfo hri)
|
||||
throws IOException {
|
||||
HConnection c = HConnectionTestingUtility.getMockedConnection(conf);
|
||||
Mockito.doNothing().when(c).close();
|
||||
// Make it so we return a particular location when asked.
|
||||
final HRegionLocation loc = new HRegionLocation(hri, sn.getHostname(), sn.getPort());
|
||||
Mockito.when(c.getRegionLocation((byte[]) Mockito.any(),
|
||||
(byte[]) Mockito.any(), Mockito.anyBoolean())).
|
||||
thenReturn(loc);
|
||||
Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())).
|
||||
thenReturn(loc);
|
||||
if (implementation != null) {
|
||||
// If a call to getHRegionConnection, return this implementation.
|
||||
Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
|
||||
thenReturn(implementation);
|
||||
}
|
||||
return c;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a Mockito spied-upon {@link HConnection} that goes with the passed
|
||||
* <code>conf</code> configuration instance.
|
||||
|
@ -85,7 +136,6 @@ public class HConnectionTestingUtility {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return Count of extant connection instances
|
||||
*/
|
||||
|
|
|
@ -20,9 +20,6 @@ package org.apache.hadoop.hbase.mapreduce;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
@ -35,16 +32,23 @@ import java.util.concurrent.atomic.AtomicInteger;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.LargeTests;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.ServerCallable;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -52,9 +56,10 @@ import org.apache.hadoop.hbase.util.Pair;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.collect.Multimap;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
/**
|
||||
* Test cases for the atomic load error handling of the bulk load functionality.
|
||||
|
@ -226,13 +231,13 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
util.getConfiguration()) {
|
||||
|
||||
protected List<LoadQueueItem> tryAtomicRegionLoad(final HConnection conn,
|
||||
byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis) throws IOException {
|
||||
byte[] tableName, final byte[] first, Collection<LoadQueueItem> lqis)
|
||||
throws IOException {
|
||||
int i = attmptedCalls.incrementAndGet();
|
||||
if (i == 1) {
|
||||
HConnection errConn = mock(HConnection.class);
|
||||
HConnection errConn = null;
|
||||
try {
|
||||
doThrow(new IOException("injecting bulk load error")).when(errConn)
|
||||
.getRegionServerWithRetries((ServerCallable) anyObject());
|
||||
errConn = getMockedConnection(util.getConfiguration());
|
||||
} catch (Exception e) {
|
||||
LOG.fatal("mocking cruft, should never happen", e);
|
||||
throw new RuntimeException("mocking cruft, should never happen");
|
||||
|
@ -253,6 +258,27 @@ public class TestLoadIncrementalHFilesSplitRecovery {
|
|||
fail("doBulkLoad should have thrown an exception");
|
||||
}
|
||||
|
||||
private HConnection getMockedConnection(final Configuration conf)
|
||||
throws IOException {
|
||||
HConnection c = Mockito.mock(HConnection.class);
|
||||
Mockito.when(c.getConfiguration()).thenReturn(conf);
|
||||
Mockito.doNothing().when(c).close();
|
||||
// Make it so we return a particular location when asked.
|
||||
final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
|
||||
"example.org", 1234);
|
||||
Mockito.when(c.getRegionLocation((byte[]) Mockito.any(),
|
||||
(byte[]) Mockito.any(), Mockito.anyBoolean())).
|
||||
thenReturn(loc);
|
||||
Mockito.when(c.locateRegion((byte[]) Mockito.any(), (byte[]) Mockito.any())).
|
||||
thenReturn(loc);
|
||||
HRegionInterface hri = Mockito.mock(HRegionInterface.class);
|
||||
Mockito.when(hri.bulkLoadHFiles(Mockito.anyList(), (byte [])Mockito.any())).
|
||||
thenThrow(new IOException("injecting bulk load error"));
|
||||
Mockito.when(c.getHRegionConnection(Mockito.anyString(), Mockito.anyInt())).
|
||||
thenReturn(hri);
|
||||
return c;
|
||||
}
|
||||
|
||||
/**
|
||||
* This test exercises the path where there is a split after initial
|
||||
* validation but before the atomic bulk load call. We cannot use presplitting
|
||||
|
|
|
@ -17,20 +17,36 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import static org.junit.Assert.assertNotSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.SmallTests;
|
||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
|
||||
import org.apache.hadoop.hbase.executor.RegionTransitionData;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
|
@ -51,12 +67,17 @@ import org.mockito.Mockito;
|
|||
@Category(SmallTests.class)
|
||||
public class TestAssignmentManager {
|
||||
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
|
||||
private static final ServerName RANDOM_SERVERNAME =
|
||||
private static final ServerName SERVERNAME_A =
|
||||
new ServerName("example.org", 1234, 5678);
|
||||
private static final ServerName SERVERNAME_B =
|
||||
new ServerName("example.org", 0, 5678);
|
||||
private static final HRegionInfo REGIONINFO =
|
||||
new HRegionInfo(Bytes.toBytes("t"),
|
||||
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
|
||||
|
||||
// Mocked objects or; get redone for each test.
|
||||
private Server server;
|
||||
private ServerManager serverManager;
|
||||
private CatalogTracker ct;
|
||||
private ExecutorService executor;
|
||||
private ZooKeeperWatcher watcher;
|
||||
|
||||
@BeforeClass
|
||||
|
@ -71,22 +92,38 @@ public class TestAssignmentManager {
|
|||
|
||||
@Before
|
||||
public void before() throws ZooKeeperConnectionException, IOException {
|
||||
// TODO: Make generic versions of what we do below and put up in a mocking
|
||||
// utility class or move up into HBaseTestingUtility.
|
||||
|
||||
// Mock a Server. Have it return a legit Configuration and ZooKeeperWatcher.
|
||||
// If abort is called, be sure to fail the test (don't just swallow it
|
||||
// silently as is mockito default).
|
||||
this.server = Mockito.mock(Server.class);
|
||||
Mockito.when(server.getConfiguration()).thenReturn(HTU.getConfiguration());
|
||||
this.watcher =
|
||||
new ZooKeeperWatcher(HTU.getConfiguration(), "mocked server", this.server, true);
|
||||
new ZooKeeperWatcher(HTU.getConfiguration(), "mockedServer", this.server, true);
|
||||
Mockito.when(server.getZooKeeper()).thenReturn(this.watcher);
|
||||
Mockito.doThrow(new RuntimeException("Aborted")).
|
||||
when(server).abort(Mockito.anyString(), (Throwable)Mockito.anyObject());
|
||||
// Mock a ServerManager. Say the server RANDOME_SERVERNAME is online.
|
||||
// Also, when someone sends sendRegionClose, say true it succeeded.
|
||||
|
||||
// Mock a ServerManager. Say server SERVERNAME_{A,B} are online. Also
|
||||
// make it so if close or open, we return 'success'.
|
||||
this.serverManager = Mockito.mock(ServerManager.class);
|
||||
Mockito.when(this.serverManager.isServerOnline(RANDOM_SERVERNAME)).thenReturn(true);
|
||||
this.ct = Mockito.mock(CatalogTracker.class);
|
||||
this.executor = Mockito.mock(ExecutorService.class);
|
||||
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_A)).thenReturn(true);
|
||||
Mockito.when(this.serverManager.isServerOnline(SERVERNAME_B)).thenReturn(true);
|
||||
final List<ServerName> onlineServers = new ArrayList<ServerName>();
|
||||
onlineServers.add(SERVERNAME_B);
|
||||
onlineServers.add(SERVERNAME_A);
|
||||
Mockito.when(this.serverManager.getOnlineServersList()).thenReturn(onlineServers);
|
||||
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, REGIONINFO, -1)).
|
||||
thenReturn(true);
|
||||
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_B, REGIONINFO, -1)).
|
||||
thenReturn(true);
|
||||
// Ditto on open.
|
||||
Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_A, REGIONINFO, -1)).
|
||||
thenReturn(RegionOpeningState.OPENED);
|
||||
Mockito.when(this.serverManager.sendRegionOpen(SERVERNAME_B, REGIONINFO, -1)).
|
||||
thenReturn(RegionOpeningState.OPENED);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -94,34 +131,204 @@ public class TestAssignmentManager {
|
|||
if (this.watcher != null) this.watcher.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests AssignmentManager balance function. Runs a balance moving a region
|
||||
* from one server to another mocking regionserver responding over zk.
|
||||
* @throws IOException
|
||||
* @throws KeeperException
|
||||
*/
|
||||
@Test
|
||||
public void testBalance()
|
||||
throws IOException, KeeperException {
|
||||
// Create and startup an executor. This is used by AssignmentManager
|
||||
// handling zk callbacks.
|
||||
ExecutorService executor = startupMasterExecutor("testBalanceExecutor");
|
||||
|
||||
// We need a mocked catalog tracker.
|
||||
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
|
||||
// Create an AM.
|
||||
AssignmentManager am =
|
||||
new AssignmentManager(this.server, this.serverManager, ct, executor);
|
||||
try {
|
||||
// Make sure our new AM gets callbacks; once registered, can't unregister.
|
||||
// Thats ok because we make a new zk watcher for each test.
|
||||
this.watcher.registerListenerFirst(am);
|
||||
// Call the balance function but fake the region being online first at
|
||||
// SERVERNAME_A. Create a balance plan.
|
||||
am.regionOnline(REGIONINFO, SERVERNAME_A);
|
||||
// Balance region from A to B.
|
||||
RegionPlan plan = new RegionPlan(REGIONINFO, SERVERNAME_A, SERVERNAME_B);
|
||||
am.balance(plan);
|
||||
|
||||
// Now fake the region closing successfully over on the regionserver; the
|
||||
// regionserver will have set the region in CLOSED state. This will
|
||||
// trigger callback into AM. The below zk close call is from the RS close
|
||||
// region handler duplicated here because its down deep in a private
|
||||
// method hard to expose.
|
||||
int versionid =
|
||||
ZKAssign.transitionNodeClosed(this.watcher, REGIONINFO, SERVERNAME_A, -1);
|
||||
assertNotSame(versionid, -1);
|
||||
// AM is going to notice above CLOSED and queue up a new assign. The
|
||||
// assign will go to open the region in the new location set by the
|
||||
// balancer. The zk node will be OFFLINE waiting for regionserver to
|
||||
// transition it through OPENING, OPENED. Wait till we see the OFFLINE
|
||||
// zk node before we proceed.
|
||||
while (!ZKAssign.verifyRegionState(this.watcher, REGIONINFO, EventType.M_ZK_REGION_OFFLINE)) {
|
||||
Threads.sleep(1);
|
||||
}
|
||||
// Get current versionid else will fail on transition from OFFLINE to OPENING below
|
||||
versionid = ZKAssign.getVersion(this.watcher, REGIONINFO);
|
||||
assertNotSame(-1, versionid);
|
||||
// This uglyness below is what the openregionhandler on RS side does.
|
||||
versionid = ZKAssign.transitionNode(server.getZooKeeper(), REGIONINFO,
|
||||
SERVERNAME_A, EventType.M_ZK_REGION_OFFLINE,
|
||||
EventType.RS_ZK_REGION_OPENING, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
// Move znode from OPENING to OPENED as RS does on successful open.
|
||||
versionid =
|
||||
ZKAssign.transitionNodeOpened(this.watcher, REGIONINFO, SERVERNAME_B, versionid);
|
||||
assertNotSame(-1, versionid);
|
||||
// Wait on the handler removing the OPENED znode.
|
||||
while(am.isRegionInTransition(REGIONINFO) != null) Threads.sleep(1);
|
||||
} finally {
|
||||
executor.shutdown();
|
||||
am.shutdown();
|
||||
// Clean up all znodes
|
||||
ZKAssign.deleteAllNodes(this.watcher);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a simple server shutdown handler.
|
||||
* @throws KeeperException
|
||||
* @throws IOException
|
||||
*/
|
||||
@Test
|
||||
public void testShutdownHandler() throws KeeperException, IOException {
|
||||
// Create and startup an executor. This is used by AssignmentManager
|
||||
// handling zk callbacks.
|
||||
ExecutorService executor = startupMasterExecutor("testShutdownHandler");
|
||||
|
||||
// We need a mocked catalog tracker.
|
||||
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
|
||||
// Create an AM.
|
||||
AssignmentManager am =
|
||||
new AssignmentManager(this.server, this.serverManager, ct, executor);
|
||||
try {
|
||||
// Make sure our new AM gets callbacks; once registered, can't unregister.
|
||||
// Thats ok because we make a new zk watcher for each test.
|
||||
this.watcher.registerListenerFirst(am);
|
||||
|
||||
// Need to set up a fake scan of meta for the servershutdown handler
|
||||
// Make an RS Interface implementation. Make it so a scanner can go against it.
|
||||
HRegionInterface implementation = Mockito.mock(HRegionInterface.class);
|
||||
// Get a meta row result that has region up on SERVERNAME_A
|
||||
Result r = getMetaTableRowResult(REGIONINFO, SERVERNAME_A);
|
||||
Mockito.when(implementation.openScanner((byte [])Mockito.any(), (Scan)Mockito.any())).
|
||||
thenReturn(System.currentTimeMillis());
|
||||
// Return a good result first and then return null to indicate end of scan
|
||||
Mockito.when(implementation.next(Mockito.anyLong(), Mockito.anyInt())).
|
||||
thenReturn(new Result [] {r}, (Result [])null);
|
||||
|
||||
// Get a connection w/ mocked up common methods.
|
||||
HConnection connection =
|
||||
HConnectionTestingUtility.getMockedConnectionAndDecorate(HTU.getConfiguration(),
|
||||
implementation, SERVERNAME_B, REGIONINFO);
|
||||
|
||||
// Make it so we can get a catalogtracker from servermanager.. .needed
|
||||
// down in guts of server shutdown handler.
|
||||
Mockito.when(ct.getConnection()).thenReturn(connection);
|
||||
Mockito.when(this.server.getCatalogTracker()).thenReturn(ct);
|
||||
|
||||
// Now make a server shutdown handler instance and invoke process.
|
||||
// Have it that SERVERNAME_A died.
|
||||
DeadServer deadServers = new DeadServer();
|
||||
deadServers.add(SERVERNAME_A);
|
||||
// I need a services instance that will return the AM
|
||||
MasterServices services = Mockito.mock(MasterServices.class);
|
||||
Mockito.when(services.getAssignmentManager()).thenReturn(am);
|
||||
ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
|
||||
services, deadServers, SERVERNAME_A, false);
|
||||
handler.process();
|
||||
// The region in r will have been assigned. It'll be up in zk as unassigned.
|
||||
} finally {
|
||||
executor.shutdown();
|
||||
am.shutdown();
|
||||
// Clean up all znodes
|
||||
ZKAssign.deleteAllNodes(this.watcher);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param sn ServerName to use making startcode and server in meta
|
||||
* @param hri Region to serialize into HRegionInfo
|
||||
* @return A mocked up Result that fakes a Get on a row in the
|
||||
* <code>.META.</code> table.
|
||||
* @throws IOException
|
||||
*/
|
||||
private Result getMetaTableRowResult(final HRegionInfo hri,
|
||||
final ServerName sn)
|
||||
throws IOException {
|
||||
// TODO: Move to a utilities class. More than one test case can make use
|
||||
// of this facility.
|
||||
List<KeyValue> kvs = new ArrayList<KeyValue>();
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
|
||||
Writables.getBytes(hri)));
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
|
||||
Bytes.toBytes(sn.getHostAndPort())));
|
||||
kvs.add(new KeyValue(HConstants.EMPTY_BYTE_ARRAY,
|
||||
HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
|
||||
Bytes.toBytes(sn.getStartcode())));
|
||||
return new Result(kvs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and startup executor pools. Start same set as master does (just
|
||||
* run a few less).
|
||||
* @param name Name to give our executor
|
||||
* @return Created executor (be sure to call shutdown when done).
|
||||
*/
|
||||
private ExecutorService startupMasterExecutor(final String name) {
|
||||
// TODO: Move up into HBaseTestingUtility? Generally useful.
|
||||
ExecutorService executor = new ExecutorService(name);
|
||||
executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3);
|
||||
executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3);
|
||||
executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3);
|
||||
executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3);
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnassignWithSplitAtSameTime() throws KeeperException, IOException {
|
||||
// Region to use in test.
|
||||
final HRegionInfo hri = HRegionInfo.FIRST_META_REGIONINFO;
|
||||
// First amend the servermanager mock so that when we do send close of the
|
||||
// first meta region on RANDOM_SERVERNAME, it will return true rather than
|
||||
// first meta region on SERVERNAME_A, it will return true rather than
|
||||
// default null.
|
||||
Mockito.when(this.serverManager.sendRegionClose(RANDOM_SERVERNAME, hri, -1))
|
||||
.thenReturn(true);
|
||||
Mockito.when(this.serverManager.sendRegionClose(SERVERNAME_A, hri, -1)).thenReturn(true);
|
||||
// Need a mocked catalog tracker.
|
||||
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
|
||||
// Create an AM.
|
||||
AssignmentManager am =
|
||||
new AssignmentManager(this.server, this.serverManager, this.ct, this.executor);
|
||||
new AssignmentManager(this.server, this.serverManager, ct, null);
|
||||
try {
|
||||
// First make sure my mock up basically works. Unassign a region.
|
||||
unassign(am, RANDOM_SERVERNAME, hri);
|
||||
unassign(am, SERVERNAME_A, hri);
|
||||
// This delete will fail if the previous unassign did wrong thing.
|
||||
ZKAssign.deleteClosingNode(this.watcher, hri);
|
||||
// Now put a SPLITTING region in the way. I don't have to assert it
|
||||
// go put in place. This method puts it in place then asserts it still
|
||||
// owns it by moving state from SPLITTING to SPLITTING.
|
||||
int version = createNodeSplitting(this.watcher, hri, RANDOM_SERVERNAME);
|
||||
int version = createNodeSplitting(this.watcher, hri, SERVERNAME_A);
|
||||
// Now, retry the unassign with the SPLTTING in place. It should just
|
||||
// complete without fail; a sort of 'silent' recognition that the
|
||||
// region to unassign has been split and no longer exists: TOOD: what if
|
||||
// the split fails and the parent region comes back to life?
|
||||
unassign(am, RANDOM_SERVERNAME, hri);
|
||||
unassign(am, SERVERNAME_A, hri);
|
||||
// This transition should fail if the znode has been messed with.
|
||||
ZKAssign.transitionNode(this.watcher, hri, RANDOM_SERVERNAME,
|
||||
ZKAssign.transitionNode(this.watcher, hri, SERVERNAME_A,
|
||||
EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
|
||||
assertTrue(am.isRegionInTransition(hri) == null);
|
||||
} finally {
|
||||
|
|
|
@ -72,7 +72,11 @@ public class TestCatalogJanitor {
|
|||
// Mock an HConnection and a HRegionInterface implementation. Have the
|
||||
// HConnection return the HRI. Have the HRI return a few mocked up responses
|
||||
// to make our test work.
|
||||
this.connection = HConnectionTestingUtility.getMockedConnection(this.c);
|
||||
this.connection =
|
||||
HConnectionTestingUtility.getMockedConnectionAndDecorate(this.c,
|
||||
Mockito.mock(HRegionInterface.class),
|
||||
new ServerName("example.org,12345,6789"),
|
||||
HRegionInfo.FIRST_META_REGIONINFO);
|
||||
// Set hbase.rootdir into test dir.
|
||||
FileSystem fs = FileSystem.get(this.c);
|
||||
Path rootdir = fs.makeQualified(new Path(this.c.get(HConstants.HBASE_DIR)));
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.fs.Path;
|
|||
import org.apache.hadoop.hbase.*;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
|
||||
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
|
||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
||||
import org.apache.hadoop.hbase.client.HConnection;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
|
@ -135,7 +134,7 @@ public class TestHRegionServerBulkLoad {
|
|||
// bulk load HFiles
|
||||
HConnection conn = UTIL.getHBaseAdmin().getConnection();
|
||||
byte[] tbl = Bytes.toBytes(tableName);
|
||||
conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl, Bytes
|
||||
new ServerCallable<Void>(conn, tbl, Bytes
|
||||
.toBytes("aaa")) {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
|
@ -145,12 +144,12 @@ public class TestHRegionServerBulkLoad {
|
|||
server.bulkLoadHFiles(famPaths, regionName);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}.withRetries();
|
||||
|
||||
// Periodically do compaction to reduce the number of open file handles.
|
||||
if (numBulkLoads.get() % 10 == 0) {
|
||||
// 10 * 50 = 500 open file handles!
|
||||
conn.getRegionServerWithRetries(new ServerCallable<Void>(conn, tbl,
|
||||
new ServerCallable<Void>(conn, tbl,
|
||||
Bytes.toBytes("aaa")) {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
|
@ -160,7 +159,7 @@ public class TestHRegionServerBulkLoad {
|
|||
numCompactions.incrementAndGet();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}.withRetries();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue