HBASE-27085 Create REPLICATION_SINK_TRACKER table to persist marker rows coming from source cluster (#4486)

This commit is contained in:
Rushabh Shah 2022-06-20 22:33:10 -07:00 committed by GitHub
parent 2296ffe6cf
commit 3ccbd47dfa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 779 additions and 1 deletions

View File

@ -182,3 +182,12 @@ message RegionEventDescriptor {
*/ */
message WALTrailer { message WALTrailer {
} }
/**
* Special WAL entry for replication marker event.
*/
message ReplicationMarkerDescriptor {
required string region_server_name = 1;
required string wal_name = 2;
required uint64 offset = 3;
}

View File

@ -218,6 +218,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint; import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager;
@ -1247,6 +1248,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
slowLogMasterService.init(); slowLogMasterService.init();
WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this); WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
// Create REPLICATION.SINK_TRACKER table if needed.
ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this);
// clear the dead servers with same host name and port of online server because we are not // clear the dead servers with same host name and port of online server because we are not
// removing dead server with same hostname and port of rs which is trying to check in before // removing dead server with same hostname and port of rs which is trying to check in before

View File

@ -143,5 +143,4 @@ public class NamedQueueRecorder {
this.logEventHandler.persistAll(namedQueueEvent, connection); this.logEventHandler.persistAll(namedQueueEvent, connection);
} }
} }
} }

View File

@ -25,6 +25,10 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
import static org.apache.hadoop.hbase.HConstants.NAMED_QUEUE_CHORE_DURATION_DEFAULT; import static org.apache.hadoop.hbase.HConstants.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_KEY; import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.Span;
@ -140,6 +144,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
import org.apache.hadoop.hbase.security.SecurityConstants; import org.apache.hadoop.hbase.security.SecurityConstants;
@ -473,6 +478,11 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
private RegionReplicationBufferManager regionReplicationBufferManager; private RegionReplicationBufferManager regionReplicationBufferManager;
/*
* Chore that creates replication marker rows.
*/
private ReplicationMarkerChore replicationMarkerChore;
/** /**
* Starts a HRegionServer at the default location. * Starts a HRegionServer at the default location.
* <p/> * <p/>
@ -758,6 +768,17 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
|| (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
} }
private void initializeReplicationMarkerChore() {
boolean replicationMarkerEnabled =
conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
// If replication or replication marker is not enabled then return immediately.
if (replicationMarkerEnabled) {
int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf);
}
}
/** /**
* The HRegionServer sticks in this loop until closed. * The HRegionServer sticks in this loop until closed.
*/ */
@ -1918,6 +1939,11 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
choreService.scheduleChore(brokenStoreFileCleaner); choreService.scheduleChore(brokenStoreFileCleaner);
} }
if (replicationMarkerChore != null) {
LOG.info("Starting replication marker chore");
choreService.scheduleChore(replicationMarkerChore);
}
// Leases is not a Thread. Internally it runs a daemon thread. If it gets // Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit. // an unhandled exception, it will just exit.
Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker",
@ -2027,6 +2053,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
brokenStoreFileCleanerPeriod, this, conf, this); brokenStoreFileCleanerPeriod, this, conf, this);
registerConfigurationObservers(); registerConfigurationObservers();
initializeReplicationMarkerChore();
} }
private void registerConfigurationObservers() { private void registerConfigurationObservers() {
@ -3592,6 +3619,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
shutdownChore(fsUtilizationChore); shutdownChore(fsUtilizationChore);
shutdownChore(namedQueueServiceChore); shutdownChore(namedQueueServiceChore);
shutdownChore(brokenStoreFileCleaner); shutdownChore(brokenStoreFileCleaner);
shutdownChore(replicationMarkerChore);
} }
@Override @Override

View File

