HBASE-21579 Use AsyncClusterConnection for HBaseInterClusterReplicationEndpoint
This commit is contained in:
parent
5d872d3422
commit
04f737d9bd
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -94,9 +95,9 @@ public class AsyncRegionServerAdmin {
|
|||
void call(AdminService.Interface stub, HBaseRpcController controller, RpcCallback<RESP> done);
|
||||
}
|
||||
|
||||
private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
|
||||
private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall, CellScanner cellScanner) {
|
||||
CompletableFuture<RESP> future = new CompletableFuture<>();
|
||||
HBaseRpcController controller = conn.rpcControllerFactory.newController();
|
||||
HBaseRpcController controller = conn.rpcControllerFactory.newController(cellScanner);
|
||||
try {
|
||||
rpcCall.call(conn.getAdminStub(server), controller, new RpcCallback<RESP>() {
|
||||
|
||||
|
@ -115,6 +116,10 @@ public class AsyncRegionServerAdmin {
|
|||
return future;
|
||||
}
|
||||
|
||||
private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
|
||||
return call(rpcCall, null);
|
||||
}
|
||||
|
||||
public CompletableFuture<GetRegionInfoResponse> getRegionInfo(GetRegionInfoRequest request) {
|
||||
return call((stub, controller, done) -> stub.getRegionInfo(controller, request, done));
|
||||
}
|
||||
|
@ -154,8 +159,9 @@ public class AsyncRegionServerAdmin {
|
|||
}
|
||||
|
||||
public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
|
||||
ReplicateWALEntryRequest request) {
|
||||
return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done));
|
||||
ReplicateWALEntryRequest request, CellScanner cellScanner) {
|
||||
return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
|
||||
cellScanner);
|
||||
}
|
||||
|
||||
public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
|
||||
|
|
|
@ -20,51 +20,54 @@ package org.apache.hadoop.hbase.protobuf;
|
|||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.PrivateCellUtil;
|
||||
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
|
||||
import org.apache.hadoop.hbase.io.SizedCellScanner;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||
import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
|
||||
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationProtbufUtil {
|
||||
|
||||
/**
|
||||
* A helper to replicate a list of WAL entries using admin protocol.
|
||||
* @param admin Admin service
|
||||
* A helper to replicate a list of WAL entries using region server admin
|
||||
* @param admin the region server admin
|
||||
* @param entries Array of WAL entries to be replicated
|
||||
* @param replicationClusterId Id which will uniquely identify source cluster FS client
|
||||
* configurations in the replication configuration directory
|
||||
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
|
||||
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
|
||||
* @throws java.io.IOException
|
||||
*/
|
||||
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
|
||||
final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
|
||||
Path sourceHFileArchiveDir) throws IOException {
|
||||
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
|
||||
buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
|
||||
sourceHFileArchiveDir);
|
||||
HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
|
||||
public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
|
||||
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
|
||||
throws IOException {
|
||||
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
|
||||
entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
|
||||
try {
|
||||
admin.replicateWALEntry(controller, p.getFirst());
|
||||
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
|
||||
throw ProtobufUtil.getServiceException(e);
|
||||
admin.replicateWALEntry(p.getFirst(), p.getSecond()).get();
|
||||
} catch (InterruptedException e) {
|
||||
throw (IOException) new InterruptedIOException().initCause(e);
|
||||
} catch (ExecutionException e) {
|
||||
Throwable cause = e.getCause();
|
||||
Throwables.propagateIfPossible(cause, IOException.class);
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -39,7 +39,6 @@ import java.util.regex.Matcher;
|
|||
import java.util.regex.Pattern;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -48,13 +47,16 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
|
||||
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
|
@ -65,8 +67,6 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
|
||||
|
||||
/**
|
||||
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
|
||||
* implementation for replicating to another HBase cluster.
|
||||
|
@ -85,8 +85,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
|
||||
private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
|
||||
|
||||
private ClusterConnection conn;
|
||||
private Configuration localConf;
|
||||
private AsyncClusterConnection conn;
|
||||
private Configuration conf;
|
||||
// How long should we sleep for each retry
|
||||
private long sleepForRetries;
|
||||
|
@ -117,7 +116,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
public void init(Context context) throws IOException {
|
||||
super.init(context);
|
||||
this.conf = HBaseConfiguration.create(ctx.getConfiguration());
|
||||
this.localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
|
||||
decorateConf();
|
||||
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
|
||||
this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier",
|
||||
|
@ -132,12 +130,13 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
// TODO: This connection is replication specific or we should make it particular to
|
||||
// replication and make replication specific settings such as compression or codec to use
|
||||
// passing Cells.
|
||||
this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
|
||||
this.conn =
|
||||
ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
|
||||
this.sleepForRetries =
|
||||
this.conf.getLong("replication.source.sleepforretries", 1000);
|
||||
this.metrics = context.getMetrics();
|
||||
// ReplicationQueueInfo parses the peerId out of the znode for us
|
||||
this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
|
||||
this.replicationSinkMgr = new ReplicationSinkManager(conn, this, this.conf);
|
||||
// per sink thread pool
|
||||
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
|
||||
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
|
||||
|
@ -284,9 +283,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
}
|
||||
|
||||
private void reconnectToPeerCluster() {
|
||||
ClusterConnection connection = null;
|
||||
AsyncClusterConnection connection = null;
|
||||
try {
|
||||
connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
|
||||
connection =
|
||||
ClusterConnectionFactory.createAsyncClusterConnection(conf, null, User.getCurrent());
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Failed to create connection for peer cluster", ioe);
|
||||
}
|
||||
|
@ -366,7 +366,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
}
|
||||
continue;
|
||||
}
|
||||
if (this.conn == null || this.conn.isClosed()) {
|
||||
if (this.conn == null) {
|
||||
reconnectToPeerCluster();
|
||||
}
|
||||
try {
|
||||
|
@ -470,10 +470,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
entriesHashCode, entries.size(), size, replicationClusterId);
|
||||
}
|
||||
sinkPeer = replicationSinkMgr.getReplicationSink();
|
||||
BlockingInterface rrs = sinkPeer.getRegionServer();
|
||||
AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
|
||||
try {
|
||||
ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
|
||||
entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
|
||||
hfileArchiveDir);
|
||||
LOG.trace("Completed replicating batch {}", entriesHashCode);
|
||||
} catch (IOException e) {
|
||||
LOG.trace("Failed replicating batch {}", entriesHashCode, e);
|
||||
|
|
|
@ -21,11 +21,11 @@ import java.io.IOException;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
|
||||
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -35,8 +35,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
|||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||
|
||||
/**
|
||||
* Maintains a collection of peers to replicate to, and randomly selects a
|
||||
* single peer to replicate to per set of data to replicate. Also handles
|
||||
|
@ -61,9 +59,7 @@ public class ReplicationSinkManager {
|
|||
static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.5f;
|
||||
|
||||
|
||||
private final Connection conn;
|
||||
|
||||
private final String peerClusterId;
|
||||
private final AsyncClusterConnection conn;
|
||||
|
||||
private final HBaseReplicationEndpoint endpoint;
|
||||
|
||||
|
@ -77,8 +73,6 @@ public class ReplicationSinkManager {
|
|||
// replication sinks is refreshed
|
||||
private final int badSinkThreshold;
|
||||
|
||||
private final Random random;
|
||||
|
||||
// A timestamp of the last time the list of replication peers changed
|
||||
private long lastUpdateToPeers;
|
||||
|
||||
|
@ -88,26 +82,22 @@ public class ReplicationSinkManager {
|
|||
/**
|
||||
* Instantiate for a single replication peer cluster.
|
||||
* @param conn connection to the peer cluster
|
||||
* @param peerClusterId identifier of the peer cluster
|
||||
* @param endpoint replication endpoint for inter cluster replication
|
||||
* @param conf HBase configuration, used for determining replication source ratio and bad peer
|
||||
* threshold
|
||||
*/
|
||||
public ReplicationSinkManager(ClusterConnection conn, String peerClusterId,
|
||||
HBaseReplicationEndpoint endpoint, Configuration conf) {
|
||||
public ReplicationSinkManager(AsyncClusterConnection conn, HBaseReplicationEndpoint endpoint,
|
||||
Configuration conf) {
|
||||
this.conn = conn;
|
||||
this.peerClusterId = peerClusterId;
|
||||
this.endpoint = endpoint;
|
||||
this.badReportCounts = Maps.newHashMap();
|
||||
this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
|
||||
this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
|
||||
DEFAULT_BAD_SINK_THRESHOLD);
|
||||
this.random = new Random();
|
||||
this.badSinkThreshold =
|
||||
conf.getInt("replication.bad.sink.threshold", DEFAULT_BAD_SINK_THRESHOLD);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a randomly-chosen replication sink to replicate to.
|
||||
*
|
||||
* @return a replication sink to replicate to
|
||||
*/
|
||||
public synchronized SinkPeer getReplicationSink() throws IOException {
|
||||
|
@ -119,8 +109,8 @@ public class ReplicationSinkManager {
|
|||
if (sinks.isEmpty()) {
|
||||
throw new IOException("No replication sinks are available");
|
||||
}
|
||||
ServerName serverName = sinks.get(random.nextInt(sinks.size()));
|
||||
return new SinkPeer(serverName, ((ClusterConnection) conn).getAdmin(serverName));
|
||||
ServerName serverName = sinks.get(ThreadLocalRandom.current().nextInt(sinks.size()));
|
||||
return new SinkPeer(serverName, conn.getRegionServerAdmin(serverName));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,7 +150,7 @@ public class ReplicationSinkManager {
|
|||
*/
|
||||
public synchronized void chooseSinks() {
|
||||
List<ServerName> slaveAddresses = endpoint.getRegionServers();
|
||||
Collections.shuffle(slaveAddresses, random);
|
||||
Collections.shuffle(slaveAddresses, ThreadLocalRandom.current());
|
||||
int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
|
||||
sinks = slaveAddresses.subList(0, numSinks);
|
||||
lastUpdateToPeers = System.currentTimeMillis();
|
||||
|
@ -182,9 +172,9 @@ public class ReplicationSinkManager {
|
|||
*/
|
||||
public static class SinkPeer {
|
||||
private ServerName serverName;
|
||||
private AdminService.BlockingInterface regionServer;
|
||||
private AsyncRegionServerAdmin regionServer;
|
||||
|
||||
public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
|
||||
public SinkPeer(ServerName serverName, AsyncRegionServerAdmin regionServer) {
|
||||
this.serverName = serverName;
|
||||
this.regionServer = regionServer;
|
||||
}
|
||||
|
@ -193,10 +183,8 @@ public class ReplicationSinkManager {
|
|||
return serverName;
|
||||
}
|
||||
|
||||
public AdminService.BlockingInterface getRegionServer() {
|
||||
public AsyncRegionServerAdmin getRegionServer() {
|
||||
return regionServer;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.StartMiniClusterOption;
|
|||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
|
@ -250,19 +250,19 @@ public class SyncReplicationTestBase {
|
|||
protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
|
||||
boolean expectedRejection) throws Exception {
|
||||
HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
|
||||
ClusterConnection connection = regionServer.getClusterConnection();
|
||||
AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
|
||||
Entry[] entries = new Entry[10];
|
||||
for (int i = 0; i < entries.length; i++) {
|
||||
entries[i] =
|
||||
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
|
||||
}
|
||||
if (!expectedRejection) {
|
||||
ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
|
||||
entries, null, null, null);
|
||||
ReplicationProtbufUtil.replicateWALEntry(
|
||||
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
|
||||
} else {
|
||||
try {
|
||||
ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
|
||||
entries, null, null, null);
|
||||
ReplicationProtbufUtil.replicateWALEntry(
|
||||
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
|
||||
fail("Should throw IOException when sync-replication state is in A or DA");
|
||||
} catch (DoNotRetryIOException e) {
|
||||
assertTrue(e.getMessage().contains("Reject to apply to sink cluster"));
|
||||
|
|
|
@ -25,7 +25,8 @@ import java.util.List;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
|
||||
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
|
@ -37,8 +38,6 @@ import org.junit.experimental.categories.Category;
|
|||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
|
||||
|
||||
@Category({ReplicationTests.class, SmallTests.class})
|
||||
public class TestReplicationSinkManager {
|
||||
|
||||
|
@ -46,16 +45,14 @@ public class TestReplicationSinkManager {
|
|||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestReplicationSinkManager.class);
|
||||
|
||||
private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
|
||||
|
||||
private HBaseReplicationEndpoint replicationEndpoint;
|
||||
private ReplicationSinkManager sinkManager;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
replicationEndpoint = mock(HBaseReplicationEndpoint.class);
|
||||
sinkManager = new ReplicationSinkManager(mock(ClusterConnection.class),
|
||||
PEER_CLUSTER_ID, replicationEndpoint, new Configuration());
|
||||
sinkManager = new ReplicationSinkManager(mock(AsyncClusterConnection.class),
|
||||
replicationEndpoint, new Configuration());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -100,7 +97,7 @@ public class TestReplicationSinkManager {
|
|||
// Sanity check
|
||||
assertEquals(1, sinkManager.getNumSinks());
|
||||
|
||||
SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
|
||||
SinkPeer sinkPeer = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
|
||||
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
|
||||
|
@ -131,7 +128,7 @@ public class TestReplicationSinkManager {
|
|||
|
||||
ServerName serverName = sinkManager.getSinksForTesting().get(0);
|
||||
|
||||
SinkPeer sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
|
||||
SinkPeer sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
|
||||
|
||||
sinkManager.reportSinkSuccess(sinkPeer); // has no effect, counter does not go negative
|
||||
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
|
||||
|
@ -147,7 +144,7 @@ public class TestReplicationSinkManager {
|
|||
//
|
||||
serverName = sinkManager.getSinksForTesting().get(0);
|
||||
|
||||
sinkPeer = new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
|
||||
sinkPeer = new SinkPeer(serverName, mock(AsyncRegionServerAdmin.class));
|
||||
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD-1; i++) {
|
||||
sinkManager.reportBadSink(sinkPeer);
|
||||
}
|
||||
|
@ -188,8 +185,8 @@ public class TestReplicationSinkManager {
|
|||
ServerName serverNameA = sinkList.get(0);
|
||||
ServerName serverNameB = sinkList.get(1);
|
||||
|
||||
SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AdminService.BlockingInterface.class));
|
||||
SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AdminService.BlockingInterface.class));
|
||||
SinkPeer sinkPeerA = new SinkPeer(serverNameA, mock(AsyncRegionServerAdmin.class));
|
||||
SinkPeer sinkPeerB = new SinkPeer(serverNameB, mock(AsyncRegionServerAdmin.class));
|
||||
|
||||
for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
|
||||
sinkManager.reportBadSink(sinkPeerA);
|
||||
|
|
Loading…
Reference in New Issue