HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use AsyncClusterConnection

Duo Zhang 2019-01-11 16:22:24 +08:00 committed by zhangduo
parent 7593e86c5f
commit d005d6f30a
13 changed files with 640 additions and 669 deletions

View File

@ -60,7 +60,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@ -69,7 +68,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
class AsyncConnectionImpl implements AsyncClusterConnection {
class AsyncConnectionImpl implements AsyncConnection {
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
@ -89,7 +88,7 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
private final int rpcTimeout;
private final RpcClient rpcClient;
protected final RpcClient rpcClient;
final RpcControllerFactory rpcControllerFactory;
@ -218,16 +217,10 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
}
// ditto
@Override
public NonceGenerator getNonceGenerator() {
NonceGenerator getNonceGenerator() {
return nonceGenerator;
}
@Override
public RpcClient getRpcClient() {
return rpcClient;
}
private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
@ -380,16 +373,4 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
Optional<MetricsConnection> getConnectionMetrics() {
return metrics;
}
@Override
public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
return new AsyncRegionServerAdmin(serverName, this);
}
@Override
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
return admin.flushRegionInternal(regionName, writeFlushWALMarker);
}
}

View File

@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.client;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
@ -49,4 +53,17 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Flush a region and get the response.
*/
CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
/**
* Replicate WAL edits for replica regions. The return value is the number of edits we skipped,
* as the original return value is not useful to the caller.
*/
CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
/**
* Return all the replicas for a region. Used for region replica replication.
*/
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
boolean reload);
}
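
As an illustrative usage sketch of the new interface methods (the connection, retry count and timeout values below are assumptions for the example, not taken from the patch), a caller could replay a batch of WAL edits to replica 1 and read back the number of skipped edits:

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.wal.WAL.Entry;

// Hedged sketch: replay a batch of edits to replica 1; the future completes with the number
// of edits that were skipped (e.g. because the located region no longer matches).
final class ReplaySketch {
  static void replayToReplicaOne(AsyncClusterConnection conn, TableName table,
      byte[] encodedRegionName, byte[] row, List<Entry> entries) {
    conn.replay(table, encodedRegionName, row, entries, 1 /* replicaId */,
        10 /* retries */, TimeUnit.SECONDS.toNanos(60))
      .thenAccept(skipped -> System.out.println("skipped " + skipped + " edits"));
  }
}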

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
/**
* The implementation of AsyncClusterConnection.
*/
@InterfaceAudience.Private
class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {
public AsyncClusterConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
SocketAddress localAddress, User user) {
super(conf, registry, clusterId, localAddress, user);
}
@Override
public NonceGenerator getNonceGenerator() {
return super.getNonceGenerator();
}
@Override
public RpcClient getRpcClient() {
return rpcClient;
}
@Override
public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
return new AsyncRegionServerAdmin(serverName, this);
}
@Override
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
return admin.flushRegionInternal(regionName, writeFlushWALMarker);
}
@Override
public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) {
return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
row, entries, replicaId).call();
}
@Override
public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
boolean reload) {
return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
/**
* For replaying edits for a region replica.
* <p/>
* The main difference here is that, every time after locating, we check whether the located
* region name matches; if not, we give up, as this usually means the region has been split or
* merged, and the new region(s) should already have all the data of the parent region(s).
* <p/>
* Note that the return value is the number of edits we skipped, as the original response message
* is not used by the upper layer.
*/
@InterfaceAudience.Private
public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
private final TableName tableName;
private final byte[] encodedRegionName;
private final byte[] row;
private final Entry[] entries;
private final int replicaId;
public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs,
TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
int replicaId) {
super(retryTimer, conn, ConnectionUtils.getPriority(tableName), conn.connConf.getPauseNs(),
conn.connConf.getPauseForCQTBENs(), maxAttempts, operationTimeoutNs,
conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt());
this.tableName = tableName;
this.encodedRegionName = encodedRegionName;
this.row = row;
this.entries = entries.toArray(new Entry[0]);
this.replicaId = replicaId;
}
private void call(HRegionLocation loc) {
if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Skipping {} entries in table {} because located region {} is different than" +
" the original region {} from WALEdit",
entries.length, tableName, loc.getRegion().getEncodedName(),
Bytes.toStringBinary(encodedRegionName));
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
future.complete(Long.valueOf(entries.length));
return;
}
AdminService.Interface stub;
try {
stub = conn.getAdminStub(loc.getServerName());
} catch (IOException e) {
onError(e,
() -> "Get async admin stub to " + loc.getServerName() + " for '" +
Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " +
tableName + " failed",
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
return;
}
Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil
.buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
resetCallTimeout();
controller.setCellScanner(p.getSecond());
stub.replay(controller, p.getFirst(), r -> {
if (controller.failed()) {
onError(controller.getFailed(),
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
loc.getRegion().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
} else {
future.complete(0L);
}
});
}
@Override
protected void doCall() {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
if (locateTimeoutNs <= 0) {
completeExceptionally();
return;
}
} else {
locateTimeoutNs = -1L;
}
addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
});
return;
}
call(loc);
});
}
}

View File

@ -164,8 +164,9 @@ public class AsyncRegionServerAdmin {
cellScanner);
}
public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
return call((stub, controller, done) -> stub.replay(controller, request, done));
public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request,
CellScanner cellScanner) {
return call((stub, controller, done) -> stub.replay(controller, request, done), cellScanner);
}
public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
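
A sketch of how the widened replay() signature might be driven: the request and its cells are built with ReplicationProtbufUtil.buildReplicateWALEntryRequest (changed later in this diff), and the cells travel next to the request as a CellScanner. The wrapper class and method names here are illustrative only.

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;

// Hedged sketch: send a replay RPC to one server, shipping the WAL cells via the CellScanner.
final class ReplayRpcSketch {
  static void replayOnServer(AsyncClusterConnection conn, ServerName server, Entry[] entries,
      byte[] targetEncodedRegionName) {
    Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil
        .buildReplicateWALEntryRequest(entries, targetEncodedRegionName, null, null, null);
    AsyncRegionServerAdmin admin = conn.getRegionServerAdmin(server);
    // fire the RPC; the returned future completes with the ReplicateWALEntryResponse
    admin.replay(p.getFirst(), p.getSecond());
  }
}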

View File

@ -46,6 +46,6 @@ public final class ClusterConnectionFactory {
SocketAddress localAddress, User user) throws IOException {
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
String clusterId = FutureUtils.get(registry.getClusterId());
return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
return new AsyncClusterConnectionImpl(conf, registry, clusterId, localAddress, user);
}
}
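
A sketch of obtaining the cluster connection through the factory. The method name createAsyncClusterConnection is an assumption (the hunk only shows the parameter tail and body), and passing a null local address is purely illustrative.

import java.io.IOException;
import java.net.SocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.security.User;

// Hedged sketch: the factory now hands back AsyncClusterConnectionImpl behind the
// AsyncClusterConnection interface (factory method name assumed, see lead-in).
final class ClusterConnectionSketch {
  static AsyncClusterConnection connect(Configuration conf) throws IOException {
    SocketAddress localAddress = null; // let the client bind to any local address
    return ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress,
        User.getCurrent());
  }
}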

View File

@ -37,7 +37,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@InterfaceAudience.Private
@ -55,20 +56,18 @@ public class ReplicationProtbufUtil {
public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
throws IOException {
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
}
/**
* Create a new ReplicateWALEntryRequest from a list of WAL entries
*
* @param entries the WAL entries to be replicated
* @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
* found.
* @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final Entry[] entries) throws IOException {
public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
final Entry[] entries) {
return buildReplicateWALEntryRequest(entries, null, null, null, null);
}
@ -82,16 +81,14 @@ public class ReplicationProtbufUtil {
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
* @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
*/
public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
throws IOException {
public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
final Entry[] entries, byte[] encodedRegionName, String replicationClusterId,
Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
// Accumulate all the Cells seen in here.
List<List<? extends Cell>> allCells = new ArrayList<>(entries.length);
int size = 0;
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
AdminProtos.ReplicateWALEntryRequest.Builder builder =
AdminProtos.ReplicateWALEntryRequest.newBuilder();
WALEntry.Builder entryBuilder = WALEntry.newBuilder();
ReplicateWALEntryRequest.Builder builder = ReplicateWALEntryRequest.newBuilder();
for (Entry entry: entries) {
entryBuilder.clear();
@ -99,8 +96,8 @@ public class ReplicationProtbufUtil {
try {
keyBuilder = entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
} catch (IOException e) {
throw new IOException(
"Should not happen, as the NoneCompressor does not throw any exceptions", e);
throw new AssertionError(
"Should not happen, as the NoneCompressor does not throw any exceptions", e);
}
if(encodedRegionName != null){
keyBuilder.setEncodedRegionName(

View File

@ -185,7 +185,6 @@ public class RegionReplicaFlushHandler extends EventHandler {
"Was not able to trigger a flush from primary region due to old server version? " +
"Continuing to open the secondary region replica: " +
region.getRegionInfo().getRegionNameAsString());
region.setReadsEnabled(true);
break;
}
}
@ -195,6 +194,6 @@ public class RegionReplicaFlushHandler extends EventHandler {
throw new InterruptedIOException(e.getMessage());
}
}
region.setReadsEnabled(true);
}
}

View File

@ -29,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
@ -53,6 +54,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Context {
private final Server server;
private final Configuration localConf;
private final Configuration conf;
private final FileSystem fs;
@ -64,16 +66,11 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
private final Abortable abortable;
@InterfaceAudience.Private
public Context(
final Configuration localConf,
final Configuration conf,
final FileSystem fs,
final String peerId,
final UUID clusterId,
final ReplicationPeer replicationPeer,
final MetricsSource metrics,
final TableDescriptors tableDescriptors,
final Abortable abortable) {
public Context(final Server server, final Configuration localConf, final Configuration conf,
final FileSystem fs, final String peerId, final UUID clusterId,
final ReplicationPeer replicationPeer, final MetricsSource metrics,
final TableDescriptors tableDescriptors, final Abortable abortable) {
this.server = server;
this.localConf = localConf;
this.conf = conf;
this.fs = fs;
@ -84,34 +81,50 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
this.tableDescriptors = tableDescriptors;
this.abortable = abortable;
}
public Server getServer() {
return server;
}
public Configuration getConfiguration() {
return conf;
}
public Configuration getLocalConfiguration() {
return localConf;
}
public FileSystem getFilesystem() {
return fs;
}
public UUID getClusterId() {
return clusterId;
}
public String getPeerId() {
return peerId;
}
public ReplicationPeerConfig getPeerConfig() {
return replicationPeer.getPeerConfig();
}
public ReplicationPeer getReplicationPeer() {
return replicationPeer;
}
public MetricsSource getMetrics() {
return metrics;
}
public TableDescriptors getTableDescriptors() {
return tableDescriptors;
}
public Abortable getAbortable() { return abortable; }
public Abortable getAbortable() {
return abortable;
}
}
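
A sketch of constructing the widened Context, mirroring how ReplicationSource and the updated test wire it up later in this diff; the wrapper class and method are illustrative only.

import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;

// Hedged sketch: the Server (here the owning HRegionServer) now comes first in the Context,
// giving endpoints access to server-side facilities such as the shared AsyncClusterConnection.
final class ContextSketch {
  static ReplicationEndpoint.Context newContext(HRegionServer rs, Configuration peerConf,
      ReplicationPeer peer, MetricsSource metrics) {
    return new ReplicationEndpoint.Context(rs, rs.getConfiguration(), peerConf,
        rs.getFileSystem(), peer.getId(), UUID.fromString(rs.getClusterId()), peer,
        metrics, rs.getTableDescriptors(), rs);
  }
}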
/**

View File

@ -19,67 +19,47 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RetryingCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.EntryBuffers;
import org.apache.hadoop.hbase.wal.OutputSink;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
* which receives the WAL edits from the WAL, and sends the edits to replicas
* of regions.
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL
* edits from the WAL, and sends the edits to replicas of regions.
*/
@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
@ -87,32 +67,55 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
// Can be configured differently than hbase.client.retries.number
private static String CLIENT_RETRIES_NUMBER
= "hbase.region.replica.replication.client.retries.number";
private static String CLIENT_RETRIES_NUMBER =
"hbase.region.replica.replication.client.retries.number";
private Configuration conf;
private ClusterConnection connection;
private AsyncClusterConnection connection;
private TableDescriptors tableDescriptors;
// Reuse WALSplitter constructs as a WAL pipe
private PipelineController controller;
private RegionReplicaOutputSink outputSink;
private EntryBuffers entryBuffers;
private int numRetries;
// Number of writer threads
private int numWriterThreads;
private long operationTimeoutNs;
private int operationTimeout;
private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
private ExecutorService pool;
private Cache<TableName, TableName> disabledTableCache;
private final RetryCounterFactory retryCounterFactory =
new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
@Override
public void init(Context context) throws IOException {
super.init(context);
this.conf = HBaseConfiguration.create(context.getConfiguration());
this.conf = context.getConfiguration();
this.tableDescriptors = context.getTableDescriptors();
int memstoreReplicationEnabledCacheExpiryMs = conf
.getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
// A cache for the table "memstore replication enabled" flag.
// It has a default expiry of 5 sec. This means that if the table is altered
// with a different flag value, we might fail to replicate for that amount of
// time. But this cache avoids the slow lookup and parsing of the TableDescriptor.
tableDescriptorCache = CacheBuilder.newBuilder()
.expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
.initialCapacity(10).maximumSize(1000)
.build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
@Override
public Optional<TableDescriptor> load(TableName tableName) throws Exception {
// check if the table requires memstore replication
// some unit tests drop the table, so we should bypass the check and always replicate.
return Optional.ofNullable(tableDescriptors.get(tableName));
}
});
int nonExistentTableCacheExpiryMs =
conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
// A cache for non-existent tables, with a default expiry of 5 sec. This means that if the
// table is created again with the same name, we might fail to replicate for that amount of
// time. But this cache prevents overloading meta with requests for every edit from a deleted file.
disabledTableCache = CacheBuilder.newBuilder()
.expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
.maximumSize(1000).build();
// HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
// We are resetting it here because we want the default number of retries (35) rather than 10
// times that, which would make retries very long for disabled tables etc.
@ -123,133 +126,247 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
}
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
this.numWriterThreads = this.conf.getInt(
"hbase.region.replica.replication.writer.threads", 3);
controller = new PipelineController();
entryBuffers = new EntryBuffers(controller,
this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024));
this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
// use the regular RPC timeout for replica replication RPC's
this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
@Override
protected void doStart() {
try {
connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
this.pool = getDefaultThreadPool(conf);
outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
connection, pool, numWriterThreads, operationTimeout);
outputSink.startWriterThreads();
super.doStart();
} catch (IOException ex) {
LOG.warn("Received exception while creating connection :" + ex);
notifyFailed(ex);
}
}
@Override
protected void doStop() {
if (outputSink != null) {
try {
outputSink.finishWritingAndClose();
} catch (IOException ex) {
LOG.warn("Got exception while trying to close OutputSink", ex);
}
}
if (this.pool != null) {
this.pool.shutdownNow();
try {
// wait for 10 sec
boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
if (!shutdown) {
LOG.warn("Failed to shutdown the thread pool after 10 seconds");
}
} catch (InterruptedException e) {
LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException ex) {
LOG.warn("Got exception closing connection :" + ex);
}
}
super.doStop();
this.operationTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.connection = context.getServer().getAsyncClusterConnection();
}
/**
* Returns a Thread pool for the RPC's to region replicas. Similar to
* Connection's thread pool.
* Returns true if the specified entry must be replicated. We should always replicate meta
* operations (e.g. flush) and use the user HTD flag to decide whether or not to replicate the
* memstore.
*/
private ExecutorService getDefaultThreadPool(Configuration conf) {
int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
private boolean requiresReplication(Optional<TableDescriptor> tableDesc, Entry entry) {
// empty edit does not need to be replicated
if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
return false;
}
long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(maxThreads *
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
maxThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
tpe.allowCoreThreadTimeOut(true);
return tpe;
// meta edits (e.g. flush) must always be replicated
return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication();
}
private void getRegionLocations(CompletableFuture<RegionLocations> future,
TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) {
FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload),
(r, e) -> {
if (e != null) {
future.completeExceptionally(e);
return;
}
// if we are not loading from cache, just return
if (reload) {
future.complete(r);
return;
}
// check if the number of region replicas is correct, and also the primary region name
// matches
if (r.size() == tableDesc.getRegionReplication() && Bytes.equals(
r.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), encodedRegionName)) {
future.complete(r);
} else {
// reload again as the information in the cache may be stale
getRegionLocations(future, tableDesc, encodedRegionName, row, true);
}
});
}
private void replicate(CompletableFuture<Long> future, RegionLocations locs,
TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
if (locs.size() == 1) {
// Could this happen?
future.complete(Long.valueOf(entries.size()));
return;
}
if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
encodedRegionName)) {
// the region names do not match; this usually means the region has been split or merged, so
// give up replicating, as the new region(s) should already have all the data of the parent
// region(s).
if (LOG.isTraceEnabled()) {
LOG.trace(
"Skipping {} entries in table {} because located region {} is different than" +
" the original region {} from WALEdit",
entries.size(), tableDesc.getTableName(),
locs.getDefaultRegionLocation().getRegion().getEncodedName(),
Bytes.toStringBinary(encodedRegionName));
}
future.complete(Long.valueOf(entries.size()));
return;
}
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
AtomicLong skippedEdits = new AtomicLong(0);
for (int i = 1, n = locs.size(); i < n; i++) {
final int replicaId = i;
FutureUtils.addListener(connection.replay(tableDesc.getTableName(),
locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries,
replicaId, numRetries, operationTimeoutNs), (r, e) -> {
if (e != null) {
LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e);
error.compareAndSet(null, e);
} else {
AtomicUtils.updateMax(skippedEdits, r.longValue());
}
if (remainingTasks.decrementAndGet() == 0) {
if (error.get() != null) {
future.completeExceptionally(error.get());
} else {
future.complete(skippedEdits.get());
}
}
});
}
}
private void logSkipped(TableName tableName, List<Entry> entries, String reason) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason);
for (Entry entry : entries) {
LOG.trace("Skipping : {}", entry);
}
}
}
private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName,
List<Entry> entries) {
if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table");
return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
}
byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>();
getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
CompletableFuture<Long> future = new CompletableFuture<>();
FutureUtils.addListener(locateFuture, (locs, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else {
replicate(future, locs, tableDesc, encodedRegionName, row, entries);
}
});
return future;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
/* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
*
* RRRE relies on batching from two different mechanisms. The first is the batching from
* ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single
* WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most
* "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing).
* Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits
* to the WALSplitter.EntryBuffers which is a blocking buffer space of up to
* "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
* based on regions.
*
* There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which
* pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink).
* The SinkWriter in this case will send the wal edits to all secondary region replicas in
* parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is
* being written to the sink, another buffer for the same region will not be made available to
* writers ensuring regions edits are not replayed out of order.
*
* The replicate() call won't return until all the buffers are sent and ack'd by the sinks so
* that the replication can assume all edits are persisted. We may be able to do a better
* pipelining between the replication thread and output sinks later if it becomes a bottleneck.
*/
while (this.isRunning()) {
try {
for (Entry entry: replicateContext.getEntries()) {
entryBuffers.appendEntry(entry);
Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
long skippedEdits = 0;
RetryCounter retryCounter = retryCounterFactory.create();
outer: while (isRunning()) {
encodedRegionName2Entries.clear();
skippedEdits = 0;
for (Entry entry : replicateContext.getEntries()) {
Optional<TableDescriptor> tableDesc;
try {
tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
} catch (ExecutionException e) {
LOG.warn("Failed to load table descriptor for {}, attempts={}",
entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause());
if (!retryCounter.shouldRetry()) {
return false;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e1) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
}
continue outer;
}
outputSink.flush(); // make sure everything is flushed
ctx.getMetrics().incrLogEditsFiltered(
outputSink.getSkippedEditsCounter().getAndSet(0));
if (!requiresReplication(tableDesc, entry)) {
skippedEdits++;
continue;
}
byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
encodedRegionName2Entries
.computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>()))
.getSecond().add(entry);
}
break;
}
// send the request to regions
retryCounter = retryCounterFactory.create();
while (isRunning()) {
List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList =
new ArrayList<Pair<CompletableFuture<Long>, byte[]>>();
for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries
.entrySet()) {
CompletableFuture<Long> future =
replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond());
futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey()));
}
for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) {
byte[] encodedRegionName = pair.getSecond();
try {
skippedEdits += pair.getFirst().get().longValue();
encodedRegionName2Entries.remove(encodedRegionName);
} catch (InterruptedException e) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
} catch (ExecutionException e) {
Pair<TableDescriptor, List<Entry>> tableAndEntries =
encodedRegionName2Entries.get(encodedRegionName);
TableName tableName = tableAndEntries.getFirst().getTableName();
List<Entry> entries = tableAndEntries.getSecond();
Throwable cause = e.getCause();
// The table can be disabled or dropped at this time. For disabled tables, we have no
// cheap mechanism to detect this case because meta does not contain this information.
// ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
// RPC. So instead we start the replay RPC with retries and check whether the table is
// dropped or disabled which might cause SocketTimeoutException, or
// RetriesExhaustedException or similar if we get IOE.
if (cause instanceof TableNotFoundException) {
// add to cache that the table does not exist
tableDescriptorCache.put(tableName, Optional.empty());
logSkipped(tableName, entries, "dropped");
skippedEdits += entries.size();
encodedRegionName2Entries.remove(encodedRegionName);
continue;
}
boolean disabled = false;
try {
disabled = connection.getAdmin().isTableDisabled(tableName).get();
} catch (InterruptedException e1) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
} catch (ExecutionException e1) {
LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName,
e1.getCause());
}
if (disabled) {
disabledTableCache.put(tableName, tableName);
logSkipped(tableName, entries, "disabled");
skippedEdits += entries.size();
encodedRegionName2Entries.remove(encodedRegionName);
continue;
}
LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(),
Bytes.toStringBinary(encodedRegionName), tableName);
}
}
// we are done
if (encodedRegionName2Entries.isEmpty()) {
ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
} catch (IOException e) {
LOG.warn("Received IOException while trying to replicate"
+ StringUtils.stringifyException(e));
} else {
LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
if (!retryCounter.shouldRetry()) {
return false;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
}
}
}
@ -266,373 +383,4 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
// we do not care about scope. We replicate everything.
return null;
}
static class RegionReplicaOutputSink extends OutputSink {
private final RegionReplicaSinkWriter sinkWriter;
private final TableDescriptors tableDescriptors;
private final Cache<TableName, Boolean> memstoreReplicationEnabled;
public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
int numWriters, int operationTimeout) {
super(controller, entryBuffers, numWriters);
this.sinkWriter =
new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
this.tableDescriptors = tableDescriptors;
// A cache for the table "memstore replication enabled" flag.
// It has a default expiry of 5 sec. This means that if the table is altered
// with a different flag value, we might miss to replicate for that amount of
// time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
.getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
.expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
.initialCapacity(10)
.maximumSize(1000)
.build();
}
@Override
public void append(RegionEntryBuffer buffer) throws IOException {
List<Entry> entries = buffer.getEntryBuffer();
if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
return;
}
// meta edits (e.g. flush) are always replicated.
// data edits (e.g. put) are replicated if the table requires them.
if (!requiresReplication(buffer.getTableName(), entries)) {
return;
}
sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
}
@Override
public boolean flush() throws IOException {
// nothing much to do for now. Wait for the Writer threads to finish up
// append()'ing the data.
entryBuffers.waitUntilDrained();
return super.flush();
}
@Override
public boolean keepRegionEvent(Entry entry) {
return true;
}
@Override
public List<Path> finishWritingAndClose() throws IOException {
finishWriting(true);
return null;
}
@Override
public Map<byte[], Long> getOutputCounts() {
return null; // only used in tests
}
@Override
public int getNumberOfRecoveredRegions() {
return 0;
}
AtomicLong getSkippedEditsCounter() {
return skippedEdits;
}
/**
* Returns true if the specified entry must be replicated.
* We should always replicate meta operations (e.g. flush)
* and use the user HTD flag to decide whether or not to replicate the memstore.
*/
private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
throws IOException {
// unit-tests may not have the TableDescriptors, bypass the check and always replicate
if (tableDescriptors == null) return true;
Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
if (requiresReplication == null) {
// check if the table requires memstore replication
// some unit-test drop the table, so we should do a bypass check and always replicate.
TableDescriptor htd = tableDescriptors.get(tableName);
requiresReplication = htd == null || htd.hasRegionMemStoreReplication();
memstoreReplicationEnabled.put(tableName, requiresReplication);
}
// if memstore replication is not required, check the entries.
// meta edits (e.g. flush) must be always replicated.
if (!requiresReplication) {
int skipEdits = 0;
java.util.Iterator<Entry> it = entries.iterator();
while (it.hasNext()) {
Entry entry = it.next();
if (entry.getEdit().isMetaEdit()) {
requiresReplication = true;
} else {
it.remove();
skipEdits++;
}
}
skippedEdits.addAndGet(skipEdits);
}
return requiresReplication;
}
}
static class RegionReplicaSinkWriter extends SinkWriter {
RegionReplicaOutputSink sink;
ClusterConnection connection;
RpcControllerFactory rpcControllerFactory;
RpcRetryingCallerFactory rpcRetryingCallerFactory;
int operationTimeout;
ExecutorService pool;
Cache<TableName, Boolean> disabledAndDroppedTables;
TableDescriptors tableDescriptors;
public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
this.sink = sink;
this.connection = connection;
this.operationTimeout = operationTimeout;
this.rpcRetryingCallerFactory
= RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
this.pool = pool;
this.tableDescriptors = tableDescriptors;
int nonExistentTableCacheExpiryMs = connection.getConfiguration()
.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
// A cache for non existing tables that have a default expiry of 5 sec. This means that if the
// table is created again with the same name, we might miss to replicate for that amount of
// time. But this cache prevents overloading meta requests for every edit from a deleted file.
disabledAndDroppedTables = CacheBuilder.newBuilder()
.expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
.initialCapacity(10)
.maximumSize(1000)
.build();
}
public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
List<Entry> entries) throws IOException {
if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
+ " is cached as a disabled or dropped table");
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
sink.getSkippedEditsCounter().addAndGet(entries.size());
return;
}
// If the table is disabled or dropped, we should not replay the entries, and we can skip
// replaying them. However, we might not know whether the table is disabled until we
// invalidate the cache and check from meta
RegionLocations locations = null;
boolean useCache = true;
while (true) {
// get the replicas of the primary region
try {
locations = RegionReplicaReplayCallable
.getRegionLocations(connection, tableName, row, useCache, 0);
if (locations == null) {
throw new HBaseIOException("Cannot locate locations for "
+ tableName + ", row:" + Bytes.toStringBinary(row));
}
} catch (TableNotFoundException e) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
+ " is dropped. Adding table to cache.");
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
// skip this entry
sink.getSkippedEditsCounter().addAndGet(entries.size());
return;
}
// check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming from the primary region, filter it out.
HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
if (!Bytes.equals(primaryLocation.getRegion().getEncodedNameAsBytes(),
encodedRegionName)) {
if (useCache) {
useCache = false;
continue; // this will retry location lookup
}
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ " because located region " + primaryLocation.getRegion().getEncodedName()
+ " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
+ " from WALEdit");
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
sink.getSkippedEditsCounter().addAndGet(entries.size());
return;
}
break;
}
if (locations.size() == 1) {
return;
}
ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1);
// All passed entries should belong to one region because it is coming from the EntryBuffers
// split per region. But the regions might split and merge (unlike log recovery case).
for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
HRegionLocation location = locations.getRegionLocation(replicaId);
if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
RegionInfo regionInfo = location == null
? RegionReplicaUtil.getRegionInfoForReplica(
locations.getDefaultRegionLocation().getRegion(), replicaId)
: location.getRegion();
RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
rpcControllerFactory, tableName, location, regionInfo, row, entries,
sink.getSkippedEditsCounter());
Future<ReplicateWALEntryResponse> task = pool.submit(
new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout));
tasks.add(task);
}
}
boolean tasksCancelled = false;
for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
try {
tasks.get(replicaId).get();
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} catch (ExecutionException e) {
Throwable cause = e.getCause();
boolean canBeSkipped = false;
if (cause instanceof IOException) {
// The table can be disabled or dropped at this time. For disabled tables, we have no
// cheap mechanism to detect this case because meta does not contain this information.
// ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
// RPC. So instead we start the replay RPC with retries and check whether the table is
// dropped or disabled which might cause SocketTimeoutException, or
// RetriesExhaustedException or similar if we get IOE.
if (cause instanceof TableNotFoundException
|| connection.isTableDisabled(tableName)) {
disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
canBeSkipped = true;
} else if (tableDescriptors != null) {
TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
if (tableDescriptor != null
//(replicaId + 1) as no task is added for primary replica for replication
&& tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
canBeSkipped = true;
}
}
if (canBeSkipped) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ " because received exception for dropped or disabled table",
cause);
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
if (!tasksCancelled) {
sink.getSkippedEditsCounter().addAndGet(entries.size());
tasksCancelled = true; // so that we do not add to skipped counter again
}
continue;
}
// otherwise rethrow
throw (IOException)cause;
}
// unexpected exception
throw new IOException(cause);
}
}
}
}
static class RetryingRpcCallable<V> implements Callable<V> {
RpcRetryingCallerFactory factory;
RetryingCallable<V> callable;
int timeout;
public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
int timeout) {
this.factory = factory;
this.callable = callable;
this.timeout = timeout;
}
@Override
public V call() throws Exception {
return factory.<V>newCaller().callWithRetries(callable, timeout);
}
}
/**
* Calls replay on the passed edits for the given set of entries belonging to the region. It skips
* the entry if the region boundaries have changed or the region is gone.
*/
static class RegionReplicaReplayCallable extends
RegionAdminServiceCallable<ReplicateWALEntryResponse> {
private final List<Entry> entries;
private final byte[] initialEncodedRegionName;
private final AtomicLong skippedEntries;
public RegionReplicaReplayCallable(ClusterConnection connection,
RpcControllerFactory rpcControllerFactory, TableName tableName,
HRegionLocation location, RegionInfo regionInfo, byte[] row,List<Entry> entries,
AtomicLong skippedEntries) {
super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
this.entries = entries;
this.skippedEntries = skippedEntries;
this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
}
@Override
public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception {
// Check whether we should still replay this entry. If the regions are changed, or the
// entry is not coming from the primary region, filter it out because we do not need it.
// Regions can change because of (1) region split (2) region merge (3) table recreated
boolean skip = false;
if (!Bytes.equals(location.getRegion().getEncodedNameAsBytes(),
initialEncodedRegionName)) {
skip = true;
}
if (!this.entries.isEmpty() && !skip) {
Entry[] entriesArray = new Entry[this.entries.size()];
entriesArray = this.entries.toArray(entriesArray);
// set the region name for the target region replica
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
.getRegion().getEncodedNameAsBytes(), null, null, null);
controller.setCellScanner(p.getSecond());
return stub.replay(controller, p.getFirst());
}
if (skip) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
+ " because located region " + location.getRegion().getEncodedName()
+ " is different than the original region "
+ Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
skippedEntries.addAndGet(entries.size());
}
return ReplicateWALEntryResponse.newBuilder().build();
}
}
}

View File

@ -283,7 +283,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
replicationEndpoint
.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs,
.init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
replicationEndpoint.start();
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);

View File

@ -20,16 +20,17 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
@ -43,7 +44,6 @@ import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
@ -51,12 +51,12 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@ -383,9 +383,8 @@ public class TestRegionReplicaReplicationEndpoint {
testRegionReplicaReplicationIgnores(false, true);
}
public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
throws Exception {
// tests that edits from a disabled or dropped table are handled correctly by skipping those
// entries, and that further edits after the edits from the dropped/disabled table can be
// replicated without problems.
@ -405,8 +404,7 @@ public class TestRegionReplicaReplicationEndpoint {
HTU.getAdmin().createTable(htd);
// both tables are created, now pause replication
Admin admin = ConnectionFactory.createConnection(HTU.getConfiguration()).getAdmin();
admin.disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
// now that the replication is disabled, write to the table to be dropped, then drop the table.
@ -416,16 +414,6 @@ public class TestRegionReplicaReplicationEndpoint {
HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
AtomicLong skippedEdits = new AtomicLong();
RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
(ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
fstd);
RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
@ -436,7 +424,6 @@ public class TestRegionReplicaReplicationEndpoint {
new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
new WALEdit()
.add(cell));
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
if (dropTable) {
HTU.getAdmin().deleteTable(toBeDisabledTable);
@ -445,11 +432,23 @@ public class TestRegionReplicaReplicationEndpoint {
HTU.getAdmin().modifyTable(htd);
HTU.getAdmin().enableTable(toBeDisabledTable);
}
sinkWriter.append(toBeDisabledTable, encodedRegionName,
HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
assertEquals(2, skippedEdits.get());
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
MetricsSource metrics = mock(MetricsSource.class);
ReplicationEndpoint.Context ctx =
new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
.getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
metrics, rs.getTableDescriptors(), rs);
RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
rrpe.init(ctx);
rrpe.start();
ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
repCtx.setEntries(Lists.newArrayList(entry, entry));
assertTrue(rrpe.replicate(repCtx));
verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
rrpe.stop();
if (disableReplication) {
// enable replication again so that we can verify replication
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
@ -460,17 +459,14 @@ public class TestRegionReplicaReplicationEndpoint {
try {
// load some data to the to-be-dropped table
// load the data to the table
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
// now enable the replication
admin.enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
admin.close();
table.close();
rl.close();
tableToBeDisabled.close();
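
The skipped-edit check above moves from reading a sink-local AtomicLong to verifying an interaction on the mocked MetricsSource. If the recorded count ever needs to be inspected rather than matched with eq(), the equivalent ArgumentCaptor form below works as well; this is a generic Mockito sketch, not code from this commit (it would also need import org.mockito.ArgumentCaptor).

ArgumentCaptor<Long> filtered = ArgumentCaptor.forClass(Long.class);
verify(metrics, times(1)).incrLogEditsFiltered(filtered.capture());
// both entries for the dropped/disabled table were counted as filtered
assertEquals(2L, filtered.getValue().longValue());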

View File

@ -23,10 +23,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -36,24 +38,22 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.WALObserver;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -73,8 +73,6 @@ import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
/**
* Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
* class contains lower level tests using callables.
@ -178,39 +176,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
public void testReplayCallable() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly
openRegion(HTU, rs0, hriSecondary);
ClusterConnection connection =
(ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
//load some data to primary
// load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
// replay the edits to the secondary using replay callable
replicateUsingCallable(connection, entries);
try (AsyncClusterConnection conn = ClusterConnectionFactory
.createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
// replay the edits to the secondary using replay callable
replicateUsingCallable(conn, entries);
}
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
connection.close();
}
private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
throws IOException, RuntimeException {
private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
throws IOException, ExecutionException, InterruptedException {
Entry entry;
while ((entry = entries.poll()) != null) {
byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
RegionLocations locations = connection.locateRegion(tableName, row, true, true);
RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
RpcControllerFactory.instantiate(connection.getConfiguration()),
table.getName(), locations.getRegionLocation(1),
locations.getRegionLocation(1).getRegion(), row, Lists.newArrayList(entry),
new AtomicLong());
RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
connection.getConfiguration());
factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
connection
.replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
.get();
}
}
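
The helper above resolves the secondary's location and then blocks on the replay future for every entry. Because both calls already return CompletableFutures, the two stages can be chained so each entry blocks only once; the sketch below uses only the calls shown in the hunk (the effectively-final copy of the loop variable is needed inside the lambda).

// Sketch only: non-blocking chaining of the loop body above.
final Entry e = entry;
connection.getRegionLocations(tableName, row, true)
    .thenCompose(locs -> connection.replay(tableName,
        locs.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
        Collections.singletonList(e), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10)))
    .get(); // one synchronization point per entry before the rows are verified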
@ -218,49 +211,49 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
public void testReplayCallableWithRegionMove() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly while
// the region is moved to another location. It tests handling of RME.
openRegion(HTU, rs0, hriSecondary);
ClusterConnection connection =
(ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
try (AsyncClusterConnection conn = ClusterConnectionFactory
.createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
openRegion(HTU, rs0, hriSecondary);
// load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
// replay the edits to the secondary using replay callable
replicateUsingCallable(connection, entries);
Assert.assertEquals(1000, entries.size());
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
// replay the edits to the secondary using replay callable
replicateUsingCallable(conn, entries);
HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
// move the secondary region from RS0 to RS1
closeRegion(HTU, rs0, hriSecondary);
openRegion(HTU, rs1, hriSecondary);
HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
// replicate the new data
replicateUsingCallable(connection, entries);
// move the secondary region from RS0 to RS1
closeRegion(HTU, rs0, hriSecondary);
openRegion(HTU, rs1, hriSecondary);
region = rs1.getRegion(hriSecondary.getEncodedName());
// verify the new data. old data may or may not be there
HTU.verifyNumericRows(region, f, 1000, 2000);
// replicate the new data
replicateUsingCallable(conn, entries);
HTU.deleteNumericRows(table, f, 0, 2000);
closeRegion(HTU, rs1, hriSecondary);
connection.close();
region = rs1.getRegion(hriSecondary.getEncodedName());
// verify the new data. old data may or may not be there
HTU.verifyNumericRows(region, f, 1000, 2000);
HTU.deleteNumericRows(table, f, 0, 2000);
closeRegion(HTU, rs1, hriSecondary);
}
}
@Test
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
// tests replaying the edits to a secondary region replica using the RRRE.replicate()
openRegion(HTU, rs0, hriSecondary);
ClusterConnection connection =
(ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
when(context.getServer()).thenReturn(rs0);
when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
replicator.init(context);
replicator.startAsync();
@ -272,12 +265,11 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
final String fakeWalGroupId = "fakeWALGroup";
replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
.setWalGroupId(fakeWalGroupId));
replicator.stop();
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
connection.close();
}
}
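
The mocked Context in this last test captures the whole surface the rewritten endpoint reads: configuration, metrics, the hosting server and its table descriptors. Pulling those stubs into a helper keeps that dependency list in one place; the helper below is illustrative only, its name and placement are not part of this commit.

// Illustrative helper: the minimal Context stubs the rewritten endpoint needs.
private static ReplicationEndpoint.Context mockEndpointContext(HRegionServer rs,
    Configuration conf) {
  ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
  when(context.getConfiguration()).thenReturn(conf);
  when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
  when(context.getServer()).thenReturn(rs);
  when(context.getTableDescriptors()).thenReturn(rs.getTableDescriptors());
  return context;
}

With such a helper, the setup above reduces to replicator.init(mockEndpointContext(rs0, HTU.getConfiguration())) followed by replicator.startAsync().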