@ -17,10 +17,13 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function; import java.util.function.Function;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink; import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
@ -231,4 +235,12 @@ public class WALUtil {
cells.trimToSize(); cells.trimToSize();
} }
} }
public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrencyControl mvcc,
RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
writeMarker(wal, replicationScope, regionInfo,
WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null);
}
} }

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.master;
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This will create {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if
* hbase.regionserver.replication.sink.tracker.enabled config key is enabled and table not created
**/
@InterfaceAudience.Private
public final class ReplicationSinkTrackerTableCreator {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationSinkTrackerTableCreator.class);
private static final Long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds
public static final byte[] RS_COLUMN = Bytes.toBytes("region_server_name");
public static final byte[] WAL_NAME_COLUMN = Bytes.toBytes("wal_name");
public static final byte[] TIMESTAMP_COLUMN = Bytes.toBytes("timestamp");
public static final byte[] OFFSET_COLUMN = Bytes.toBytes("offset");
/** Will create {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if this conf is enabled **/
public static final String REPLICATION_SINK_TRACKER_ENABLED_KEY =
"hbase.regionserver.replication.sink.tracker.enabled";
public static final boolean REPLICATION_SINK_TRACKER_ENABLED_DEFAULT = false;
/** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family as a string */
private static final String REPLICATION_SINK_TRACKER_INFO_FAMILY_STR = "info";
/** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family in array of bytes */
public static final byte[] REPLICATION_SINK_TRACKER_INFO_FAMILY =
Bytes.toBytes(REPLICATION_SINK_TRACKER_INFO_FAMILY_STR);
public static final String REPLICATION_SINK_TRACKER_TABLE_NAME_STR = "REPLICATION.SINK_TRACKER";
/* Private default constructor */
private ReplicationSinkTrackerTableCreator() {
}
/**
* {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table name - can be enabled with config -
* hbase.regionserver.replication.sink.tracker.enabled
*/
public static final TableName REPLICATION_SINK_TRACKER_TABLE_NAME =
TableName.valueOf(REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder
.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).setRegionReplication(1)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(REPLICATION_SINK_TRACKER_INFO_FAMILY)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1)
.setTimeToLive(TTL.intValue()).build());
/*
* We will create this table only if hbase.regionserver.replication.sink.tracker.enabled is
* enabled and table doesn't exists already.
*/
public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
throws IOException {
boolean replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
if (!replicationSinkTrackerEnabled) {
LOG.info("replication sink tracker requests logging to table {} is disabled." + " Quitting.",
REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
return;
}
if (!masterServices.getTableDescriptors().exists(REPLICATION_SINK_TRACKER_TABLE_NAME)) {
LOG.info("{} table not found. Creating.", REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE);
}
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This chore is responsible to create replication marker rows with special WALEdit with family as
* {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY} and column qualifier as
* {@link WALEdit#REPLICATION_MARKER} and empty value. If config key
* {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, then we will create 1 marker row every
* {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate
* the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in
* {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
* object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
* REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster,
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
* ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table.
*/
@InterfaceAudience.Private
public class ReplicationMarkerChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class);
private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();
public static final RegionInfo REGION_INFO =
RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build();
private static final String DELIMITER = "_";
private final Configuration conf;
private final RegionServerServices rsServices;
private WAL wal;
public static final String REPLICATION_MARKER_ENABLED_KEY =
"hbase.regionserver.replication.marker.enabled";
public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false;
public static final String REPLICATION_MARKER_CHORE_DURATION_KEY =
"hbase.regionserver.replication.marker.chore.duration";
public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds
public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices,
int period, Configuration conf) {
super("ReplicationTrackerChore", stopper, period);
this.conf = conf;
this.rsServices = rsServices;
}
@Override
protected void chore() {
if (wal == null) {
try {
wal = rsServices.getWAL(null);
} catch (IOException ioe) {
LOG.warn("Unable to get WAL ", ioe);
// Shouldn't happen. Ignore and wait for the next chore run.
return;
}
}
String serverName = rsServices.getServerName().getServerName();
long timeStamp = EnvironmentEdgeManager.currentTime();
// We only have timestamp in ReplicationMarkerDescriptor and the remaining properties walname,
// regionserver name and wal offset at ReplicationSourceWALReaderThread.
byte[] rowKey = getRowKey(serverName, timeStamp);
if (LOG.isTraceEnabled()) {
LOG.trace("Creating replication marker edit.");
}
try {
WALUtil.writeReplicationMarkerAndSync(wal, MVCC, REGION_INFO, rowKey, timeStamp);
} catch (IOException ioe) {
LOG.error("Exception while sync'ing replication tracker edit", ioe);
// TODO: Should we stop region server or add a metric and keep going.
}
}
/**
* Creates a rowkey with region server name and timestamp.
* @param serverName region server name
* @param timestamp timestamp n
*/
public static byte[] getRowKey(String serverName, long timestamp) {
// converting to string since this will help seeing the timestamp in string format using
// hbase shell commands.
String timestampStr = String.valueOf(timestamp);
final String rowKeyStr = serverName + DELIMITER + timestampStr;
return Bytes.toBytes(rowKeyStr);
}
}

View File

@ -17,6 +17,15 @@
*/ */
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -62,6 +71,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@ -100,6 +110,7 @@ public class ReplicationSink {
* Row size threshold for multi requests above which a warning is logged * Row size threshold for multi requests above which a warning is logged
*/ */
private final int rowSizeWarnThreshold; private final int rowSizeWarnThreshold;
private boolean replicationSinkTrackerEnabled;
/** /**
* Create a sink for replication * Create a sink for replication
@ -110,6 +121,8 @@ public class ReplicationSink {
this.conf = HBaseConfiguration.create(conf); this.conf = HBaseConfiguration.create(conf);
rowSizeWarnThreshold = rowSizeWarnThreshold =
conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
decorateConf(); decorateConf();
this.metrics = new MetricsSink(); this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter(); this.walEntrySinkFilter = setupWALEntrySinkFilter();
@ -225,6 +238,17 @@ public class ReplicationSink {
bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} }
} else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
Mutation put = processReplicationMarkerEntry(cell);
if (put == null) {
continue;
}
List<UUID> clusterIds = new ArrayList<>();
for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
clusterIds.add(toUUID(clusterId));
}
put.setClusterIds(clusterIds);
addToHashMultiMap(rowMap, table, clusterIds, put);
} else { } else {
// Handle wal replication // Handle wal replication
if (isNewRowOrType(previousCell, cell)) { if (isNewRowOrType(previousCell, cell)) {
@ -289,6 +313,33 @@ public class ReplicationSink {
} }
} }
/*
* First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not.
* If false, then ignore this cell. If set to true, de-serialize value into
* ReplicationTrackerDescriptor. Create a Put mutation with regionserver name, walname, offset and
* timestamp from ReplicationMarkerDescriptor.
*/
private Put processReplicationMarkerEntry(Cell cell) throws IOException {
// If source is emitting replication marker rows but sink is not accepting them,
// ignore the edits.
if (!replicationSinkTrackerEnabled) {
return null;
}
WALProtos.ReplicationMarkerDescriptor descriptor =
WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(),
(Bytes.toBytes(descriptor.getRegionServerName())));
put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(),
Bytes.toBytes(descriptor.getWalName()));
put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(),
Bytes.toBytes(cell.getTimestamp()));
put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(),
Bytes.toBytes(descriptor.getOffset()));
return put;
}
private void buildBulkLoadHFileMap( private void buildBulkLoadHFileMap(
final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
BulkLoadDescriptor bld) throws IOException { BulkLoadDescriptor bld) throws IOException {

View File

@ -825,4 +825,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
public String logPeerId() { public String logPeerId() {
return "peerId=" + this.getPeerId() + ","; return "peerId=" + this.getPeerId() + ",";
} }
// Visible for testing purpose
public long getTotalReplicatedEdits() {
return totalReplicatedEdits.get();
}
} }

