HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
Duo Zhang 2021-11-02 08:42:29 +08:00
parent c14a76c4fd
commit 7286cc0035
30 changed files with 788 additions and 1835 deletions

View File

@ -57,13 +57,6 @@ public interface AsyncClusterConnection extends AsyncConnection {
*/
CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
/**
* Replicate WAL edits for replica regions. The return value is the number of edits we skipped, as
* the original return value is not useful.
*/
CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
/**
* Return all the replicas for a region. Used for region replica replication.
*/
@ -110,4 +103,10 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Get the bootstrap node list of another region server.
*/
CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer);
/**
* Replicate WAL edits to a secondary replica.
*/
CompletableFuture<Void> replicate(RegionInfo replica, List<Entry> entries, int numRetries,
long rpcTimeoutNs, long operationTimeoutNs);
}
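
As an illustration only (not part of this change), a caller that already holds the cluster connection could push a batch of WAL entries to one secondary replica roughly as sketched below; the names conn, primary and entries are assumptions, and the retry/timeout values are arbitrary:

// Hypothetical usage sketch of the new replicate() API.
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, 1);
CompletableFuture<Void> f = conn.replicate(replica, entries, 3,
    TimeUnit.MILLISECONDS.toNanos(200), TimeUnit.MILLISECONDS.toNanos(1000));
FutureUtils.addListener(f, (r, e) -> {
  if (e != null) {
    LOG.warn("Failed to replicate {} entries to {}", entries.size(), replica, e);
  }
});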

View File

@ -84,14 +84,6 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
return admin.flushRegionInternal(regionName, null, writeFlushWALMarker);
}
@Override
public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) {
return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
row, entries, replicaId).call();
}
@Override
public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
boolean reload) {
@ -176,4 +168,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
});
return future;
}
@Override
public CompletableFuture<Void> replicate(RegionInfo replica,
List<Entry> entries, int retries, long rpcTimeoutNs,
long operationTimeoutNs) {
return new AsyncRegionReplicationRetryingCaller(RETRY_TIMER, this,
ConnectionUtils.retries2Attempts(retries), rpcTimeoutNs, operationTimeoutNs, replica, entries)
.call();
}
}

View File

@ -1,147 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
/**
* For replaying edits for region replica.
* <p/>
* The main difference here is that, every time after locating, we check whether the region name
* is equal; if not, we give up, as this usually means the region has been split or merged, and
* the new region(s) should already have all the data of the parent region(s).
* <p/>
* Notice that the return value is the number of edits we skipped, as the original response
* message is not used at the upper layer.
*/
@InterfaceAudience.Private
public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
private final TableName tableName;
private final byte[] encodedRegionName;
private final byte[] row;
private final Entry[] entries;
private final int replicaId;
public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs,
TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
int replicaId) {
super(retryTimer, conn, ConnectionUtils.getPriority(tableName), conn.connConf.getPauseNs(),
conn.connConf.getPauseForCQTBENs(), maxAttempts, operationTimeoutNs,
conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt());
this.tableName = tableName;
this.encodedRegionName = encodedRegionName;
this.row = row;
this.entries = entries.toArray(new Entry[0]);
this.replicaId = replicaId;
}
private void call(HRegionLocation loc) {
if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) {
if (LOG.isTraceEnabled()) {
LOG.trace(
"Skipping {} entries in table {} because located region {} is different than" +
" the original region {} from WALEdit",
entries.length, tableName, loc.getRegion().getEncodedName(),
Bytes.toStringBinary(encodedRegionName));
for (Entry entry : entries) {
LOG.trace("Skipping : " + entry);
}
}
future.complete(Long.valueOf(entries.length));
return;
}
AdminService.Interface stub;
try {
stub = conn.getAdminStub(loc.getServerName());
} catch (IOException e) {
onError(e,
() -> "Get async admin stub to " + loc.getServerName() + " for '" +
Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " +
tableName + " failed",
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
return;
}
Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtobufUtil
.buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
resetCallTimeout();
controller.setCellScanner(p.getSecond());
stub.replay(controller, p.getFirst(), r -> {
if (controller.failed()) {
onError(controller.getFailed(),
() -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
loc.getRegion().getEncodedName() + " of " + tableName + " failed",
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
} else {
future.complete(0L);
}
});
}
@Override
protected void doCall() {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
if (locateTimeoutNs <= 0) {
completeExceptionally();
return;
}
} else {
locateTimeoutNs = -1L;
}
addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
if (error != null) {
onError(error,
() -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
});
return;
}
call(loc);
});
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
/**
* For replicating edits to secondary replicas.
*/
@InterfaceAudience.Private
public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> {
private final RegionInfo replica;
private final Entry[] entries;
public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
RegionInfo replica, List<Entry> entries) {
super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
conn.connConf.getPauseNs(), conn.connConf.getPauseForCQTBENs(), maxAttempts,
operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
this.replica = replica;
this.entries = entries.toArray(new Entry[0]);
}
private void call(HRegionLocation loc) {
AdminService.Interface stub;
try {
stub = conn.getAdminStub(loc.getServerName());
} catch (IOException e) {
onError(e,
() -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed",
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
return;
}
Pair<ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtobufUtil
.buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
resetCallTimeout();
controller.setCellScanner(pair.getSecond());
stub.replay(controller, pair.getFirst(), r -> {
if (controller.failed()) {
onError(controller.getFailed(),
() -> "Call to " + loc.getServerName() + " for " + replica + " failed",
err -> conn.getLocator().updateCachedLocationOnError(loc, err));
} else {
future.complete(null);
}
});
}
@Override
protected void doCall() {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
locateTimeoutNs = remainingTimeNs();
if (locateTimeoutNs <= 0) {
completeExceptionally();
return;
}
} else {
locateTimeoutNs = -1L;
}
addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(),
replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
if (error != null) {
onError(error, () -> "Locate " + replica + " failed", err -> {
});
return;
}
call(loc);
});
}
}

View File

@ -679,6 +679,17 @@ public abstract class RpcServer implements RpcServerInterface,
return Optional.ofNullable(CurCall.get());
}
/**
* Return the current rpc call if it is a {@link ServerCall} and also has a {@link CellScanner}
* attached.
* <p/>
* Mainly used for reference counting as {@link CellScanner} may reference non-heap memory.
*/
public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
return getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
}
public static boolean isInRpcCallContext() {
return CurCall.get() != null;
}
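
The new helper exists so that code holding on to a request's cells after the RPC handler returns can pin the underlying (possibly off-heap) buffers. A minimal sketch of that retain/release pairing, mirroring how the region replication sink uses it later in this change (asyncWork is a hypothetical CompletableFuture representing the deferred work):

// Capture the current server call, if any, and keep its cell memory alive until the async work finishes.
ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
if (rpcCall != null) {
  rpcCall.retainByWAL();
}
asyncWork.whenComplete((r, e) -> {
  if (rpcCall != null) {
    rpcCall.releaseByWAL(); // balance the retain once the cells are no longer referenced
  }
});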

View File

@ -29,7 +29,6 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
@ -40,10 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -197,15 +194,9 @@ public class MetaFixer {
MetaTableAccessor.addRegionsToMeta(masterServices.getConnection(), newRegions,
td.getRegionReplication());
// Setup replication for region replicas if needed
if (td.getRegionReplication() > 1) {
ServerRegionReplicaUtil.setupRegionReplicaReplication(masterServices);
}
return Either.<List<RegionInfo>, IOException> ofLeft(newRegions);
} catch (IOException e) {
return Either.<List<RegionInfo>, IOException> ofRight(e);
} catch (ReplicationException e) {
return Either.<List<RegionInfo>, IOException> ofRight(new HBaseIOException(e));
}
})
.collect(Collectors.toList());

View File

@ -25,7 +25,6 @@ import java.util.List;
import java.util.function.Supplier;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
@ -38,12 +37,10 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -365,14 +362,6 @@ public class CreateTableProcedure
// Add regions to META
addRegionsToMeta(env, tableDescriptor, newRegions);
// Setup replication for region replicas if needed
if (tableDescriptor.getRegionReplication() > 1) {
try {
ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
} catch (ReplicationException e) {
throw new HBaseIOException(e);
}
}
return newRegions;
}

View File

@ -39,10 +39,8 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -419,13 +417,6 @@ public class ModifyTableProcedure
.collect(Collectors.toList());
addChildProcedure(env.getAssignmentManager().createAssignProcedures(newReplicas));
}
if (oldReplicaCount <= 1) {
try {
ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
} catch (ReplicationException e) {
throw new HBaseIOException(e);
}
}
}
private void closeExcessReplicasIfNeeded(MasterProcedureEnv env) {

View File

@ -139,6 +139,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@ -196,6 +197,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@ -713,6 +715,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final StoreHotnessProtector storeHotnessProtector;
private Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();
/**
* HRegion constructor. This constructor should only be used for testing and
* extensions. Instances of HRegion should be instantiated with the
@ -1085,11 +1089,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Running coprocessor post-open hooks");
coprocessorHost.postOpen();
}
initializeRegionReplicationSink(reporter, status);
status.markComplete("Region opened successfully");
return nextSeqId;
}
private void initializeRegionReplicationSink(CancelableProgressable reporter,
MonitoredTask status) {
RegionServerServices rss = getRegionServerServices();
TableDescriptor td = getTableDescriptor();
int regionReplication = td.getRegionReplication();
RegionInfo regionInfo = getRegionInfo();
if (regionReplication <= 1 || !RegionReplicaUtil.isDefaultReplica(regionInfo) ||
!ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(conf, regionInfo.getTable()) ||
rss == null) {
regionReplicationSink = Optional.empty();
return;
}
status.setStatus("Initializaing region replication sink");
regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
}
/**
* Open all Stores.
* @param reporter
@ -1212,7 +1233,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
mvcc);
mvcc, regionReplicationSink.orElse(null));
}
private void writeRegionCloseMarker(WAL wal) throws IOException {
@ -1221,7 +1242,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
mvcc);
mvcc, null);
// Store SeqId in WAL FileSystem when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
@ -1866,7 +1887,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
writeRegionCloseMarker(wal);
}
if (regionReplicationSink.isPresent()) {
// stop replicating to secondary replicas
RegionReplicationSink sink = regionReplicationSink.get();
sink.stop();
try {
sink.waitUntilStopped();
} catch (InterruptedException e) {
throw throwOnInterrupt(e);
}
}
this.closed.set(true);
if (!canFlush) {
decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
@ -2827,7 +2857,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionInfo(), flushOpSeqId, committedFiles);
// No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
mvcc);
mvcc, null);
}
// Prepare flush (take a snapshot)
@ -2888,8 +2918,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
mvcc);
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, mvcc,
null);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL: {} in "
+ " region {}", StringUtils.stringifyException(t), this);
@ -2933,8 +2963,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
try {
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
mvcc);
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
null);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@ -3013,8 +3043,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
mvcc);
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
regionReplicationSink.orElse(null));
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
@ -3027,7 +3057,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc,
null);
} catch (Throwable ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "failed writing ABORT_FLUSH marker to WAL", ex);
@ -7071,10 +7102,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
loadDescriptor, mvcc, regionReplicationSink.orElse(null));
} catch (IOException ioe) {
if (this.rsServices != null) {
// Have to abort region server because some hfiles has been loaded but we can't write
@ -7761,21 +7792,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
this.coprocessorHost.preWALAppend(walKey, walEdit);
}
WriteEntry writeEntry = null;
ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
try {
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
WriteEntry writeEntry = walKey.getWriteEntry();
regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
sink.add(walKey, walEdit, rpcCall);
}));
// Call sync on our edit.
if (txid != 0) {
sync(txid, durability);
}
writeEntry = walKey.getWriteEntry();
return writeEntry;
} catch (IOException ioe) {
if (walKey != null && walKey.getWriteEntry() != null) {
if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
}
throw ioe;
}
return writeEntry;
}
public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
@ -8430,6 +8465,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
public Optional<RegionReplicationSink> getRegionReplicationSink() {
return regionReplicationSink;
}
public void addReadRequestsCount(long readRequestsCount) {
this.readRequestsCount.add(readRequestsCount);
}

View File

@ -1222,7 +1222,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// Does this method belong in Region altogether given it is making so many references up there?
// Could be Region#writeCompactionMarker(compactionDescriptor);
WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
region.getRegionReplicationSink().orElse(null));
}
@RestrictedApi(explanation = "Should only be called in TestHStore", link = "",

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@ -202,6 +203,7 @@ public class MultiVersionConcurrencyControl {
if (queueFirst.isCompleted()) {
nextReadValue = queueFirst.getWriteNumber();
writeQueue.removeFirst();
queueFirst.runCompletionAction();
} else {
break;
}
@ -271,22 +273,36 @@ public class MultiVersionConcurrencyControl {
* Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
*/
@InterfaceAudience.Private
public static class WriteEntry {
public static final class WriteEntry {
private final long writeNumber;
private boolean completed = false;
/**
* Will be called after completion, i.e., when being removed from the
* {@link MultiVersionConcurrencyControl#writeQueue}.
*/
private Optional<Runnable> completionAction = Optional.empty();
WriteEntry(long writeNumber) {
private WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
}
void markCompleted() {
private void markCompleted() {
this.completed = true;
}
boolean isCompleted() {
private boolean isCompleted() {
return this.completed;
}
public void attachCompletionAction(Runnable action) {
assert !completionAction.isPresent();
completionAction = Optional.of(action);
}
private void runCompletionAction() {
completionAction.ifPresent(Runnable::run);
}
public long getWriteNumber() {
return this.writeNumber;
}
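
A minimal sketch of the new completion hook, assuming a locally created MVCC instance and an available LOG; in this change the action is attached by HRegion/WALUtil right after the WAL append, so the edit is handed to the replication sink only once it becomes visible:

MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
// The attached action runs when the entry is removed from the write queue,
// i.e. once this write is visible to readers.
we.attachCompletionAction(() -> LOG.debug("write {} is now visible", we.getWriteNumber()));
// ... apply the write to the memstore ...
mvcc.completeAndWait(we);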

View File

@ -0,0 +1,228 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
* The class for replicating WAL edits to secondary replicas, one instance per region.
*/
@InterfaceAudience.Private
public class RegionReplicationSink {
private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);
public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;
public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
public static final int RETRIES_NUMBER_DEFAULT = 3;
public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";
public static final long RPC_TIMEOUT_MS_DEFAULT = 200;
public static final String OPERATION_TIMEOUT_MS =
"hbase.region.read-replica.sink.operation.timeout.ms";
public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
private static final class SinkEntry {
final WALKeyImpl key;
final WALEdit edit;
final ServerCall<?> rpcCall;
SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
this.key = key;
this.edit = edit;
this.rpcCall = rpcCall;
if (rpcCall != null) {
// increase the reference count to prevent the rpc framework from freeing the memory before we
// actually send the edits out.
rpcCall.retainByWAL();
}
}
/**
* Should be called regardless of the result of the replication operation. Unless you still want
* to reuse this entry, you must call this method to release the possible off-heap memory.
*/
void replicated() {
if (rpcCall != null) {
rpcCall.releaseByWAL();
}
}
}
private final RegionInfo primary;
private final int regionReplication;
private final boolean hasRegionMemStoreReplication;
private final Queue<SinkEntry> entries = new ArrayDeque<>();
private final AsyncClusterConnection conn;
private final int retries;
private final long rpcTimeoutNs;
private final long operationTimeoutNs;
private CompletableFuture<Void> future;
private boolean stopping;
private boolean stopped;
RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
primary);
Preconditions.checkArgument(regionReplication > 1,
"region replication should be greater than 1 but got %s", regionReplication);
this.primary = primary;
this.regionReplication = regionReplication;
this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
this.conn = conn;
this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
this.rpcTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
}
private void send() {
List<SinkEntry> toSend = new ArrayList<>();
for (SinkEntry entry;;) {
entry = entries.poll();
if (entry == null) {
break;
}
toSend.add(entry);
}
List<WAL.Entry> walEntries =
toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
}
future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
FutureUtils.addListener(future, (r, e) -> {
if (e != null) {
// TODO: drop pending edits and issue a flush
LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
}
toSend.forEach(SinkEntry::replicated);
synchronized (entries) {
future = null;
if (stopping) {
stopped = true;
entries.notifyAll();
return;
}
if (!entries.isEmpty()) {
send();
}
}
});
}
/**
* Add this edit to the replication queue.
* <p/>
* The {@code rpcCall} is used to retain the cells if the edit is built within an rpc call and the
* rpc call has a cell scanner, which may be backed by off-heap memory.
*/
public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
// only replicate meta edit if region memstore replication is not enabled
return;
}
synchronized (entries) {
if (stopping) {
return;
}
// TODO: limit the total cached entries here, and we should have a global limitation, not for
// only this region.
entries.add(new SinkEntry(key, edit, rpcCall));
if (future == null) {
send();
}
}
}
/**
* Stop the replication sink.
* <p/>
* Usually this should only be called when you want to close a region.
*/
void stop() {
synchronized (entries) {
stopping = true;
if (future == null) {
stopped = true;
entries.notifyAll();
}
}
}
/**
* Make sure that we have finished all the pending replication requests.
* <p/>
* After this method returns, it is guaranteed that no new replication requests will be sent to
* secondary replicas.
* <p/>
* This is used to keep the replication order consistent with the WAL edit order when writing.
*/
void waitUntilStopped() throws InterruptedException {
synchronized (entries) {
while (!stopped) {
entries.wait();
}
}
}
}
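
The sink's behavior can be tuned through the configuration keys declared at the top of the class; a sketch of overriding the defaults programmatically (the values shown are arbitrary examples, not recommendations):

Configuration conf = HBaseConfiguration.create();
conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 5);           // default 3
conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 500);        // default 200 ms
conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 2000); // default 1000 ms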

View File

@ -22,9 +22,7 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
@ -34,10 +32,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
@ -134,14 +132,6 @@ public class AssignRegionHandler extends EventHandler {
// pass null for the last parameter, which used to be a CancelableProgressable, as now the
// opening can not be interrupted by a close request any more.
Configuration conf = rs.getConfiguration();
TableName tn = htd.getTableName();
if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
// Add the hbase:meta replication source on replica zero/default.
rs.getReplicationSourceService().getReplicationManager().
addCatalogReplicationSource(this.regionInfo);
}
}
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
} catch (IOException e) {
cleanUpAndReportFailure(e);

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -31,10 +30,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
@ -121,15 +120,6 @@ public class UnassignRegionHandler extends EventHandler {
}
rs.removeRegion(region, destination);
if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
region.getTableDescriptor().getTableName())) {
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
// If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
// See assign region handler where we add the replication source on open.
rs.getReplicationSourceService().getReplicationManager().
removeCatalogReplicationSource(region.getRegionInfo());
}
}
if (!rs.reportRegionStateTransition(
new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
-1, region.getRegionInfo()))) {

View File

@ -1200,8 +1200,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
try {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);

View File

@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
@ -71,9 +74,9 @@ public class WALUtil {
*/
public static WALKeyImpl writeCompactionMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
MultiVersionConcurrencyControl mvcc) throws IOException {
MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
@ -86,10 +89,10 @@ public class WALUtil {
* This write is for internal use only. Not for external client consumption.
*/
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
throws IOException {
RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc,
RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
@ -102,9 +105,9 @@ public class WALUtil {
*/
public static WALKeyImpl writeRegionEventMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
MultiVersionConcurrencyControl mvcc) throws IOException {
WALKeyImpl walKey =
writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
@ -122,11 +125,11 @@ public class WALUtil {
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
throws IOException {
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc,
final RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
@ -135,11 +138,11 @@ public class WALUtil {
private static WALKeyImpl writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes)
throws IOException {
final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
final RegionReplicationSink sink) throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
true);
true, sink);
}
/**
@ -152,19 +155,24 @@ public class WALUtil {
*/
private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
final MultiVersionConcurrencyControl mvcc,
final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
final boolean sync, final RegionReplicationSink sink) throws IOException {
// TODO: Pass in current time to use?
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE;
try {
trx = wal.appendMarker(hri, walKey, edit);
WriteEntry writeEntry = walKey.getWriteEntry();
if (sink != null) {
writeEntry.attachCompletionAction(() -> sink.add(walKey, edit,
RpcServer.getCurrentServerCallWithCellScanner().orElse(null)));
}
if (sync) {
wal.sync(trx);
}
// Call complete only here because these are markers only. They are not for clients to read.
mvcc.complete(walKey.getWriteEntry());
mvcc.complete(writeEntry);
} catch (IOException ioe) {
if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());

View File

@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.util.Collections;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
/**
* ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
* all WALEdits from these WALs. This ReplicationSource is NOT created via
* {@link ReplicationSourceFactory}.
*/
@InterfaceAudience.Private
class CatalogReplicationSource extends ReplicationSource {
CatalogReplicationSource() {
// Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
// filtered out in the 'super' class default implementation).
super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
}
@Override
public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
// Noop. This CatalogReplicationSource implementation does not persist state to backing storage
// nor does it keep its WALs in a general map up in ReplicationSourceManager --
// CatalogReplicationSource is used by the Catalog Read Replica feature, which resets every time
// the WAL source process crashes. Skip calling through to the default implementation.
// See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the design doc
// attached to HBASE-18070 'Enable memstore replication for meta replica' for background on why
// there is no need to keep WAL state.
}
}

View File

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
/**
* The 'peer' used internally by Catalog Region Replicas Replication Source.
* The Replication system has 'peer' baked into its core, so even though we do not need 'peering',
* we need a 'peer' and its configuration, else the replication system breaks in a few places.
* Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
*/
@InterfaceAudience.Private
class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
/**
* @param clusterKey Usually the UUID from zk passed in by caller as a String.
*/
CatalogReplicationSourcePeer(Configuration configuration, String clusterKey) {
super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
ReplicationPeerConfig.newBuilder().
setClusterKey(clusterKey).
setReplicationEndpointImpl(
configuration.get("hbase.region.replica.catalog.replication",
RegionReplicaReplicationEndpoint.class.getName())).
setBandwidth(0). // '0' means no bandwidth.
setSerial(false).
build(),
true, SyncReplicationState.NONE, SyncReplicationState.NONE);
}
}

View File

@ -1,407 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} which receives the edits from
* the WAL, and sends them to replicas of regions.
*/
@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
// Can be configured differently than hbase.client.retries.number
private static String CLIENT_RETRIES_NUMBER =
"hbase.region.replica.replication.client.retries.number";
private Configuration conf;
private AsyncClusterConnection connection;
private TableDescriptors tableDescriptors;
private int numRetries;
private long operationTimeoutNs;
private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
private Cache<TableName, TableName> disabledTableCache;
private final RetryCounterFactory retryCounterFactory =
new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
@Override
public void init(Context context) throws IOException {
super.init(context);
this.conf = context.getConfiguration();
this.tableDescriptors = context.getTableDescriptors();
int memstoreReplicationEnabledCacheExpiryMs = conf
.getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
// A cache for the table "memstore replication enabled" flag.
// It has a default expiry of 5 sec. This means that if the table is altered
// with a different flag value, we might fail to replicate for that amount of
// time. But this cache avoids the slow lookup and parsing of the TableDescriptor.
tableDescriptorCache = CacheBuilder.newBuilder()
.expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
.initialCapacity(10).maximumSize(1000)
.build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
@Override
public Optional<TableDescriptor> load(TableName tableName) throws Exception {
// check if the table requires memstore replication
// some unit tests drop the table, so we should do a bypass check and always replicate.
return Optional.ofNullable(tableDescriptors.get(tableName));
}
});
int nonExistentTableCacheExpiryMs =
conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
// A cache for non-existent tables, with a default expiry of 5 sec. This means that if the
// table is created again with the same name, we might fail to replicate for that amount of
// time. But this cache prevents overloading meta with requests for every edit from a deleted file.
disabledTableCache = CacheBuilder.newBuilder()
.expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
.maximumSize(1000).build();
// HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
// We are resetting it here because we want the default number of retries (35) rather than 10
// times that, which makes for very long retries for disabled tables etc.
int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
if (defaultNumRetries > 10) {
int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
}
this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
// use the regular RPC timeout for replica replication RPC's
this.operationTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.connection = context.getServer().getAsyncClusterConnection();
}
/**
* Returns true if the specified entry must be replicated. We should always replicate meta
* operations (e.g. flush) and use the user HTD flag to decide whether or not to replicate the
* memstore.
*/
private boolean requiresReplication(Optional<TableDescriptor> tableDesc,
Entry entry) {
// empty edit does not need to be replicated
if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
return false;
}
// meta edits (e.g. flush) must be always replicated
return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication();
}
private void getRegionLocations(CompletableFuture<RegionLocations> future,
TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) {
FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload),
(locs, e) -> {
if (e != null) {
future.completeExceptionally(e);
return;
}
// if we are not loading from cache, just return
if (reload) {
future.complete(locs);
return;
}
// check if the number of region replicas is correct, and also the primary region name
// matches.
if (locs.size() == tableDesc.getRegionReplication() &&
locs.getDefaultRegionLocation() != null &&
Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
encodedRegionName)) {
future.complete(locs);
} else {
// reload again as the information in cache maybe stale
getRegionLocations(future, tableDesc, encodedRegionName, row, true);
}
});
}
private void replicate(CompletableFuture<Long> future, RegionLocations locs,
TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
if (locs.size() == 1) {
LOG.info("Only one location for {}.{}, refresh the location cache only for meta now",
tableDesc.getTableName(), Bytes.toString(encodedRegionName));
// This could happen to the meta table: it may start with no replicas and later be changed to
// multiple replicas. In that case the cached location for meta may only have the primary
// region, so we need to clean up and refresh the cached meta locations.
if (tableDesc.isMetaTable()) {
connection.getRegionLocator(tableDesc.getTableName()).clearRegionLocationCache();
}
future.complete(Long.valueOf(entries.size()));
return;
}
RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
// the region name is not equal, this usually means the region has been split or merged, so
// give up replicating as the new region(s) should already have all the data of the parent
// region(s).
if (LOG.isTraceEnabled()) {
LOG.trace(
"Skipping {} entries in table {} because located region {} is different than" +
" the original region {} from WALEdit",
entries.size(), tableDesc.getTableName(), defaultReplica.getEncodedName(),
Bytes.toStringBinary(encodedRegionName));
}
future.complete(Long.valueOf(entries.size()));
return;
}
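// Track the first error seen, the number of outstanding secondary replica calls, and the
// maximum skipped-edit count reported by any replica; the future completes once every
// secondary replica call has finished.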
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
AtomicLong skippedEdits = new AtomicLong(0);
for (int i = 1, n = locs.size(); i < n; i++) {
// Do not use the elements other than the default replica as they may be null. We will fail
// earlier if the location for default replica is null.
final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
FutureUtils
.addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
if (e != null) {
LOG.warn("Failed to replicate to {}", replica, e);
error.compareAndSet(null, e);
} else {
AtomicUtils.updateMax(skippedEdits, r.longValue());
}
if (remainingTasks.decrementAndGet() == 0) {
if (error.get() != null) {
future.completeExceptionally(error.get());
} else {
future.complete(skippedEdits.get());
}
}
});
}
}
private void logSkipped(TableName tableName, List<Entry> entries, String reason) {
if (LOG.isTraceEnabled()) {
LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason);
for (Entry entry : entries) {
LOG.trace("Skipping : {}", entry);
}
}
}
private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName,
List<Entry> entries) {
if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table");
return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
}
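// All entries in this batch belong to the same encoded region, so the row of the first cell
// is enough to locate the region and its replicas.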
byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>();
getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
CompletableFuture<Long> future = new CompletableFuture<>();
FutureUtils.addListener(locateFuture, (locs, error) -> {
if (error != null) {
future.completeExceptionally(error);
} else if (locs.getDefaultRegionLocation() == null) {
future.completeExceptionally(
new HBaseIOException("No location found for default replica of table=" +
tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
} else {
replicate(future, locs, tableDesc, encodedRegionName, row, entries);
}
});
return future;
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
long skippedEdits = 0;
RetryCounter retryCounter = retryCounterFactory.create();
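// First pass: group the WAL entries by encoded region name, skipping edits that do not need
// to be replicated. If loading a table descriptor fails, sleep and redo the whole pass.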
outer: while (isRunning()) {
encodedRegionName2Entries.clear();
skippedEdits = 0;
for (Entry entry : replicateContext.getEntries()) {
Optional<TableDescriptor> tableDesc;
try {
tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
} catch (ExecutionException e) {
LOG.warn("Failed to load table descriptor for {}, attempts={}",
entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause());
if (!retryCounter.shouldRetry()) {
return false;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e1) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
}
continue outer;
}
if (!requiresReplication(tableDesc, entry)) {
skippedEdits++;
continue;
}
byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
encodedRegionName2Entries
.computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>()))
.getSecond().add(entry);
}
break;
}
// send the request to regions
retryCounter = retryCounterFactory.create();
while (isRunning()) {
List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList =
new ArrayList<Pair<CompletableFuture<Long>, byte[]>>();
for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries
.entrySet()) {
CompletableFuture<Long> future =
replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond());
futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey()));
}
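// Wait for each per-region future; on success drop the region from the pending map and
// accumulate its skipped-edit count, on failure decide whether the table was dropped or
// disabled before leaving the remaining entries for the next retry round.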
for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) {
byte[] encodedRegionName = pair.getSecond();
try {
skippedEdits += pair.getFirst().get().longValue();
encodedRegionName2Entries.remove(encodedRegionName);
} catch (InterruptedException e) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
} catch (ExecutionException e) {
Pair<TableDescriptor, List<Entry>> tableAndEntries =
encodedRegionName2Entries.get(encodedRegionName);
TableName tableName = tableAndEntries.getFirst().getTableName();
List<Entry> entries = tableAndEntries.getSecond();
Throwable cause = e.getCause();
// The table can be disabled or dropped at this time. For disabled tables, we have no
// cheap mechanism to detect this case because meta does not contain this information.
// ClusterConnection.isTableDisabled() is a zk call which we cannot afford for every replay
// RPC. So instead we start the replay RPC with retries, and when it finally fails (for
// example with SocketTimeoutException, RetriesExhaustedException or a similar IOException),
// we check whether the table has been dropped or disabled.
if (cause instanceof TableNotFoundException) {
// add to cache that the table does not exist
tableDescriptorCache.put(tableName, Optional.empty());
logSkipped(tableName, entries, "dropped");
skippedEdits += entries.size();
encodedRegionName2Entries.remove(encodedRegionName);
continue;
}
boolean disabled = false;
try {
disabled = connection.getAdmin().isTableDisabled(tableName).get();
} catch (InterruptedException e1) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
} catch (ExecutionException e1) {
LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName,
e1.getCause());
}
if (disabled) {
disabledTableCache.put(tableName, tableName);
logSkipped(tableName, entries, "disabled");
skippedEdits += entries.size();
encodedRegionName2Entries.remove(encodedRegionName);
continue;
}
LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(),
Bytes.toStringBinary(encodedRegionName), tableName);
}
}
// we are done
if (encodedRegionName2Entries.isEmpty()) {
ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
return true;
} else {
LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
if (!retryCounter.shouldRetry()) {
return false;
}
try {
retryCounter.sleepUntilNextRetry();
} catch (InterruptedException e) {
// restore the interrupted state
Thread.currentThread().interrupt();
return false;
}
}
}
return false;
}
@Override
public boolean canReplicateToSameCluster() {
return true;
}
@Override
protected WALEntryFilter getScopeWALEntryFilter() {
// we do not care about scope. We replicate everything.
return null;
}
}

View File

@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
/**
* Constructs a {@link ReplicationSourceInterface}
* Note, not used to create specialized ReplicationSources
* @see CatalogReplicationSource
*/
@InterfaceAudience.Private
public final class ReplicationSourceFactory {

View File

@ -39,7 +39,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -49,8 +48,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@ -64,9 +61,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@ -182,16 +177,6 @@ public class ReplicationSourceManager {
private final long totalBufferLimit;
private final MetricsReplicationGlobalSourceSource globalMetrics;
/**
* A special ReplicationSource for hbase:meta Region Read Replicas.
* Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
* will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
* the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
* this server (in case it later gets moved back). We synchronize on this instance when testing
* for presence and while creating it, so it is only created and started once.
*/
AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
@ -1066,78 +1051,6 @@ public class ReplicationSourceManager {
return this.globalMetrics;
}
/**
* Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
* Create it once only. If exists already, use the existing one.
* @see #removeCatalogReplicationSource(RegionInfo)
* @see #addSource(String) This is specialization on the addSource method.
*/
public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
throws IOException {
// Poor-man's putIfAbsent
synchronized (this.catalogReplicationSource) {
ReplicationSourceInterface rs = this.catalogReplicationSource.get();
return rs != null ? rs :
this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
}
}
/**
* Remove the hbase:meta Catalog replication source.
* Called when we close hbase:meta.
* @see #addCatalogReplicationSource(RegionInfo regionInfo)
*/
public void removeCatalogReplicationSource(RegionInfo regionInfo) {
// Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
// comes back to this server.
}
/**
* Create, initialize, and start the Catalog ReplicationSource.
* Presumes called one-time only (caller must ensure one-time only call).
* This ReplicationSource is NOT created via {@link ReplicationSourceFactory}.
* @see #addSource(String) This is a specialization of the addSource call.
* @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
* why the special handling).
*/
private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
throws IOException {
// Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
// Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
WALProvider walProvider = this.walFactory.getMetaWALProvider();
boolean instantiate = walProvider == null;
if (instantiate) {
walProvider = this.walFactory.getMetaProvider();
}
// Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
// for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
// read replicas feature that makes use of the source does a reset on a crash of the WAL
// source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
// design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail.
CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
this.clusterId.toString());
final ReplicationSourceInterface crs = new CatalogReplicationSource();
crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
// Add listener on the provider so we can pick up the WAL to replicate on roll.
WALActionsListener listener = new WALActionsListener() {
@Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
crs.enqueueLog(newPath);
}
};
walProvider.addWALActionsListener(listener);
if (!instantiate) {
// If we did not instantiate provider, need to add our listener on already-created WAL
// instance too (listeners are passed by provider to WAL instance on creation but if provider
// created already, our listener add above is missed). And add the current WAL file to the
// Replication Source so it can start replicating it.
WAL wal = walProvider.getWAL(regionInfo);
wal.registerWALActionsListener(listener);
crs.enqueueLog(((AbstractFSWAL)wal).getCurrentFileName());
}
return crs.startup();
}
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}

View File

@ -22,23 +22,15 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Similar to {@link RegionReplicaUtil} but for the server side
@ -46,8 +38,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class ServerRegionReplicaUtil extends RegionReplicaUtil {
private static final Logger LOG = LoggerFactory.getLogger(ServerRegionReplicaUtil.class);
/**
* Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
* If this is enabled, a replication peer named "region_replica_replication" will be created
@ -59,7 +49,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
public static final String REGION_REPLICA_REPLICATION_CONF_KEY
= "hbase.region.replica.replication.enabled";
private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
/**
* Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
@ -161,27 +150,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
}
}
/**
* Create replication peer for replicating user-space Region Read Replicas.
* This methods should only be called at master side.
*/
public static void setupRegionReplicaReplication(MasterServices services)
throws IOException, ReplicationException {
if (!isRegionReplicaReplicationEnabled(services.getConfiguration())) {
return;
}
if (services.getReplicationPeerManager().getPeerConfig(REGION_REPLICA_REPLICATION_PEER)
.isPresent()) {
return;
}
LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER +
" not exist. Creating...");
ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
.setClusterKey(ZKConfig.getZooKeeperClusterKey(services.getConfiguration()))
.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()).build();
services.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
}
/**
* @return True if Region Read Replica is enabled for <code>tn</code> (whether hbase:meta or
* user-space tables).

View File

@ -298,8 +298,8 @@ public class TestIOFencing {
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
((HRegion)compactingRegion).getReplicationScope(),
oldHri, compactionDescriptor, compactingRegion.getMVCC());
((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor,
compactingRegion.getMVCC(), null);
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = EnvironmentEdgeManager.currentTime();

View File

@ -125,12 +125,6 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
return null;
}
@Override
public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs) {
return null;
}
@Override
public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
boolean reload) {
@ -169,4 +163,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
return null;
}
@Override
public CompletableFuture<Void> replicate(RegionInfo replica,
List<Entry> entries, int numRetries, long rpcTimeoutNs,
long operationTimeoutNs) {
return null;
}
}

View File

@ -996,7 +996,7 @@ public class TestHRegion {
region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
this.region.getRegionInfo(), compactionDescriptor, region.getMVCC(), null);
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplication;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@ -64,7 +64,7 @@ public class TestRegionReplicaFailover {
HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
LoggerFactory.getLogger(TestRegionReplicaReplication.class);
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();

View File

@ -20,14 +20,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@ -67,18 +64,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
* verifying async wal replication replays the edits to the secondary region in various scenarios.
*
* @see TestRegionReplicaReplicationEndpoint
* Tests region replication for hbase:meta by setting up region replicas and verifying async wal
* replication replays the edits to the secondary region in various scenarios.
* @see TestRegionReplicaReplication
*/
@Category({LargeTests.class})
public class TestMetaRegionReplicaReplicationEndpoint {
@Category({ LargeTests.class })
public class TestMetaRegionReplicaReplication {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
HBaseClassTestRule.forClass(TestMetaRegionReplicaReplication.class);
private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
private static final int NB_SERVERS = 4;
private final HBaseTestingUtil HTU = new HBaseTestingUtil();
private int numOfMetaReplica = NB_SERVERS - 1;
@ -102,17 +98,15 @@ public class TestMetaRegionReplicaReplicationEndpoint {
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
// Enable hbase:meta replication.
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
// Set hbase:meta replicas to be 3.
// conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
HTU.startMiniCluster(NB_SERVERS);
// Enable hbase:meta replication.
HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
HTU.waitFor(30000,
() -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
>= numOfMetaReplica);
HTU.waitFor(30000, () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME)
.size() >= numOfMetaReplica);
}
@After
@ -120,84 +114,20 @@ public class TestMetaRegionReplicaReplicationEndpoint {
HTU.shutdownMiniCluster();
}
/**
* Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
*/
@Test
public void testHBaseMetaReplicationSourceCreatedOnOpen() throws Exception {
SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
// Replicate a row to prove all working.
testHBaseMetaReplicatesOneRow(0);
assertTrue(isMetaRegionReplicaReplicationSource(hrs));
// Now move the hbase:meta and make sure the ReplicationSource is in both places.
HRegionServer hrsOther = null;
for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
hrsOther = cluster.getRegionServer(i);
if (hrsOther.getServerName().equals(hrs.getServerName())) {
hrsOther = null;
continue;
}
break;
}
assertNotNull(hrsOther);
assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
Region meta = null;
for (Region region : hrs.getOnlineRegionsLocalContext()) {
if (region.getRegionInfo().isMetaRegion()) {
meta = region;
break;
}
}
assertNotNull(meta);
HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName());
// Assert that there is a ReplicationSource in both places now.
assertTrue(isMetaRegionReplicaReplicationSource(hrs));
assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
// Replicate to show stuff still works.
testHBaseMetaReplicatesOneRow(1);
// Now pretend a few hours have gone by... roll the meta WAL in original location... Move the
// meta back and retry replication. See if it works.
hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
testHBaseMetaReplicatesOneRow(2);
hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
testHBaseMetaReplicatesOneRow(3);
}
/**
* Test meta region replica replication. Create some tables and see if replicas pick up the
* additions.
*/
private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
waitForMetaReplicasToOnline();
try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
HConstants.CATALOG_FAMILY)) {
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
}
}
/**
* @return Whether the special meta region replica peer is enabled on <code>hrs</code>
*/
private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
return hrs.getReplicationSourceService().getReplicationManager().
catalogReplicationSource.get() != null;
}
/**
* Test meta region replica replication. Create some tables and see if replicas pick up the
* additions.
*/
@Test
public void testHBaseMetaReplicates() throws Exception {
try (Table table = HTU
.createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY,
Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
HConstants.CATALOG_FAMILY,
Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
}
try (Table table = HTU
.createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY,
Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
HConstants.CATALOG_FAMILY,
Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
// Try delete.
HTU.deleteTableIfAny(table.getName());
@ -207,26 +137,22 @@ public class TestMetaRegionReplicaReplicationEndpoint {
@Test
public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
TableName tableName = TableName.valueOf("hbase:meta");
Table table = connection.getTable(tableName);
try {
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(TableName.META_TABLE_NAME)) {
// load the data to the table
for (int i = 0; i < 5; i++) {
LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
LOG.info("flushing table");
HTU.flush(tableName);
HTU.flush(TableName.META_TABLE_NAME);
LOG.info("compacting table");
if (i < 4) {
HTU.compact(tableName, false);
HTU.compact(TableName.META_TABLE_NAME, false);
}
}
verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
} finally {
table.close();
connection.close();
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
HConstants.CATALOG_FAMILY);
}
}
@ -235,7 +161,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
HRegionServer hrsMetaReplica = null;
HRegionServer hrsNoMetaReplica = null;
HRegionServer server = null;
Region metaReplica = null;
@ -260,11 +185,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
hrsNoMetaReplica = server;
}
}
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
TableName tableName = TableName.valueOf("hbase:meta");
Table table = connection.getTable(tableName);
try {
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(TableName.META_TABLE_NAME)) {
// load the data to the table
for (int i = 0; i < 5; i++) {
LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
@ -274,10 +196,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
}
verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
} finally {
table.close();
connection.close();
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
HConstants.CATALOG_FAMILY);
}
}
@ -323,22 +243,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
}
/**
* Replicas come online after primary.
*/
private void waitForMetaReplicasToOnline() throws IOException {
final RegionLocator regionLocator =
HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
HTU.waitFor(10000,
// getRegionLocations returns an entry for each replica but if unassigned, entry is null.
// Pass reload to force us to skip cache else it just keeps returning default.
() -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
filter(Objects::nonNull).count() >= numOfMetaReplica);
List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
LOG.info("Found locations {}", locations);
assertEquals(numOfMetaReplica, locations.size());
}
/**
* Scan hbase:meta for <code>tableName</code> content.
*/
@ -373,20 +277,9 @@ public class TestMetaRegionReplicaReplicationEndpoint {
return regions;
}
private Region getOneRegion(TableName tableName) {
for (int i = 0; i < NB_SERVERS; i++) {
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
List<HRegion> onlineRegions = rs.getRegions(tableName);
if (onlineRegions.size() > 1) {
return onlineRegions.get(0);
}
}
return null;
}
/**
* Verify when a Table is deleted from primary, then there are no references in replicas
* (because they get the delete of the table rows too).
* Verify when a Table is deleted from primary, then there are no references in replicas (because
* they get the delete of the table rows too).
*/
private void verifyDeletedReplication(TableName tableName, int regionReplication,
final TableName deletedTableName) {
@ -417,8 +310,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
/**
* Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed
* by HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
* Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
* HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
* <code>cells</code>.
*/
private boolean doesNotContain(List<Cell> cells, TableName tableName) {
@ -491,21 +384,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
after[RegionInfo.DEFAULT_REPLICA_ID]);
assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID], after[RegionInfo.DEFAULT_REPLICA_ID]);
for (int i = 1; i < after.length; i ++) {
for (int i = 1; i < after.length; i++) {
assertTrue(after[i] > before[i]);
}
}
private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
// There are read requests increase for primary meta replica.
assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
before[RegionInfo.DEFAULT_REPLICA_ID]);
assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
// No change for replica regions
for (int i = 1; i < after.length; i ++) {
for (int i = 1; i < after.length; i++) {
assertEquals(before[i], after[i]);
}
}
@ -515,13 +406,12 @@ public class TestMetaRegionReplicaReplicationEndpoint {
for (Region r : metaRegions) {
LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
counters[i] = r.getReadRequestsCount();
i ++;
i++;
}
}
@Test
public void testHBaseMetaReplicaGets() throws Exception {
TableName tn = TableName.valueOf(this.name.getMethodName());
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];

View File

@ -0,0 +1,273 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertNotNull;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
/**
* Tests region replica replication by setting up region replicas and verifying
* async wal replication replays the edits to the secondary region in various scenarios.
*/
@Category({FlakeyTests.class, LargeTests.class})
public class TestRegionReplicaReplication {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicaReplication.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestRegionReplicaReplication.class);
private static final int NB_SERVERS = 2;
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
@Rule
public TestName name = new TestName();
@BeforeClass
public static void beforeClass() throws Exception {
Configuration conf = HTU.getConfiguration();
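// Aggressive WAL roll, log cleaner and replication retry settings so the test exercises
// log rolling and replication quickly.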
conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
conf.setInt("replication.source.size.capacity", 10240);
conf.setLong("replication.source.sleepforretries", 100);
conf.setInt("hbase.regionserver.maxlogs", 10);
conf.setLong("hbase.master.logcleaner.ttl", 10);
conf.setInt("zookeeper.recovery.retry", 1);
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
HTU.startMiniCluster(NB_SERVERS);
}
@AfterClass
public static void afterClass() throws Exception {
HTU.shutdownMiniCluster();
}
private void testRegionReplicaReplication(int regionReplication) throws Exception {
// test region replica replication. Create a table with single region, write some data
// ensure that data is replicated to the secondary region
TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+ regionReplication);
TableDescriptor htd = HTU
.createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
.setRegionReplication(regionReplication).build();
createOrEnableTableWithRetries(htd, true);
TableName tableNameNoReplicas =
TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
HTU.deleteTableIfAny(tableNameNoReplicas);
HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
// load some data to the non-replicated table
HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
// load the data to the table
HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
HTU.deleteTableIfAny(tableNameNoReplicas);
}
}
private void verifyReplication(TableName tableName, int regionReplication,
final int startRow, final int endRow) throws Exception {
verifyReplication(tableName, regionReplication, startRow, endRow, true);
}
private void verifyReplication(TableName tableName, int regionReplication,
final int startRow, final int endRow, final boolean present) throws Exception {
// find the regions
final Region[] regions = new Region[regionReplication];
for (int i=0; i < NB_SERVERS; i++) {
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
List<HRegion> onlineRegions = rs.getRegions(tableName);
for (HRegion region : onlineRegions) {
regions[region.getRegionInfo().getReplicaId()] = region;
}
}
for (Region region : regions) {
assertNotNull(region);
}
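// Check each secondary replica (replica id >= 1); the primary already holds the data.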
for (int i = 1; i < regionReplication; i++) {
final Region region = regions[i];
// wait until all the data is replicated to all secondary regions
Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
try {
HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
} catch(Throwable ex) {
LOG.warn("Verification from secondary region is not complete yet", ex);
// still wait
return false;
}
return true;
}
});
}
}
@Test
public void testRegionReplicaReplicationWith2Replicas() throws Exception {
testRegionReplicaReplication(2);
}
@Test
public void testRegionReplicaReplicationWith3Replicas() throws Exception {
testRegionReplicaReplication(3);
}
@Test
public void testRegionReplicaReplicationWith10Replicas() throws Exception {
testRegionReplicaReplication(10);
}
@Test
public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
int regionReplication = 3;
TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
.setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
createOrEnableTableWithRetries(htd, true);
final TableName tableName = htd.getTableName();
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
try {
// write data to the primary. The replicas should not receive the data
final int STEP = 100;
for (int i = 0; i < 3; ++i) {
final int startRow = i * STEP;
final int endRow = (i + 1) * STEP;
LOG.info("Writing data from " + startRow + " to " + endRow);
HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
verifyReplication(tableName, regionReplication, startRow, endRow, false);
// Flush the table, now the data should show up in the replicas
LOG.info("flushing table");
HTU.flush(tableName);
verifyReplication(tableName, regionReplication, 0, endRow, true);
}
} finally {
table.close();
connection.close();
}
}
@Test
public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
// Tests a table with region replication 3. Writes some data, and causes flushes and
// compactions. Verifies that the data is readable from the replicas. Note that this
// does not test whether the replicas actually pick up flushed files and apply compaction
// to their stores
int regionReplication = 3;
TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
.setRegionReplication(regionReplication).build();
createOrEnableTableWithRetries(htd, true);
final TableName tableName = htd.getTableName();
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
try {
// load the data to the table
for (int i = 0; i < 6000; i += 1000) {
LOG.info("Writing data from " + i + " to " + (i+1000));
HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
LOG.info("flushing table");
HTU.flush(tableName);
LOG.info("compacting table");
HTU.compact(tableName, false);
}
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
table.close();
connection.close();
}
}
private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
// Helper function to run create/enable table operations with a retry feature
boolean continueToRetry = true;
int tries = 0;
while (continueToRetry && tries < 50) {
try {
continueToRetry = false;
if (createTableOperation) {
HTU.getAdmin().createTable(htd);
} else {
HTU.getAdmin().enableTable(htd.getTableName());
}
} catch (IOException e) {
if (e.getCause() instanceof ReplicationException) {
continueToRetry = true;
tries++;
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
}
}
}

View File

@ -1,515 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
/**
* Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
* async wal replication replays the edits to the secondary region in various scenarios.
*/
@Category({FlakeyTests.class, LargeTests.class})
public class TestRegionReplicaReplicationEndpoint {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
private static final int NB_SERVERS = 2;
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
@Rule
public TestName name = new TestName();
@BeforeClass
public static void beforeClass() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
conf.setInt("replication.source.size.capacity", 10240);
conf.setLong("replication.source.sleepforretries", 100);
conf.setInt("hbase.regionserver.maxlogs", 10);
conf.setLong("hbase.master.logcleaner.ttl", 10);
conf.setInt("zookeeper.recovery.retry", 1);
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
HTU.startMiniCluster(NB_SERVERS);
}
@AfterClass
public static void afterClass() throws Exception {
HTU.shutdownMiniCluster();
}
@Test
public void testRegionReplicaReplicationPeerIsCreated() throws IOException {
// create a table with region replicas. Check whether the replication peer is created
// and replication started.
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Admin admin = connection.getAdmin()) {
String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
ReplicationPeerConfig peerConfig = null;
try {
peerConfig = admin.getReplicationPeerConfig(peerId);
} catch (ReplicationPeerNotFoundException e) {
LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
}
if (peerConfig != null) {
admin.removeReplicationPeer(peerId);
peerConfig = null;
}
TableDescriptor htd = HTU
.createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
createOrEnableTableWithRetries(htd, true);
try {
peerConfig = admin.getReplicationPeerConfig(peerId);
fail("Should throw ReplicationException, because replication peer id=" + peerId
+ " not exist");
} catch (ReplicationPeerNotFoundException e) {
}
assertNull(peerConfig);
htd = HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated"),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(2).build();
createOrEnableTableWithRetries(htd, true);
// assert peer configuration is correct
peerConfig = admin.getReplicationPeerConfig(peerId);
assertNotNull(peerConfig);
assertEquals(peerConfig.getClusterKey(),
ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
peerConfig.getReplicationEndpointImpl());
}
}
@Test
public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
// modify a table by adding region replicas. Check whether the replication peer is created
// and replication started.
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Admin admin = connection.getAdmin()) {
String peerId = "region_replica_replication";
ReplicationPeerConfig peerConfig = null;
try {
peerConfig = admin.getReplicationPeerConfig(peerId);
} catch (ReplicationPeerNotFoundException e) {
LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
}
if (peerConfig != null) {
admin.removeReplicationPeer(peerId);
peerConfig = null;
}
TableDescriptor htd = HTU.createTableDescriptor(
TableName.valueOf("testRegionReplicaReplicationPeerIsCreatedForModifyTable"),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
createOrEnableTableWithRetries(htd, true);
// assert that replication peer is not created yet
try {
peerConfig = admin.getReplicationPeerConfig(peerId);
fail("Should throw ReplicationException, because replication peer id=" + peerId
+ " not exist");
} catch (ReplicationPeerNotFoundException e) {
}
assertNull(peerConfig);
HTU.getAdmin().disableTable(htd.getTableName());
htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(2).build();
HTU.getAdmin().modifyTable(htd);
createOrEnableTableWithRetries(htd, false);
// assert peer configuration is correct
peerConfig = admin.getReplicationPeerConfig(peerId);
assertNotNull(peerConfig);
assertEquals(peerConfig.getClusterKey(),
ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
peerConfig.getReplicationEndpointImpl());
}
}
public void testRegionReplicaReplication(int regionReplication) throws Exception {
// test region replica replication. Create a table with single region, write some data
// ensure that data is replicated to the secondary region
TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+ regionReplication);
TableDescriptor htd = HTU
.createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
.setRegionReplication(regionReplication).build();
createOrEnableTableWithRetries(htd, true);
TableName tableNameNoReplicas =
TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
HTU.deleteTableIfAny(tableNameNoReplicas);
HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
try {
// load some data to the non-replicated table
HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
// load the data to the table
HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
table.close();
tableNoReplicas.close();
HTU.deleteTableIfAny(tableNameNoReplicas);
connection.close();
}
}
private void verifyReplication(TableName tableName, int regionReplication,
final int startRow, final int endRow) throws Exception {
verifyReplication(tableName, regionReplication, startRow, endRow, true);
}
private void verifyReplication(TableName tableName, int regionReplication,
final int startRow, final int endRow, final boolean present) throws Exception {
// find the regions
final Region[] regions = new Region[regionReplication];
for (int i=0; i < NB_SERVERS; i++) {
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
List<HRegion> onlineRegions = rs.getRegions(tableName);
for (HRegion region : onlineRegions) {
regions[region.getRegionInfo().getReplicaId()] = region;
}
}
for (Region region : regions) {
assertNotNull(region);
}
for (int i = 1; i < regionReplication; i++) {
final Region region = regions[i];
// wait until all the data is replicated to all secondary regions
Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
LOG.info("verifying replication for region replica:" + region.getRegionInfo());
try {
HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
} catch(Throwable ex) {
LOG.warn("Verification from secondary region is not complete yet", ex);
// still wait
return false;
}
return true;
}
});
}
}
@Test
public void testRegionReplicaReplicationWith2Replicas() throws Exception {
testRegionReplicaReplication(2);
}
@Test
public void testRegionReplicaReplicationWith3Replicas() throws Exception {
testRegionReplicaReplication(3);
}
@Test
public void testRegionReplicaReplicationWith10Replicas() throws Exception {
testRegionReplicaReplication(10);
}
@Test
public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
int regionReplication = 3;
TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
.setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
createOrEnableTableWithRetries(htd, true);
final TableName tableName = htd.getTableName();
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
try {
// write data to the primary. The replicas should not receive the data
final int STEP = 100;
for (int i = 0; i < 3; ++i) {
final int startRow = i * STEP;
final int endRow = (i + 1) * STEP;
LOG.info("Writing data from " + startRow + " to " + endRow);
HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
verifyReplication(tableName, regionReplication, startRow, endRow, false);
// Flush the table, now the data should show up in the replicas
LOG.info("flushing table");
HTU.flush(tableName);
verifyReplication(tableName, regionReplication, 0, endRow, true);
}
} finally {
table.close();
connection.close();
}
}
@Test
public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
// Tests a table with region replication 3. Writes some data, and causes flushes and
// compactions. Verifies that the data is readable from the replicas. Note that this
// does not test whether the replicas actually pick up flushed files and apply compaction
// to their stores
int regionReplication = 3;
TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
.setRegionReplication(regionReplication).build();
createOrEnableTableWithRetries(htd, true);
final TableName tableName = htd.getTableName();
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
try {
// load the data to the table
for (int i = 0; i < 6000; i += 1000) {
LOG.info("Writing data from " + i + " to " + (i+1000));
HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
LOG.info("flushing table");
HTU.flush(tableName);
LOG.info("compacting table");
HTU.compact(tableName, false);
}
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
table.close();
connection.close();
}
}
@Test
public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
testRegionReplicaReplicationIgnores(false, false);
}
@Test
public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
testRegionReplicaReplicationIgnores(true, false);
}
@Test
public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
testRegionReplicaReplicationIgnores(false, true);
}
private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
throws Exception {
    // Tests that edits from a disabled or dropped table are handled correctly by skipping those
    // entries, and that edits arriving after the dropped/disabled table's edits can still be
    // replicated without problems.
int regionReplication = 3;
TableDescriptor htd = HTU
.createModifyableTableDescriptor(
name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication)
.setRegionReplication(regionReplication).build();
final TableName tableName = htd.getTableName();
HTU.deleteTableIfAny(tableName);
createOrEnableTableWithRetries(htd, true);
TableName toBeDisabledTable = TableName.valueOf(
dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
HTU.deleteTableIfAny(toBeDisabledTable);
htd = HTU
.createModifyableTableDescriptor(toBeDisabledTable,
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
.setRegionReplication(regionReplication).build();
createOrEnableTableWithRetries(htd, true);
// both tables are created, now pause replication
HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
// now that the replication is disabled, write to the table to be dropped, then drop the table.
Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Table table = connection.getTable(tableName);
Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtil.fam1, 6000, 7000);
RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
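    // build a WAL entry against the to-be-disabled table; once that table is dropped or its
    // region replication is turned off, the endpoint is expected to skip this entry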
Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
.setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
    Entry entry = new Entry(new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
      new WALEdit().add(cell));
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
if (dropTable) {
HTU.getAdmin().deleteTable(toBeDisabledTable);
} else if (disableReplication) {
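      // shrink the region replication so that edits for this table are no longer replicated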
htd =
TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication - 2).build();
HTU.getAdmin().modifyTable(htd);
createOrEnableTableWithRetries(htd, false);
}
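    // run the buffered entries through a standalone endpoint; edits for the dropped/disabled
    // table should be filtered out and counted as skipped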
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
MetricsSource metrics = mock(MetricsSource.class);
ReplicationEndpoint.Context ctx =
new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
.getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
metrics, rs.getTableDescriptors(), rs);
RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
rrpe.init(ctx);
rrpe.start();
ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
repCtx.setEntries(Lists.newArrayList(entry, entry));
assertTrue(rrpe.replicate(repCtx));
verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
rrpe.stop();
if (disableReplication) {
      // restore the original region replication so that we can verify replication afterwards
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication).build();
HTU.getAdmin().modifyTable(htd);
createOrEnableTableWithRetries(htd, false);
}
try {
      // load the data to the main table while the replication peer is still disabled
HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
// now enable the replication
HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
verifyReplication(tableName, regionReplication, 0, 1000);
} finally {
table.close();
rl.close();
tableToBeDisabled.close();
HTU.deleteTableIfAny(toBeDisabledTable);
connection.close();
}
}
private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
    // Helper to run create/enable table operations, retrying on transient replication errors
boolean continueToRetry = true;
int tries = 0;
while (continueToRetry && tries < 50) {
try {
continueToRetry = false;
if (createTableOperation) {
HTU.getAdmin().createTable(htd);
} else {
HTU.getAdmin().enableTable(htd.getTableName());
}
} catch (IOException e) {
if (e.getCause() instanceof ReplicationException) {
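          // the replication peer for region replicas may not be ready yet; back off and retry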
continueToRetry = true;
tries++;
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
}
}
}
}

View File

@ -1,281 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.WALObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
 * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint, this
 * class contains lower-level tests using callables.
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestRegionReplicaReplicationEndpointNoMaster {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class);
private static final int NB_SERVERS = 2;
private static TableName tableName = TableName.valueOf(
TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
private static Table table;
private static final byte[] row = Bytes.toBytes("TestRegionReplicaReplicator");
private static HRegionServer rs0;
private static HRegionServer rs1;
private static RegionInfo hriPrimary;
private static RegionInfo hriSecondary;
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
private static final byte[] f = HConstants.CATALOG_FAMILY;
@BeforeClass
public static void beforeClass() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
// install WALObserver coprocessor for tests
String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
if (walCoprocs == null) {
walCoprocs = WALEditCopro.class.getName();
} else {
walCoprocs += "," + WALEditCopro.class.getName();
}
HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
walCoprocs);
StartTestingClusterOption option = StartTestingClusterOption.builder()
.numAlwaysStandByMasters(1).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
HTU.startMiniCluster(option);
// Create table then get the single region for our new table.
TableDescriptor htd = HTU.createTableDescriptor(TableName.valueOf(tableName.getNameAsString()),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
table = HTU.createTable(htd, new byte[][]{f}, null);
try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
hriPrimary = locator.getRegionLocation(row, false).getRegion();
}
// mock a secondary region info to open
hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
// No master
TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
}
@AfterClass
public static void afterClass() throws Exception {
HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
table.close();
HTU.shutdownMiniCluster();
}
@Before
public void before() throws Exception {
entries.clear();
}
@After
public void after() throws Exception {
}
static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
public static class WALEditCopro implements WALCoprocessor, WALObserver {
public WALEditCopro() {
entries.clear();
}
@Override
public Optional<WALObserver> getWALObserver() {
return Optional.of(this);
}
@Override
public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
// only keep primary region's edits
if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) {
// Presume type is a WALKeyImpl
entries.add(new Entry((WALKeyImpl)logKey, logEdit));
}
}
}
@Test
public void testReplayCallable() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly
openRegion(HTU, rs0, hriSecondary);
// load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
try (AsyncClusterConnection conn = ClusterConnectionFactory
.createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
// replay the edits to the secondary using replay callable
replicateUsingCallable(conn, entries);
}
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
}
private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
throws IOException, ExecutionException, InterruptedException {
Entry entry;
while ((entry = entries.poll()) != null) {
byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
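      // replay a single entry to replica 1 with unlimited retries and a 10 second operation timeout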
connection
.replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
.get();
}
}
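  // A minimal sketch, not exercised by these tests, of how a similar helper might drive the
  // per-replica replicate call on AsyncClusterConnection instead of the row-keyed replay RPC;
  // the helper name, retry count and rpc/operation timeouts below are illustrative assumptions.
  private void replicateUsingReplicateCall(AsyncClusterConnection connection, Queue<Entry> entries)
      throws ExecutionException, InterruptedException {
    Entry entry;
    while ((entry = entries.poll()) != null) {
      // target the secondary replica of the primary region directly
      RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
      connection.replicate(replica, Collections.singletonList(entry), Integer.MAX_VALUE,
        TimeUnit.SECONDS.toNanos(1), TimeUnit.SECONDS.toNanos(10)).get();
    }
  }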
@Test
public void testReplayCallableWithRegionMove() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly while
    // the region is moved to another location. It tests handling of RegionMovedException (RME).
try (AsyncClusterConnection conn = ClusterConnectionFactory
.createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
openRegion(HTU, rs0, hriSecondary);
// load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
// replay the edits to the secondary using replay callable
replicateUsingCallable(conn, entries);
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
// move the secondary region from RS0 to RS1
closeRegion(HTU, rs0, hriSecondary);
openRegion(HTU, rs1, hriSecondary);
// replicate the new data
replicateUsingCallable(conn, entries);
region = rs1.getRegion(hriSecondary.getEncodedName());
// verify the new data. old data may or may not be there
HTU.verifyNumericRows(region, f, 1000, 2000);
HTU.deleteNumericRows(table, f, 0, 2000);
closeRegion(HTU, rs1, hriSecondary);
}
}
@Test
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
    // tests replaying the edits to a secondary region replica using
    // RegionReplicaReplicationEndpoint.replicate()
openRegion(HTU, rs0, hriSecondary);
RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
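    // build a minimal mocked endpoint context backed by the live region server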
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
when(context.getServer()).thenReturn(rs0);
when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
replicator.init(context);
replicator.startAsync();
    // load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
    // replicate the edits to the secondary through the endpoint
final String fakeWalGroupId = "fakeWALGroup";
replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
.setWalGroupId(fakeWalGroupId));
replicator.stop();
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
}
}