HBASE-9049: Generalize ServerCallable creation to support custom callables

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1509015 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
jyates 2013-07-31 20:16:43 +00:00
parent 4c05a177b0
commit ab0abfa711
13 changed files with 179 additions and 71 deletions

View File

@ -101,6 +101,7 @@ class AsyncProcess<CResult> {
protected int numTries;
protected final boolean useServerTrackerForRetries;
protected int serverTrackerTimeout;
protected RpcRetryingCallerFactory rpcCallerFactory;
/**
@ -167,7 +168,8 @@ class AsyncProcess<CResult> {
}
public AsyncProcess(HConnection hc, byte[] tableName, ExecutorService pool,
AsyncProcessCallback<CResult> callback, Configuration conf) {
AsyncProcessCallback<CResult> callback, Configuration conf,
RpcRetryingCallerFactory rpcCaller) {
this.hConnection = hc;
this.tableName = tableName;
this.pool = pool;
@ -201,6 +203,8 @@ class AsyncProcess<CResult> {
serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
}
}
this.rpcCallerFactory = rpcCaller;
}
/**
@ -452,7 +456,7 @@ class AsyncProcess<CResult> {
*/
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
// callable is unused.
return new RpcRetryingCaller<MultiResponse>();
return rpcCallerFactory.<MultiResponse> newCaller();
}
/**

View File

@ -66,7 +66,7 @@ public class ClientScanner extends AbstractClientScanner {
private final byte[] tableName;
private final int scannerTimeout;
private boolean scanMetricsPublished = false;
private ScannerCaller caller = new ScannerCaller();
private RpcRetryingCaller<Result []> caller;
/**
* Create a new ClientScanner for the specified table. An HConnection will be
@ -83,6 +83,7 @@ public class ClientScanner extends AbstractClientScanner {
this(conf, scan, tableName, HConnectionManager.getConnection(conf));
}
/**
* Create a new ClientScanner for the specified table
* Note that the passed {@link Scan}'s start row maybe changed changed.
@ -93,8 +94,22 @@ public class ClientScanner extends AbstractClientScanner {
* @param connection Connection identifying the cluster
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan,
final byte[] tableName, HConnection connection) throws IOException {
public ClientScanner(final Configuration conf, final Scan scan, final byte[] tableName,
HConnection connection) throws IOException {
this(conf, scan, tableName, connection, new RpcRetryingCallerFactory(conf));
}
/**
* Create a new ClientScanner for the specified table Note that the passed {@link Scan}'s start
* row maybe changed changed.
* @param conf The {@link Configuration} to use.
* @param scan {@link Scan} to use in this scanner
* @param tableName The table that we wish to scan
* @param connection Connection identifying the cluster
* @throws IOException
*/
public ClientScanner(final Configuration conf, final Scan scan, final byte[] tableName,
HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Scan table=" + Bytes.toString(tableName)
+ ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@ -131,6 +146,8 @@ public class ClientScanner extends AbstractClientScanner {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
}
this.caller = rpcFactory.<Result[]> newCaller();
// initialize the scanner
nextScanner(false);
}
@ -180,7 +197,7 @@ public class ClientScanner extends AbstractClientScanner {
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
this.caller.callWithRetries(callable, getConnection().getConfiguration());
this.caller.callWithRetries(callable);
this.callable = null;
}
@ -217,7 +234,7 @@ public class ClientScanner extends AbstractClientScanner {
callable = getScannerCallable(localStartKey);
// Open a scanner on the region server starting at the
// beginning of the region
this.caller.callWithRetries(callable, getConnection().getConfiguration());
this.caller.callWithRetries(callable);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
@ -277,10 +294,10 @@ public class ClientScanner extends AbstractClientScanner {
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
values = this.caller.callWithRetries(callable, getConnection().getConfiguration());
values = this.caller.callWithRetries(callable);
if (skipFirst && values != null && values.length == 1) {
skipFirst = false; // Already skipped, unset it before scanning again
values = this.caller.callWithRetries(callable, getConnection().getConfiguration());
values = this.caller.callWithRetries(callable);
}
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
@ -403,7 +420,7 @@ public class ClientScanner extends AbstractClientScanner {
if (callable != null) {
callable.setClose();
try {
this.caller.callWithRetries(callable, getConnection().getConfiguration());
this.caller.callWithRetries(callable);
} catch (IOException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to

View File

@ -17,5 +17,38 @@
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
class ScannerCaller extends RpcRetryingCaller<Result []> {}
public class DelegatingRetryingCallable<T, D extends RetryingCallable<T>> implements
RetryingCallable<T> {
protected final D delegate;
public DelegatingRetryingCallable(D delegate) {
this.delegate = delegate;
}
@Override
public T call() throws Exception {
return delegate.call();
}
@Override
public void prepare(boolean reload) throws IOException {
delegate.prepare(reload);
}
@Override
public void throwable(Throwable t, boolean retrying) {
delegate.throwable(t, retrying);
}
@Override
public String getExceptionMessageAdditionalDetail() {
return delegate.getExceptionMessageAdditionalDetail();
}
@Override
public long sleep(long pause, int tries) {
return delegate.sleep(pause, tries);
}
}

View File

@ -154,6 +154,8 @@ public class HBaseAdmin implements Abortable, Closeable {
private boolean aborted;
private boolean cleanupConnectionOnClose = false; // close the connection in close()
private RpcRetryingCallerFactory rpcCallerFactory;
/**
* Constructor.
* See {@link #HBaseAdmin(HConnection connection)}
@ -186,6 +188,7 @@ public class HBaseAdmin implements Abortable, Closeable {
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.retryLongerMultiplier = this.conf.getInt(
"hbase.client.retries.longer.multiplier", 10);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
}
/**
@ -2653,10 +2656,9 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
abstract static class MasterAdminCallable<V> extends MasterCallable<V> {
protected MasterAdminKeepAliveConnection masterAdmin;
private final HConnection connection;
public MasterAdminCallable(final HConnection connection) {
this.connection = connection;
super(connection);
}
@Override
@ -2675,10 +2677,9 @@ public class HBaseAdmin implements Abortable, Closeable {
*/
abstract static class MasterMonitorCallable<V> extends MasterCallable<V> {
protected MasterMonitorKeepAliveConnection masterMonitor;
private final HConnection connection;
public MasterMonitorCallable(final HConnection connection) {
this.connection = connection;
super(connection);
}
@Override
@ -2698,6 +2699,12 @@ public class HBaseAdmin implements Abortable, Closeable {
* @param <V>
*/
abstract static class MasterCallable<V> implements RetryingCallable<V>, Closeable {
protected HConnection connection;
public MasterCallable(final HConnection connection) {
this.connection = connection;
}
@Override
public void throwable(Throwable t, boolean retrying) {
}
@ -2714,9 +2721,9 @@ public class HBaseAdmin implements Abortable, Closeable {
}
private <V> V executeCallable(MasterCallable<V> callable) throws IOException {
RpcRetryingCaller<V> caller = new RpcRetryingCaller<V>();
RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
try {
return caller.callWithRetries(callable, getConfiguration());
return caller.callWithRetries(callable);
} finally {
callable.close();
}

View File

@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
@ -2100,7 +2099,8 @@ public class HConnectionManager {
// For tests.
protected <R> AsyncProcess createAsyncProcess(byte[] tableName, ExecutorService pool,
AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
return new AsyncProcess<R>(this, tableName, pool, callback, conf);
return new AsyncProcess<R>(this, tableName, pool, callback, conf,
RpcRetryingCallerFactory.instantiate(conf));
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import java.io.Closeable;
@ -135,6 +136,7 @@ public class HTable implements HTableInterface {
/** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess<Object> ap;
private RpcRetryingCallerFactory rpcCallerFactory;
/**
* Creates an object to access a HBase table.
@ -267,7 +269,9 @@ public class HTable implements HTableInterface {
HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
ap = new AsyncProcess<Object>(connection, tableName, pool, null, configuration);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
ap = new AsyncProcess<Object>(connection, tableName, pool, null,
configuration, rpcCallerFactory);
this.maxKeyValueSize = this.configuration.getInt(
"hbase.client.keyvalue.maxsize", -1);
@ -596,8 +600,7 @@ public class HTable implements HTableInterface {
getLocation().getRegionInfo().getRegionName(), row, family);
}
};
return new RpcRetryingCaller<Result>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -643,8 +646,7 @@ public class HTable implements HTableInterface {
return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
}
};
return new RpcRetryingCaller<Result>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -719,8 +721,7 @@ public class HTable implements HTableInterface {
}
}
};
new RpcRetryingCaller<Boolean>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -856,8 +857,7 @@ public class HTable implements HTableInterface {
return null;
}
};
new RpcRetryingCaller<Void>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
rpcCallerFactory.<Void> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -884,8 +884,7 @@ public class HTable implements HTableInterface {
}
}
};
return new RpcRetryingCaller<Result>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -911,8 +910,7 @@ public class HTable implements HTableInterface {
}
}
};
return new RpcRetryingCaller<Result>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -962,8 +960,7 @@ public class HTable implements HTableInterface {
}
}
};
return new RpcRetryingCaller<Long>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
return rpcCallerFactory.<Long> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -988,8 +985,7 @@ public class HTable implements HTableInterface {
}
}
};
return new RpcRetryingCaller<Boolean>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
}
@ -1015,8 +1011,7 @@ public class HTable implements HTableInterface {
}
}
};
return new RpcRetryingCaller<Boolean>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -1037,8 +1032,7 @@ public class HTable implements HTableInterface {
}
}
};
return new RpcRetryingCaller<Boolean>().
callWithRetries(callable, getConfiguration(), this.operationTimeout);
return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout);
}
/**
@ -1144,8 +1138,8 @@ public class HTable implements HTableInterface {
}
}
};
return new RpcRetryingCaller<List<Boolean>>().
callWithRetries(callable, getConfiguration(), operationTimeout);
return rpcCallerFactory.<List<Boolean>> newCaller().callWithRetries(callable,
operationTimeout);
}
};
futures.put(getsByRegionEntry.getKey(), pool.submit(callable));

View File

@ -62,8 +62,16 @@ public class RpcRetryingCaller<T> {
private long startTime, endTime;
private final static int MIN_RPC_TIMEOUT = 2000;
public RpcRetryingCaller() {
private final long pause;
private final int retries;
public RpcRetryingCaller(Configuration conf) {
super();
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.retries =
conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
}
private void beforeCall() {
@ -83,32 +91,20 @@ public class RpcRetryingCaller<T> {
this.endTime = EnvironmentEdgeManager.currentTimeMillis();
}
public synchronized T callWithRetries(RetryingCallable<T> callable, final Configuration conf)
throws IOException, RuntimeException {
return callWithRetries(callable, conf, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
public synchronized T callWithRetries(RetryingCallable<T> callable, final Configuration conf,
final int callTimeout)
throws IOException, RuntimeException {
final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
final int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
return callWithRetries(callable, callTimeout, pause, numRetries);
public synchronized T callWithRetries(RetryingCallable<T> callable) throws IOException,
RuntimeException {
return callWithRetries(callable, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
/**
* Retries if invocation fails.
* @param conf
* @param callTimeout Timeout for this call
* @param callable The {@link RetryingCallable} to run.
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout, final long pause,
final int retries)
public synchronized T callWithRetries(RetryingCallable<T> callable, int callTimeout)
throws IOException, RuntimeException {
this.callTimeout = callTimeout;
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
* Factory to create an {@link RpcRetryingCaller}
*/
public class RpcRetryingCallerFactory {
/** Configuration key for a custom {@link RpcRetryingCaller} */
public static final String CUSTOM_CALLER_CONF_KEY = "hbase.rpc.callerfactory.class";
protected final Configuration conf;
public RpcRetryingCallerFactory(Configuration conf) {
this.conf = conf;
}
public <T> RpcRetryingCaller<T> newCaller() {
return new RpcRetryingCaller<T>(conf);
}
public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
String rpcCallerFactoryClazz =
configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
RpcRetryingCallerFactory.class.getName());
return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
new Class[] { Configuration.class }, new Object[] { configuration });
}
}

View File

@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
@ -52,10 +52,13 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
private final byte[] row;
private byte[] lastRegion;
private RpcRetryingCallerFactory rpcFactory;
public RegionCoprocessorRpcChannel(HConnection conn, byte[] table, byte[] row) {
this.connection = conn;
this.table = table;
this.row = row;
this.rpcFactory = RpcRetryingCallerFactory.instantiate(conn.getConfiguration());
}
@Override
@ -83,8 +86,8 @@ public class RegionCoprocessorRpcChannel extends CoprocessorRpcChannel{
return ProtobufUtil.execService(getStub(), call, regionName);
}
};
CoprocessorServiceResponse result = new RpcRetryingCaller<CoprocessorServiceResponse>().
callWithRetries(callable, this.connection.getConfiguration());
CoprocessorServiceResponse result = rpcFactory.<CoprocessorServiceResponse> newCaller()
.callWithRetries(callable);
Message response = null;
if (result.getValue().hasValue()) {
response = responsePrototype.newBuilderForType()

View File

@ -51,7 +51,7 @@ public class TestAsyncProcess {
private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
private static final byte[] FAILS = "FAILS".getBytes();
private Configuration conf = new Configuration();
private static final Configuration conf = new Configuration();
private static ServerName sn = new ServerName("localhost:10,1254");
@ -67,13 +67,13 @@ public class TestAsyncProcess {
public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("test-TestAsyncProcess")),
callback, conf);
callback, conf, new RpcRetryingCallerFactory(conf));
}
@Override
protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
final MultiResponse mr = createMultiResponse(callable.getLocation(), callable.getMulti());
return new RpcRetryingCaller<MultiResponse>() {
return new RpcRetryingCaller<MultiResponse>(conf) {
@Override
public MultiResponse callWithoutRetries( RetryingCallable<MultiResponse> callable)
throws IOException, RuntimeException {

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
@ -588,7 +589,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try {
List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
boolean success = new RpcRetryingCaller<Boolean>().callWithRetries(svrCallable, getConf());
Configuration conf = getConf();
boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean> newCaller()
.callWithRetries(svrCallable);
if (!success) {
LOG.warn("Attempt to bulk load region containing "
+ Bytes.toStringBinary(first) + " into table "

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@ -162,9 +163,10 @@ public class WALEditsReplaySink {
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
final List<Action<Row>> actions) throws IOException {
try {
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf);
ReplayServerCallable<MultiResponse> callable = new ReplayServerCallable<MultiResponse>(
this.conn, this.tableName, regionLoc, regionInfo, actions);
new RpcRetryingCaller<MultiResponse>().callWithRetries(callable, conf, this.replayTimeout);
factory.<MultiResponse> newCaller().callWithRetries(callable, this.replayTimeout);
} catch (IOException ie) {
if (skipErrors) {
LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.io.compress.Compression;
@ -157,8 +158,9 @@ public class TestHRegionServerBulkLoad {
return null;
}
};
RpcRetryingCaller<Void> caller = new RpcRetryingCaller<Void>();
caller.callWithRetries(callable, UTIL.getConfiguration());
RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
caller.callWithRetries(callable);
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 10 == 0) {
@ -178,7 +180,7 @@ public class TestHRegionServerBulkLoad {
return null;
}
};
caller.callWithRetries(callable, UTIL.getConfiguration());
caller.callWithRetries(callable);
}
}
}