View File

@ -69,6 +69,10 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
return; return;
} }
// Allow replication marker row to pass through.
if (WALEdit.isReplicationMarkerEdit(logEdit)) {
return;
}
// For replay, or if all the cells are markers, do not need to store replication scope. // For replay, or if all the cells are markers, do not need to store replication scope.
if ( if (
logEdit.isReplay() || logEdit.getCells().stream().allMatch(c -> WALEdit.isMetaEditFamily(c)) logEdit.isReplay() || logEdit.getCells().stream().allMatch(c -> WALEdit.isMetaEditFamily(c))

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
@ -42,6 +44,9 @@ import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@ -178,6 +183,7 @@ class ReplicationSourceWALReader extends Thread {
} }
LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize); batch.addEntry(entry, entrySize);
@ -451,6 +457,36 @@ class ReplicationSourceWALReader extends Thread {
return totalStoreFilesSize; return totalStoreFilesSize;
} }
/*
* Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to
* cell's value.
*/
private void updateReplicationMarkerEdit(Entry entry, long offset) {
WALEdit edit = entry.getEdit();
// Return early if it is not ReplicationMarker edit.
if (!WALEdit.isReplicationMarkerEdit(edit)) {
return;
}
List<Cell> cells = edit.getCells();
Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell");
Cell cell = cells.get(0);
// Create a descriptor with region_server_name, wal_name and offset
WALProtos.ReplicationMarkerDescriptor.Builder builder =
WALProtos.ReplicationMarkerDescriptor.newBuilder();
builder.setRegionServerName(this.source.getServer().getServerName().getHostname());
builder.setWalName(getCurrentPath().getName());
builder.setOffset(offset);
WALProtos.ReplicationMarkerDescriptor descriptor = builder.build();
// Create a new KeyValue
KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray());
ArrayList<Cell> newCells = new ArrayList<>();
newCells.add(kv);
// Update edit with new cell.
edit.setCells(newCells);
}
/** /**
* @param size delta size for grown buffer * @param size delta size for grown buffer
* @return true if we should clear buffer and push all * @return true if we should clear buffer and push all

View File

@ -132,6 +132,23 @@ public class WALEdit implements HeapSize {
@InterfaceAudience.Private @InterfaceAudience.Private
public static final byte[] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD"); public static final byte[] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
/**
* Periodically {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore}
* will create marker edits with family as {@link WALEdit#METAFAMILY} and
* {@link WALEdit#REPLICATION_MARKER} as qualifier and an empty value.
* org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader will populate the
* Replication Marker edit with region_server_name, wal_name and wal_offset encoded in
* {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
* object.
* {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
* REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster,
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
* ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR
* table.
*/
@InterfaceAudience.Private
public static final byte[] REPLICATION_MARKER = Bytes.toBytes("HBASE::REPLICATION_MARKER");
private final transient boolean replay; private final transient boolean replay;
private ArrayList<Cell> cells; private ArrayList<Cell> cells;
@ -480,4 +497,28 @@ public class WALEdit implements HeapSize {
this.cells.add(cell); this.cells.add(cell);
return this; return this;
} }
/**
* Creates a replication tracker edit with {@link #METAFAMILY} family and
* {@link #REPLICATION_MARKER} qualifier and has null value.
* @param rowKey rowkey
* @param timestamp timestamp
*/
public static WALEdit createReplicationMarkerEdit(byte[] rowKey, long timestamp) {
KeyValue kv =
new KeyValue(rowKey, METAFAMILY, REPLICATION_MARKER, timestamp, KeyValue.Type.Put);
return new WALEdit().add(kv);
}
/**
* Checks whether this edit is a replication marker edit.
* @param edit edit
* @return true if the cell within an edit has column = METAFAMILY and qualifier =
* REPLICATION_MARKER, false otherwise
*/
public static boolean isReplicationMarkerEdit(WALEdit edit) {
// Check just the first cell from the edit. ReplicationMarker edit will have only 1 cell.
return edit.getCells().size() == 1
&& CellUtil.matchingColumn(edit.getCells().get(0), METAFAMILY, REPLICATION_MARKER);
}
} }

