diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto index 48a108bb8a7..ba12dcf3edf 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/WAL.proto @@ -182,3 +182,12 @@ message RegionEventDescriptor { */ message WALTrailer { } + +/** + * Special WAL entry for replication marker event. + */ +message ReplicationMarkerDescriptor { + required string region_server_name = 1; + required string wal_name = 2; + required uint64 offset = 3; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index b6aea311ccb..4d381a7fd3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -218,6 +218,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; +import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator; import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint; import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; @@ -1247,6 +1248,8 @@ public class HMaster extends HBaseServerBase implements Maste slowLogMasterService.init(); WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this); + // Create REPLICATION.SINK_TRACKER table if needed. 
+ ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this); // clear the dead servers with same host name and port of online server because we are not // removing dead server with same hostname and port of rs which is trying to check in before diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java index 6e88cf9cbc2..bd47f24df6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java @@ -143,5 +143,4 @@ public class NamedQueueRecorder { this.logEventHandler.persistAll(namedQueueEvent, connection); } } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 76a64cfe4ba..b8a7609ca5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -25,6 +25,10 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.HConstants.NAMED_QUEUE_CHORE_DURATION_DEFAULT; import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT; import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_KEY; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT; +import static 
org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY; import io.opentelemetry.api.trace.Span; @@ -140,6 +144,7 @@ import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener; import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.security.SecurityConstants; @@ -473,6 +478,11 @@ public class HRegionServer extends HBaseServerBase private RegionReplicationBufferManager regionReplicationBufferManager; + /* + * Chore that creates replication marker rows. + */ + private ReplicationMarkerChore replicationMarkerChore; + /** * Starts a HRegionServer at the default location. *

@@ -758,6 +768,17 @@ public class HRegionServer extends HBaseServerBase || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp()); } + private void initializeReplicationMarkerChore() { + boolean replicationMarkerEnabled = + conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT); + // Create the chore only when the replication marker feature is enabled; otherwise do nothing. + if (replicationMarkerEnabled) { + int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY, + REPLICATION_MARKER_CHORE_DURATION_DEFAULT); + replicationMarkerChore = new ReplicationMarkerChore(this, this, period, conf); + } + } + /** * The HRegionServer sticks in this loop until closed. */ @@ -1918,6 +1939,11 @@ public class HRegionServer extends HBaseServerBase choreService.scheduleChore(brokenStoreFileCleaner); } + if (replicationMarkerChore != null) { + LOG.info("Starting replication marker chore"); + choreService.scheduleChore(replicationMarkerChore); + } + // Leases is not a Thread. Internally it runs a daemon thread. If it gets // an unhandled exception, it will just exit. 
Threads.setDaemonThreadRunning(this.leaseManager, getName() + ".leaseChecker", @@ -2027,6 +2053,7 @@ public class HRegionServer extends HBaseServerBase brokenStoreFileCleanerPeriod, this, conf, this); registerConfigurationObservers(); + initializeReplicationMarkerChore(); } private void registerConfigurationObservers() { @@ -3592,6 +3619,7 @@ public class HRegionServer extends HBaseServerBase shutdownChore(fsUtilizationChore); shutdownChore(namedQueueServiceChore); shutdownChore(brokenStoreFileCleaner); + shutdownChore(replicationMarkerChore); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java index 243e5eb983e..ffde2b03431 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java @@ -17,10 +17,13 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; + import java.io.IOException; import java.util.ArrayList; import java.util.Map; import java.util.NavigableMap; +import java.util.TreeMap; import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -31,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry; import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationSink; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.WAL; @@ -231,4 +235,12 @@ public class WALUtil { cells.trimToSize(); } } + + public static void writeReplicationMarkerAndSync(WAL wal, 
MultiVersionConcurrencyControl mvcc, + RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException { + NavigableMap replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR); + replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL); + writeMarker(wal, replicationScope, regionInfo, + WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null, null); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java new file mode 100644 index 00000000000..20181a90c39 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationSinkTrackerTableCreator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.master; + +import static org.apache.hadoop.hbase.HConstants.NO_NONCE; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This will create {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if + * hbase.regionserver.replication.sink.tracker.enabled config key is enabled and table not created + **/ +@InterfaceAudience.Private +public final class ReplicationSinkTrackerTableCreator { + private static final Logger LOG = + LoggerFactory.getLogger(ReplicationSinkTrackerTableCreator.class); + private static final Long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds + + public static final byte[] RS_COLUMN = Bytes.toBytes("region_server_name"); + public static final byte[] WAL_NAME_COLUMN = Bytes.toBytes("wal_name"); + public static final byte[] TIMESTAMP_COLUMN = Bytes.toBytes("timestamp"); + public static final byte[] OFFSET_COLUMN = Bytes.toBytes("offset"); + + /** Will create {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if this conf is enabled **/ + public static final String REPLICATION_SINK_TRACKER_ENABLED_KEY = + "hbase.regionserver.replication.sink.tracker.enabled"; + public static final boolean REPLICATION_SINK_TRACKER_ENABLED_DEFAULT = false; + + /** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family as a string */ + private static final String REPLICATION_SINK_TRACKER_INFO_FAMILY_STR = "info"; + + /** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family in array of 
bytes */ + public static final byte[] REPLICATION_SINK_TRACKER_INFO_FAMILY = + Bytes.toBytes(REPLICATION_SINK_TRACKER_INFO_FAMILY_STR); + + public static final String REPLICATION_SINK_TRACKER_TABLE_NAME_STR = "REPLICATION.SINK_TRACKER"; + + /* Private default constructor */ + private ReplicationSinkTrackerTableCreator() { + } + + /** + * {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table name - can be enabled with config - + * hbase.regionserver.replication.sink.tracker.enabled + */ + public static final TableName REPLICATION_SINK_TRACKER_TABLE_NAME = + TableName.valueOf(REPLICATION_SINK_TRACKER_TABLE_NAME_STR); + + private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder + .newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).setRegionReplication(1) + .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(REPLICATION_SINK_TRACKER_INFO_FAMILY) + .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1) + .setTimeToLive(TTL.intValue()).build()); + + /* + * We will create this table only if hbase.regionserver.replication.sink.tracker.enabled is + * enabled and table doesn't exists already. + */ + public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices) + throws IOException { + boolean replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, + REPLICATION_SINK_TRACKER_ENABLED_DEFAULT); + if (!replicationSinkTrackerEnabled) { + LOG.info("replication sink tracker requests logging to table {} is disabled." + " Quitting.", + REPLICATION_SINK_TRACKER_TABLE_NAME_STR); + return; + } + if (!masterServices.getTableDescriptors().exists(REPLICATION_SINK_TRACKER_TABLE_NAME)) { + LOG.info("{} table not found. 
Creating.", REPLICATION_SINK_TRACKER_TABLE_NAME_STR); + masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java new file mode 100644 index 00000000000..9c96bc780b4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationMarkerChore.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; + +import java.io.IOException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.wal.WALUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This chore is responsible to create replication marker rows with special WALEdit with family as + * {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY} and column qualifier as + * {@link WALEdit#REPLICATION_MARKER} and empty value. If config key + * {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, then we will create 1 marker row every + * {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms + * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate + * the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in + * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor} + * object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the + * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. 
On the sink cluster, + * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the + * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table. + */ +@InterfaceAudience.Private +public class ReplicationMarkerChore extends ScheduledChore { + private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class); + private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl(); + public static final RegionInfo REGION_INFO = + RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build(); + private static final String DELIMITER = "_"; + private final Configuration conf; + private final RegionServerServices rsServices; + private WAL wal; + + public static final String REPLICATION_MARKER_ENABLED_KEY = + "hbase.regionserver.replication.marker.enabled"; + public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false; + + public static final String REPLICATION_MARKER_CHORE_DURATION_KEY = + "hbase.regionserver.replication.marker.chore.duration"; + public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds + + public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices, + int period, Configuration conf) { + super("ReplicationTrackerChore", stopper, period); + this.conf = conf; + this.rsServices = rsServices; + } + + @Override + protected void chore() { + if (wal == null) { + try { + wal = rsServices.getWAL(null); + } catch (IOException ioe) { + LOG.warn("Unable to get WAL ", ioe); + // Shouldn't happen. Ignore and wait for the next chore run. + return; + } + } + String serverName = rsServices.getServerName().getServerName(); + long timeStamp = EnvironmentEdgeManager.currentTime(); + // Only the timestamp is known at this point. The remaining properties (wal name, + // region server name and wal offset) are populated later by ReplicationSourceWALReader. 
+ byte[] rowKey = getRowKey(serverName, timeStamp); + if (LOG.isTraceEnabled()) { + LOG.trace("Creating replication marker edit."); + } + try { + WALUtil.writeReplicationMarkerAndSync(wal, MVCC, REGION_INFO, rowKey, timeStamp); + } catch (IOException ioe) { + LOG.error("Exception while sync'ing replication tracker edit", ioe); + // TODO: Should we stop region server or add a metric and keep going. + } + } + + /** + * Creates a rowkey with region server name and timestamp. + * @param serverName region server name + * @param timestamp timestamp appended to the row key in decimal string form + * @return bytes of the string serverName + "_" + timestamp + */ + public static byte[] getRowKey(String serverName, long timestamp) { + // converting to string since this will help seeing the timestamp in string format using + // hbase shell commands. + String timestampStr = String.valueOf(timestamp); + final String rowKeyStr = serverName + DELIMITER + timestampStr; + return Bytes.toBytes(rowKeyStr); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java index e40f4bbf21b..60f149a6502 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java @@ -17,6 +17,15 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY; +import static 
org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; + +import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -62,6 +71,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -100,6 +110,7 @@ public class ReplicationSink { * Row size threshold for multi requests above which a warning is logged */ private final int rowSizeWarnThreshold; + private boolean replicationSinkTrackerEnabled; /** * Create a sink for replication @@ -110,6 +121,8 @@ public class ReplicationSink { this.conf = HBaseConfiguration.create(conf); rowSizeWarnThreshold = conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); + replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, + REPLICATION_SINK_TRACKER_ENABLED_DEFAULT); decorateConf(); this.metrics = new MetricsSink(); this.walEntrySinkFilter = setupWALEntrySinkFilter(); @@ -225,6 +238,17 @@ public class ReplicationSink { bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); } + } else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) { + Mutation put = processReplicationMarkerEntry(cell); + if (put == null) { + continue; + } + List clusterIds = new 
ArrayList<>(); + for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) { + clusterIds.add(toUUID(clusterId)); + } + put.setClusterIds(clusterIds); + addToHashMultiMap(rowMap, table, clusterIds, put); } else { // Handle wal replication if (isNewRowOrType(previousCell, cell)) { @@ -289,6 +313,33 @@ public class ReplicationSink { } } + /* + * First check if config key hbase.regionserver.replication.sink.tracker.enabled is true or not. + * If false, then ignore this cell and return null. If set to true, de-serialize value into + * ReplicationMarkerDescriptor. Create a Put mutation with regionserver name, walname and offset + * from ReplicationMarkerDescriptor, and the timestamp taken from the cell itself. + */ + private Put processReplicationMarkerEntry(Cell cell) throws IOException { + // If source is emitting replication marker rows but sink is not accepting them, + // ignore the edits. + if (!replicationSinkTrackerEnabled) { + return null; + } + WALProtos.ReplicationMarkerDescriptor descriptor = + WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(), + cell.getValueOffset(), cell.getValueLength())); + Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(), + (Bytes.toBytes(descriptor.getRegionServerName()))); + put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(), + Bytes.toBytes(descriptor.getWalName())); + put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(), + Bytes.toBytes(cell.getTimestamp())); + put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(), + Bytes.toBytes(descriptor.getOffset())); + return put; + } + private void buildBulkLoadHFileMap( final Map>>> bulkLoadHFileMap, TableName table, BulkLoadDescriptor bld) throws IOException { diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java index 2373751afbb..d41dcd82e87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java @@ -825,4 +825,9 @@ public class ReplicationSource implements ReplicationSourceInterface { public String logPeerId() { return "peerId=" + this.getPeerId() + ","; } + + // Visible for testing purpose + public long getTotalReplicatedEdits() { + return totalReplicatedEdits.get(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java index 6e5da0feffb..7337694addb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -69,6 +69,10 @@ class ReplicationSourceWALActionListener implements WALActionsListener { if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { return; } + // Allow replication marker row to pass through. + if (WALEdit.isReplicationMarkerEdit(logEdit)) { + return; + } // For replay, or if all the cells are markers, do not need to store replication scope. 
if ( logEdit.isReplay() || logEdit.getCells().stream().allMatch(c -> WALEdit.isMetaEditFamily(c)) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java index ef550fe6978..54b90db8b51 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver; import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; @@ -42,6 +44,9 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; @@ -178,6 +183,7 @@ class ReplicationSourceWALReader extends Thread { } LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); + updateReplicationMarkerEdit(entry, batch.getLastWalPosition()); long entrySize = 
getEntrySizeIncludeBulkLoad(entry); long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); batch.addEntry(entry, entrySize); @@ -451,6 +457,36 @@ class ReplicationSourceWALReader extends Thread { return totalStoreFilesSize; } + /* + * Create @ReplicationMarkerDescriptor with region_server_name, wal_name and offset and set to + * cell's value. + */ + private void updateReplicationMarkerEdit(Entry entry, long offset) { + WALEdit edit = entry.getEdit(); + // Return early if it is not ReplicationMarker edit. + if (!WALEdit.isReplicationMarkerEdit(edit)) { + return; + } + List cells = edit.getCells(); + Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell"); + Cell cell = cells.get(0); + // Create a descriptor with region_server_name, wal_name and offset + WALProtos.ReplicationMarkerDescriptor.Builder builder = + WALProtos.ReplicationMarkerDescriptor.newBuilder(); + builder.setRegionServerName(this.source.getServer().getServerName().getHostname()); + builder.setWalName(getCurrentPath().getName()); + builder.setOffset(offset); + WALProtos.ReplicationMarkerDescriptor descriptor = builder.build(); + + // Create a new KeyValue + KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell), + CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray()); + ArrayList newCells = new ArrayList<>(); + newCells.add(kv); + // Update edit with new cell. 
+ edit.setCells(newCells); + } + /** * @param size delta size for grown buffer * @return true if we should clear buffer and push all diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java index c688f6b1de5..bf78fcd21e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALEdit.java @@ -132,6 +132,23 @@ public class WALEdit implements HeapSize { @InterfaceAudience.Private public static final byte[] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD"); + /** + * Periodically {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} + * will create marker edits with family as {@link WALEdit#METAFAMILY} and + * {@link WALEdit#REPLICATION_MARKER} as qualifier and an empty value. + * org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader will populate the + * Replication Marker edit with region_server_name, wal_name and wal_offset encoded in + * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor} + * object. + * {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the + * REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster, + * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the + * ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR + * table. 
+ */ + @InterfaceAudience.Private + public static final byte[] REPLICATION_MARKER = Bytes.toBytes("HBASE::REPLICATION_MARKER"); + private final transient boolean replay; private ArrayList cells; @@ -480,4 +497,28 @@ public class WALEdit implements HeapSize { this.cells.add(cell); return this; } + + /** + * Creates a replication marker edit with {@link #METAFAMILY} family, + * {@link #REPLICATION_MARKER} qualifier and no value. + * @param rowKey rowkey + * @param timestamp timestamp + * @return a new WALEdit containing the single marker cell + */ + public static WALEdit createReplicationMarkerEdit(byte[] rowKey, long timestamp) { + KeyValue kv = + new KeyValue(rowKey, METAFAMILY, REPLICATION_MARKER, timestamp, KeyValue.Type.Put); + return new WALEdit().add(kv); + } + + /** + * Checks whether this edit is a replication marker edit. + * @param edit edit + * @return true if the edit has exactly one cell and that cell has family = METAFAMILY and + * qualifier = REPLICATION_MARKER, false otherwise + */ + public static boolean isReplicationMarkerEdit(WALEdit edit) { + // Check just the first cell from the edit. ReplicationMarker edit will have only 1 cell. + return edit.getCells().size() == 1 + && CellUtil.matchingColumn(edit.getCells().get(0), METAFAMILY, REPLICATION_MARKER); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 02a9904d1d2..c2a6508788d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -319,6 +319,14 @@ public class WALSplitter { Entry entry; startTS = EnvironmentEdgeManager.currentTime(); while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { + if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) { + // This condition is strictly not required since the regionid present in the edit is + // invalid, so it will skip processing this edit. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring Replication marker edits."); + } + continue; + } byte[] region = entry.getKey().getEncodedRegionName(); String encodedRegionNameAsStr = Bytes.toString(region); Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java new file mode 100644 index 00000000000..00de2118795 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALEdit.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; +import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ SmallTests.class }) +public class TestWALEdit { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestWALEdit.class); + + private static final String RS_NAME = "test-region-server-name"; + + /** + * Tests that + * {@link org.apache.hadoop.hbase.wal.WALEdit#createReplicationMarkerEdit(byte[], long)} method is + * creating WALEdit with correct family and qualifier. 
+ */ + @Test + public void testCreateReplicationMarkerEdit() { + long timestamp = EnvironmentEdgeManager.currentTime(); + + byte[] rowkey = ReplicationMarkerChore.getRowKey(RS_NAME, timestamp); + WALEdit edit = WALEdit.createReplicationMarkerEdit(rowkey, timestamp); + assertEquals(1, edit.getCells().size()); + Cell cell = edit.getCells().get(0); + assertTrue(CellUtil.matchingFamily(cell, METAFAMILY)); + assertTrue(CellUtil.matchingQualifier(cell, REPLICATION_MARKER)); + assertTrue(WALEdit.isReplicationMarkerEdit(edit)); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java new file mode 100644 index 00000000000..6c58973a759 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationMarker.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import 
org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This test creates 2 mini hbase clusters. One cluster with + * "hbase.regionserver.replication.marker.enabled" conf key enabled. This will create + * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore} which will create + * marker rows to be replicated to sink cluster. Second cluster with + * "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled. This will persist the + * marker rows coming from the peer cluster to the REPLICATION.SINK_TRACKER table. + **/ +@Category({ ReplicationTests.class, MediumTests.class }) +public class TestReplicationMarker { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationMarker.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class); + + private static Configuration conf1; + private static Configuration conf2; + private static HBaseTestingUtil utility1; + private static HBaseTestingUtil utility2; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf1 = HBaseConfiguration.create(); + conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1"); + conf2 = new Configuration(conf1); + // Run the replication marker chore in cluster1. 
+ conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); + conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec + utility1 = new HBaseTestingUtil(conf1); + + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + // Enable the replication sink tracker for cluster 2 + conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true); + utility2 = new HBaseTestingUtil(conf2); + + // Start cluster 2 first so that REPLICATION.SINK_TRACKER table gets created first. + utility2.startMiniCluster(1); + waitForReplicationTrackerTableCreation(); + + // Start cluster1 + utility1.startMiniCluster(1); + Admin admin1 = utility1.getAdmin(); + ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder(); + rpcBuilder.setClusterKey(utility2.getClusterKey()); + admin1.addReplicationPeer("1", rpcBuilder.build()); + + ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0) + .getReplicationSourceService().getReplicationManager(); + // Wait until the peer gets established. + Waiter.waitFor(conf1, 10000, (Waiter.Predicate) () -> manager.getSources().size() == 1); + } + + private static void waitForReplicationTrackerTableCreation() { + Waiter.waitFor(conf2, 10000, (Waiter.Predicate) () -> utility2.getAdmin() + .tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); + } + + @AfterClass + public static void tearDown() throws Exception { + utility1.shutdownMiniCluster(); + utility2.shutdownMiniCluster(); + } + + @Test + public void testReplicationMarkerRow() throws Exception { + // We have configured ReplicationMarkerChore to run every second. Sleeping so that it will + // create enough marker rows. 
+ Thread.sleep(5000); + WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null); + String walName1ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName(); + String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname(); + // Since we sync the marker edits while appending to wal, all the edits should be visible + // to Replication threads immediately. + assertTrue(getReplicatedEntries() >= 5); + // Force log roll. + wal1.rollWriter(true); + String walName2ForCluster1 = ((AbstractFSWAL) wal1).getCurrentFileName().getName(); + Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection(); + // Sleep for 5 more seconds to get marker rows with new wal name. + Thread.sleep(5000); + // Wait for cluster 2 to have at least 8 tracker rows from cluster1. + utility2.waitFor(5000, () -> getTableCount(connection2) >= 8); + // Get replication marker rows from cluster2 + List list = getRows(connection2); + for (ReplicationSinkTrackerRow desc : list) { + // All the tracker rows should have same region server name i.e. rs of cluster1 + assertEquals(rs1Name, desc.getRegionServerName()); + // All the tracker rows will have either wal1 or wal2 name. + assertTrue(walName1ForCluster1.equals(desc.getWalName()) + || walName2ForCluster1.equals(desc.getWalName())); + } + + // This table shouldn't exist on cluster1 since + // hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster. + assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); + // This table should exist on cluster2 since + // hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster. + assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME)); + } + + /* + * Get rows for replication sink tracker table. 
+ */ + private List getRows(Connection connection) throws IOException { + List list = new ArrayList<>(); + Scan scan = new Scan(); + Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME); + ResultScanner scanner = table.getScanner(scan); + + Result r; + while ((r = scanner.next()) != null) { + List cells = r.listCells(); + list.add(getPayload(cells)); + } + return list; + } + + private ReplicationSinkTrackerRow getPayload(List cells) { + String rsName = null, walName = null; + Long offset = null; + long timestamp = 0L; + for (Cell cell : cells) { + byte[] qualifier = CellUtil.cloneQualifier(cell); + byte[] value = CellUtil.cloneValue(cell); + + if (Bytes.equals(RS_COLUMN, qualifier)) { + rsName = Bytes.toString(value); + } else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) { + walName = Bytes.toString(value); + } else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) { + timestamp = Bytes.toLong(value); + } else if (Bytes.equals(OFFSET_COLUMN, qualifier)) { + offset = Bytes.toLong(value); + } + } + ReplicationSinkTrackerRow row = + new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset); + return row; + } + + class ReplicationSinkTrackerRow { + private String region_server_name; + private String wal_name; + private long timestamp; + private long offset; + + public ReplicationSinkTrackerRow(String region_server_name, String wal_name, long timestamp, + long offset) { + this.region_server_name = region_server_name; + this.wal_name = wal_name; + this.timestamp = timestamp; + this.offset = offset; + } + + public String getRegionServerName() { + return region_server_name; + } + + public String getWalName() { + return wal_name; + } + + public long getTimestamp() { + return timestamp; + } + + public long getOffset() { + return offset; + } + + @Override + public String toString() { + return "ReplicationSinkTrackerRow{" + "region_server_name='" + region_server_name + '\'' + + ", wal_name='" + wal_name + '\'' + ", timestamp=" + timestamp + ", offset=" 
+ offset + + '}'; + } + } + + private int getTableCount(Connection connection) throws Exception { + Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME); + ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM)); + int count = 0; + while (resultScanner.next() != null) { + count++; + } + LOG.info("Table count: " + count); + return count; + } + + /* + * Return replicated entries from cluster1. + */ + private long getReplicatedEntries() { + ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0) + .getReplicationSourceService().getReplicationManager(); + List sources = manager.getSources(); + assertEquals(1, sources.size()); + ReplicationSource source = (ReplicationSource) sources.get(0); + return source.getTotalReplicatedEdits(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index eab7869459f..48c877423d0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.wal; +import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey; +import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY; +import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -59,10 +63,12 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; +import 
org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader; import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; +import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -484,6 +490,39 @@ public class TestWALSplit { assertEquals(11, countWAL(splitLog[0])); } + /* + * Tests that WalSplitter ignores replication marker edits. + */ + @Test(timeout = 30000) + public void testSplitRemovesReplicationMarkerEdits() throws IOException { + RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO; + Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1"); + generateReplicationMarkerEdits(path, regionInfo); + useDifferentDFSClient(); + List logFiles = + SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null); + assertEquals(1, logFiles.size()); + assertEquals(path, logFiles.get(0).getPath()); + List splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals); + // Make sure that WALSplitter doesn't fail. 
+ assertEquals(0, splitPaths.size()); + } + + private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException { + long timestamp = EnvironmentEdgeManager.currentTime(); + fs.mkdirs(WALDIR); + try (Writer writer = wals.createWALWriter(fs, path)) { + WALProtos.ReplicationMarkerDescriptor.Builder builder = + WALProtos.ReplicationMarkerDescriptor.newBuilder(); + builder.setWalName("wal-name"); + builder.setRegionServerName("rs-name"); + builder.setOffset(0L); + WALProtos.ReplicationMarkerDescriptor desc = builder.build(); + appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(), + getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1); + } + } + /** * @param expectedEntries -1 to not assert * @return the count across all regions