View File

@ -319,6 +319,14 @@ public class WALSplitter {
Entry entry; Entry entry;
startTS = EnvironmentEdgeManager.currentTime(); startTS = EnvironmentEdgeManager.currentTime();
while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
// This condition is strictly not required since the regionid present in the edit is
// invalid, so it will skip processing this edit.
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring Replication marker edits.");
}
continue;
}
byte[] region = entry.getKey().getEncodedRegionName(); byte[] region = entry.getKey().getEncodedRegionName();
String encodedRegionNameAsStr = Bytes.toString(region); String encodedRegionNameAsStr = Bytes.toString(region);
Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class })
public class TestWALEdit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALEdit.class);
private static final String RS_NAME = "test-region-server-name";
/**
* Tests that
* {@link org.apache.hadoop.hbase.wal.WALEdit#createReplicationMarkerEdit(byte[], long)} method is
* creating WALEdit with correct family and qualifier.
*/
@Test
public void testCreateReplicationMarkerEdit() {
long timestamp = EnvironmentEdgeManager.currentTime();
byte[] rowkey = ReplicationMarkerChore.getRowKey(RS_NAME, timestamp);
WALEdit edit = WALEdit.createReplicationMarkerEdit(rowkey, timestamp);
assertEquals(1, edit.getCells().size());
Cell cell = edit.getCells().get(0);
assertTrue(CellUtil.matchingFamily(cell, METAFAMILY));
assertTrue(CellUtil.matchingQualifier(cell, REPLICATION_MARKER));
assertTrue(WALEdit.isReplicationMarkerEdit(edit));
}
}

View File

@ -0,0 +1,265 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This test creates 2 mini hbase cluster. One cluster with
* "hbase.regionserver.replication.marker.enabled" conf key. This will create
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} which will create
* marker rows to be replicated to sink cluster. Second cluster with
* "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled. This will persist the
* marker rows coming from peer cluster to persist to REPLICATION.SINK_TRACKER table.
**/
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationMarker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationMarker.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class);
private static Configuration conf1;
private static Configuration conf2;
private static HBaseTestingUtil utility1;
private static HBaseTestingUtil utility2;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
conf2 = new Configuration(conf1);
// Run the replication marker chore in cluster1.
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec
utility1 = new HBaseTestingUtil(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
// Enable the replication sink tracker for cluster 2
conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true);
utility2 = new HBaseTestingUtil(conf2);
// Start cluster 2 first so that hbase:replicationsinktracker table gets created first.
utility2.startMiniCluster(1);
waitForReplicationTrackerTableCreation();
// Start cluster1
utility1.startMiniCluster(1);
Admin admin1 = utility1.getAdmin();
ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
rpcBuilder.setClusterKey(utility2.getClusterKey());
admin1.addReplicationPeer("1", rpcBuilder.build());
ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
.getReplicationSourceService().getReplicationManager();
// Wait until the peer gets established.
Waiter.waitFor(conf1, 10000, (Waiter.Predicate) () -> manager.getSources().size() == 1);
}
private static void waitForReplicationTrackerTableCreation() {
Waiter.waitFor(conf2, 10000, (Waiter.Predicate) () -> utility2.getAdmin()
.tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
}
@AfterClass
public static void tearDown() throws Exception {
utility1.shutdownMiniCluster();
utility2.shutdownMiniCluster();
}
@Test
public void testReplicationMarkerRow() throws Exception {
// We have configured ReplicationTrackerChore to run every second. Sleeping so that it will
// create enough sentinel rows.
Thread.sleep(5000);
WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null);
String walName1ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName();
String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname();
// Since we sync the marker edits while appending to wal, all the edits should be visible
// to Replication threads immediately.
assertTrue(getReplicatedEntries() >= 5);
// Force log roll.
wal1.rollWriter(true);
String walName2ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName();
Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection();
// Sleep for 5 more seconds to get marker rows with new wal name.
Thread.sleep(5000);
// Wait for cluster 2 to have atleast 8 tracker rows from cluster1.
utility2.waitFor(5000, () -> getTableCount(connection2) >= 8);
// Get replication marker rows from cluster2
List<ReplicationSinkTrackerRow> list = getRows(connection2);
for (ReplicationSinkTrackerRow desc : list) {
// All the tracker rows should have same region server name i.e. rs of cluster1
assertEquals(rs1Name, desc.getRegionServerName());
// All the tracker rows will have either wal1 or wal2 name.
assertTrue(walName1ForCluster1.equals(desc.getWalName())
|| walName2ForCluster1.equals(desc.getWalName()));
}
// This table shouldn't exist on cluster1 since
// hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster.
assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
// This table shouldn't exist on cluster1 since
// hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster.
assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
}
/*
* Get rows for replication sink tracker table.
*/
private List<ReplicationSinkTrackerRow> getRows(Connection connection) throws IOException {
List<ReplicationSinkTrackerRow> list = new ArrayList<>();
Scan scan = new Scan();
Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
ResultScanner scanner = table.getScanner(scan);
Result r;
while ((r = scanner.next()) != null) {
List<Cell> cells = r.listCells();
list.add(getPayload(cells));
}
return list;
}
private ReplicationSinkTrackerRow getPayload(List<Cell> cells) {
String rsName = null, walName = null;
Long offset = null;
long timestamp = 0L;
for (Cell cell : cells) {
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
if (Bytes.equals(RS_COLUMN, qualifier)) {
rsName = Bytes.toString(value);
} else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) {
walName = Bytes.toString(value);
} else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) {
timestamp = Bytes.toLong(value);
} else if (Bytes.equals(OFFSET_COLUMN, qualifier)) {
offset = Bytes.toLong(value);
}
}
ReplicationSinkTrackerRow row =
new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset);
return row;
}
class ReplicationSinkTrackerRow {
private String region_server_name;
private String wal_name;
private long timestamp;
private long offset;
public ReplicationSinkTrackerRow(String region_server_name, String wal_name, long timestamp,
long offset) {
this.region_server_name = region_server_name;
this.wal_name = wal_name;
this.timestamp = timestamp;
this.offset = offset;
}
public String getRegionServerName() {
return region_server_name;
}
public String getWalName() {
return wal_name;
}
public long getTimestamp() {
return timestamp;
}
public long getOffset() {
return offset;
}
@Override
public String toString() {
return "ReplicationSinkTrackerRow{" + "region_server_name='" + region_server_name + '\''
+ ", wal_name='" + wal_name + '\'' + ", timestamp=" + timestamp + ", offset=" + offset
+ '}';
}
}
private int getTableCount(Connection connection) throws Exception {
Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
int count = 0;
while (resultScanner.next() != null) {
count++;
}
LOG.info("Table count: " + count);
return count;
}
/*
* Return replicated entries from cluster1.
*/
private long getReplicatedEntries() {
ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
.getReplicationSourceService().getReplicationManager();
List<ReplicationSourceInterface> sources = manager.getSources();
assertEquals(1, sources.size());
ReplicationSource source = (ReplicationSource) sources.get(0);
return source.getTotalReplicatedEdits();
}
}

View File

@ -17,6 +17,10 @@
*/ */
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey;
import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -59,10 +63,12 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -484,6 +490,39 @@ public class TestWALSplit {
assertEquals(11, countWAL(splitLog[0])); assertEquals(11, countWAL(splitLog[0]));
} }
/*
* Tests that WalSplitter ignores replication marker edits.
*/
@Test(timeout = 30000)
public void testSplitRemovesReplicationMarkerEdits() throws IOException {
RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
generateReplicationMarkerEdits(path, regionInfo);
useDifferentDFSClient();
List<FileStatus> logFiles =
SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null);
assertEquals(1, logFiles.size());
assertEquals(path, logFiles.get(0).getPath());
List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
// Make sure that WALSplitter doesn't fail.
assertEquals(0, splitPaths.size());
}
private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException {
long timestamp = EnvironmentEdgeManager.currentTime();
fs.mkdirs(WALDIR);
try (Writer writer = wals.createWALWriter(fs, path)) {
WALProtos.ReplicationMarkerDescriptor.Builder builder =
WALProtos.ReplicationMarkerDescriptor.newBuilder();
builder.setWalName("wal-name");
builder.setRegionServerName("rs-name");
builder.setOffset(0L);
WALProtos.ReplicationMarkerDescriptor desc = builder.build();
appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(),
getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1);
}
}
/** /**
* @param expectedEntries -1 to not assert * @param expectedEntries -1 to not assert
* @return the count across all regions * @return the count across all regions