HBASE-26913 Replication Observability Framework (#4862)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Author: Rushabh Shah, 2022-11-08 13:05:42 -08:00 (committed by GitHub)
parent ea4ccf0044
commit ecf3debd42
39 changed files with 1971 additions and 82 deletions

File: SlowLogTableAccessor.java

@@ -21,12 +21,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
@@ -48,8 +46,6 @@ public class SlowLogTableAccessor {
   private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);

-  private static Connection connection;
-
   /**
    * hbase:slowlog table name - can be enabled with config -
    * hbase.regionserver.slowlog.systable.enabled
@@ -66,10 +62,10 @@ public class SlowLogTableAccessor {
   /**
    * Add slow/large log records to hbase:slowlog table
    * @param slowLogPayloads List of SlowLogPayload to process
-   * @param configuration   Configuration to use for connection
+   * @param connection      connection
    */
   public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
-    final Configuration configuration) {
+    Connection connection) {
     List<Put> puts = new ArrayList<>(slowLogPayloads.size());
     for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
       final byte[] rowKey = getRowKey(slowLogPayload);
@@ -102,26 +98,12 @@ public class SlowLogTableAccessor {
       puts.add(put);
     }
     try {
-      if (connection == null) {
-        createConnection(configuration);
-      }
       doPut(connection, puts);
     } catch (Exception e) {
       LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
     }
   }

-  private static synchronized void createConnection(Configuration configuration)
-    throws IOException {
-    Configuration conf = new Configuration(configuration);
-    // rpc timeout: 20s
-    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
-    // retry count: 5
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
-    conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-    connection = ConnectionFactory.createConnection(conf);
-  }
-
   /**
    * Create rowKey: currentTime APPEND slowLogPayload.hashcode Scan on slowlog table should keep
    * records with sorted order of time, however records added at the very same time could be in
@@ -140,5 +122,4 @@ public class SlowLogTableAccessor {
     final long rowKeyLong = Long.parseLong(timeAndHashcode);
     return Bytes.toBytes(rowKeyLong);
   }
 }
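With this change the accessor no longer caches a private short-lived connection; the caller owns the Connection lifecycle. A minimal sketch of invoking the new API follows; the shaded TooSlowLog import path and the empty payload list are assumptions for illustration.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;

public class SlowLogRecordsExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // The caller now owns the Connection; the accessor no longer builds one lazily.
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      List<TooSlowLog.SlowLogPayload> payloads = new ArrayList<>(); // gathered elsewhere
      SlowLogTableAccessor.addSlowLogRecords(payloads, connection);
    }
  }
}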

File: HConstants.java

@@ -1643,6 +1643,14 @@ public final class HConstants {
     "hbase.regionserver.slowlog.systable.enabled";
   public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;

+  @Deprecated
+  // since <need to know the version number> and will be removed in <version number>
+  // Instead use hbase.regionserver.named.queue.chore.duration config property
+  public static final String SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY =
+    "hbase.slowlog.systable.chore.duration";
+  // Default 10 mins.
+  public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000;
+
   public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY =
     "hbase.shell.timestamp.format.epoch";

File: hbase-default.xml

@@ -2004,7 +2004,7 @@ possible configurations would overwhelm and obscure the important.
   </property>
   <property>
     <name>hbase.namedqueue.provider.classes</name>
-    <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService</value>
+    <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService,org.apache.hadoop.hbase.namequeues.WALEventTrackerQueueService</value>
     <description>
       Default values for NamedQueueService implementors. This comma separated full class names
       represent all implementors of NamedQueueService that we would like to be invoked by

File: MetricsWALEventTrackerSource.java

@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface MetricsWALEventTrackerSource extends BaseSource {
/**
* The name of the metrics
*/
String METRICS_NAME = "WALEventTracker";
/**
* The name of the metrics context that metrics will be under.
*/
String METRICS_CONTEXT = "regionserver";
/**
* Description
*/
String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WALEventTracker";
/**
* The name of the metrics context that metrics will be under in jmx
*/
String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
String NUM_FAILED_PUTS = "numFailedPuts";
String NUM_FAILED_PUTS_DESC = "Number of put requests that failed";
String NUM_RECORDS_FAILED_PUTS = "numRecordsFailedPuts";
String NUM_RECORDS_FAILED_PUTS_DESC = "number of records in failed puts";
/*
* Increment 2 counters, numFailedPuts and numRecordsFailedPuts
*/
void incrFailedPuts(long numRecords);
/*
* Get the failed puts counter.
*/
long getFailedPuts();
/*
* Get the number of records in failed puts.
*/
long getNumRecordsFailedPuts();
}

File: service registration for MetricsWALEventTrackerSourceImpl

@@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSourceImpl

File: MetricsWALEventTrackerSourceImpl.java

@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.lib.MutableFastCounter;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class MetricsWALEventTrackerSourceImpl extends BaseSourceImpl
implements MetricsWALEventTrackerSource {
private final MutableFastCounter numFailedPutsCount;
private final MutableFastCounter numRecordsFailedPutsCount;
public MetricsWALEventTrackerSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
}
public MetricsWALEventTrackerSourceImpl(String metricsName, String metricsDescription,
String metricsContext, String metricsJmxContext) {
super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
numFailedPutsCount =
this.getMetricsRegistry().newCounter(NUM_FAILED_PUTS, NUM_FAILED_PUTS_DESC, 0L);
numRecordsFailedPutsCount = this.getMetricsRegistry().newCounter(NUM_RECORDS_FAILED_PUTS,
NUM_RECORDS_FAILED_PUTS_DESC, 0L);
}
@Override
public void incrFailedPuts(long numRecords) {
numFailedPutsCount.incr();
numRecordsFailedPutsCount.incr(numRecords);
}
@Override
public long getFailedPuts() {
return numFailedPutsCount.value();
}
@Override
public long getNumRecordsFailedPuts() {
return numRecordsFailedPutsCount.value();
}
}

File: WAL.proto

@@ -182,3 +182,12 @@ message RegionEventDescriptor {
  */
 message WALTrailer {
 }
+
+/**
+ * Special WAL entry for replication marker event.
+ */
+message ReplicationMarkerDescriptor {
+  required string region_server_name = 1;
+  required string wal_name = 2;
+  required uint64 offset = 3;
+}
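A sketch of building this marker from Java. It assumes the message is compiled into the shaded WALProtos class alongside its neighbors in WAL.proto, and all field values shown are hypothetical.

import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;

public class ReplicationMarkerExample {
  public static void main(String[] args) {
    // All three fields are required by the proto definition above.
    WALProtos.ReplicationMarkerDescriptor descriptor =
      WALProtos.ReplicationMarkerDescriptor.newBuilder()
        .setRegionServerName("rs1.example.com,16020,1667940342000") // hypothetical server name
        .setWalName("rs1.example.com%2C16020%2C1667940342000.1667940350000") // hypothetical WAL
        .setOffset(1024L)
        .build();
    System.out.println(descriptor);
  }
}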

File: HMaster.java

@@ -172,6 +172,7 @@ import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
 import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
 import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;
 import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
 import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
 import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
@@ -214,6 +215,7 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
+import org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.SecurityConstants;
@@ -1243,6 +1245,10 @@ public class HMaster extends HRegionServer implements MasterServices {
     final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
     slowLogMasterService.init();

+    WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
+    // Create REPLICATION.SINK_TRACKER table if needed.
+    ReplicationSinkTrackerTableCreator.createIfNeededAndNotExists(conf, this);
+
     // clear the dead servers with same host name and port of online server because we are not
     // removing dead server with same hostname and port of rs which is trying to check in before
     // master initialization. See HBASE-5916.

File: WALEventTrackerTableCreator.java

@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.waleventtracker;
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME_STR;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* WALEventTracker Table creation to be used by HMaster
*/
@InterfaceAudience.Private
public final class WALEventTrackerTableCreator {
private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableCreator.class);
public static final String WAL_EVENT_TRACKER_ENABLED_KEY =
"hbase.regionserver.wal.event.tracker.enabled";
public static final boolean WAL_EVENT_TRACKER_ENABLED_DEFAULT = false;
/** The walEventTracker info family as a string */
private static final String WAL_EVENT_TRACKER_INFO_FAMILY_STR = "info";
/** The walEventTracker info family in array of bytes */
public static final byte[] WAL_EVENT_TRACKER_INFO_FAMILY =
Bytes.toBytes(WAL_EVENT_TRACKER_INFO_FAMILY_STR);
private static final long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds
private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder
.newBuilder(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME).setRegionReplication(1)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(WAL_EVENT_TRACKER_INFO_FAMILY)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1)
.setTimeToLive((int) TTL).build());
/* Private default constructor */
private WALEventTrackerTableCreator() {
}
/*
* We will create this table only if hbase.regionserver.wal.event.tracker.enabled is enabled and
* the table doesn't exist already.
*/
public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
throws IOException {
boolean walEventTrackerEnabled =
conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
if (!walEventTrackerEnabled) {
LOG.info("wal event tracker requests logging to table " + WAL_EVENT_TRACKER_TABLE_NAME_STR
+ " is disabled. Quitting.");
return;
}
if (
!masterServices.getTableDescriptors()
.exists(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME)
) {
LOG.info(WAL_EVENT_TRACKER_TABLE_NAME_STR + " table not found. Creating.");
masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE);
}
}
}
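The table is created lazily at master startup only when the feature flag is on. A minimal sketch of flipping the flag programmatically; a real deployment would set it in hbase-site.xml instead.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;

public class EnableWalEventTrackerExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Off by default; when true, HMaster creates REPLICATION.WALEVENTTRACKER at startup.
    conf.setBoolean(WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY, true);
    System.out.println(conf.getBoolean(
      WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY, false));
  }
}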

File: LogEventHandler.java

@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -70,7 +71,8 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
         namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
       } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
         | InvocationTargetException e) {
-        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz);
+        LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz,
+          e);
       }
     }
   }
@@ -105,8 +107,8 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
    * Add all in memory queue records to system table. The implementors can use system table or
    * direct HDFS file or ZK as persistence system.
    */
-  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
-    namedQueueServices.get(namedQueueEvent).persistAll();
+  void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
+    namedQueueServices.get(namedQueueEvent).persistAll(connection);
   }

   /**

File: NamedQueuePayload.java

@@ -29,7 +29,8 @@ public class NamedQueuePayload {
   public enum NamedQueueEvent {
     SLOW_LOG(0),
     BALANCE_DECISION(1),
-    BALANCE_REJECTION(2);
+    BALANCE_REJECTION(2),
+    WAL_EVENT_TRACKER(3);

     private final int value;
@@ -48,6 +49,9 @@ public class NamedQueuePayload {
       case 2: {
         return BALANCE_REJECTION;
       }
+      case 3: {
+        return WAL_EVENT_TRACKER;
+      }
       default: {
         throw new IllegalArgumentException(
           "NamedQueue event with ordinal " + value + " not defined");

File: NamedQueueRecorder.java

@@ -22,6 +22,7 @@ import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.dsl.Disruptor;
 import com.lmax.disruptor.dsl.ProducerType;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.hadoop.hbase.util.Threads;
@@ -60,7 +61,7 @@ public class NamedQueueRecorder {
     // disruptor initialization with BlockingWaitStrategy
     this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount),
-      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
+      new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".named-queue-events-pool-%d")
         .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
       ProducerType.MULTI, new BlockingWaitStrategy());
     this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
@@ -139,10 +140,9 @@ public class NamedQueueRecorder {
    * Add all in memory queue records to system table. The implementors can use system table or
    * direct HDFS file or ZK as persistence system.
    */
-  public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+  public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
     if (this.logEventHandler != null) {
-      this.logEventHandler.persistAll(namedQueueEvent);
+      this.logEventHandler.persistAll(namedQueueEvent, connection);
     }
   }
 }
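Callers of the recorder now pass a shared Connection through to the underlying queue service. A hedged sketch of a one-off flush using the API as defined in this commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;

public class PersistAllExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    NamedQueueRecorder recorder = NamedQueueRecorder.getInstance(conf);
    try (Connection connection = ConnectionFactory.createConnection(conf)) {
      // Flush the in-memory WAL event tracker queue to its system table.
      recorder.persistAll(NamedQueuePayload.NamedQueueEvent.WAL_EVENT_TRACKER, connection);
    }
  }
}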

File: NamedQueueService.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.namequeues;

+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
 import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -58,5 +59,5 @@ public interface NamedQueueService {
    * Add all in memory queue records to system table. The implementors can use system table or
    * direct HDFS file or ZK as persistence system.
    */
-  void persistAll();
+  void persistAll(Connection connection);
 }

File: NamedQueueServiceChore.java (renamed from SlowLogTableOpsChore.java)

@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.namequeues;
 import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,11 +28,16 @@ import org.slf4j.LoggerFactory;
  * Chore to insert multiple accumulated slow/large logs to hbase:slowlog system table
  */
 @InterfaceAudience.Private
-public class SlowLogTableOpsChore extends ScheduledChore {
+public class NamedQueueServiceChore extends ScheduledChore {

-  private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
+  private static final Logger LOG = LoggerFactory.getLogger(NamedQueueServiceChore.class);
+  public static final String NAMED_QUEUE_CHORE_DURATION_KEY =
+    "hbase.regionserver.named.queue.chore.duration";
+  // 10 mins default.
+  public static final int NAMED_QUEUE_CHORE_DURATION_DEFAULT = 10 * 60 * 1000;

   private final NamedQueueRecorder namedQueueRecorder;
+  private final Connection connection;

   /**
    * Chore Constructor
@@ -41,21 +47,23 @@ public class SlowLogTableOpsChore extends ScheduledChore {
    *                           scheduled
    * @param namedQueueRecorder {@link NamedQueueRecorder} instance
    */
-  public SlowLogTableOpsChore(final Stoppable stopper, final int period,
-    final NamedQueueRecorder namedQueueRecorder) {
-    super("SlowLogTableOpsChore", stopper, period);
+  public NamedQueueServiceChore(final Stoppable stopper, final int period,
+    final NamedQueueRecorder namedQueueRecorder, Connection connection) {
+    super("NamedQueueServiceChore", stopper, period);
     this.namedQueueRecorder = namedQueueRecorder;
+    this.connection = connection;
   }

   @Override
   protected void chore() {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("SlowLog Table Ops Chore is starting up.");
-    }
-    namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("SlowLog Table Ops Chore is closing.");
-    }
+    for (NamedQueuePayload.NamedQueueEvent event : NamedQueuePayload.NamedQueueEvent.values()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Starting chore for event %s", event.name()));
+      }
+      namedQueueRecorder.persistAll(event, connection);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Stopping chore for event %s", event.name()));
+      }
+    }
   }
 }
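The renamed chore now sweeps every named-queue event rather than just SLOW_LOG. A sketch of wiring it up outside the region server; the anonymous Stoppable is a stand-in, and the thread-pool prefix is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;

public class NamedQueueChoreExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    int period = conf.getInt(NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY,
      NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT);
    Connection connection = ConnectionFactory.createConnection(conf);
    Stoppable stopper = new Stoppable() { // trivial stand-in stopper
      private volatile boolean stopped = false;
      @Override public void stop(String why) { stopped = true; }
      @Override public boolean isStopped() { return stopped; }
    };
    NamedQueueServiceChore chore = new NamedQueueServiceChore(stopper, period,
      NamedQueueRecorder.getInstance(conf), connection);
    new ChoreService("example").scheduleChore(chore); // runs chore() every `period` ms
  }
}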

File: SlowLogPersistentService.java

@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class SlowLogPersistentService {
   /**
    * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
    */
-  public void addAllLogsToSysTable() {
+  public void addAllLogsToSysTable(Connection connection) {
     if (queueForSysTable == null) {
       LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
       return;
@@ -82,13 +83,13 @@ public class SlowLogPersistentService {
         slowLogPayloads.add(queueForSysTable.poll());
         i++;
         if (i == SYSTABLE_PUT_BATCH_SIZE) {
-          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+          SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
           slowLogPayloads.clear();
           i = 0;
         }
       }
       if (slowLogPayloads.size() > 0) {
-        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+        SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
       }
     } finally {
       LOCK.unlock();

File: WALEventTrackerPayload.java

@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class WALEventTrackerPayload extends NamedQueuePayload {
private final String rsName;
private final String walName;
private final long timeStamp;
private final String state;
private final long walLength;
public WALEventTrackerPayload(String rsName, String walName, long timeStamp, String state,
long walLength) {
super(NamedQueueEvent.WAL_EVENT_TRACKER.getValue());
this.rsName = rsName;
this.walName = walName;
this.timeStamp = timeStamp;
this.state = state;
this.walLength = walLength;
}
public String getRsName() {
return rsName;
}
public String getWalName() {
return walName;
}
public long getTimeStamp() {
return timeStamp;
}
public String getState() {
return state;
}
public long getWalLength() {
return walLength;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(this.getClass().getSimpleName());
sb.append("[");
sb.append("rsName=").append(rsName);
sb.append(", walName=").append(walName);
sb.append(", timeStamp=").append(timeStamp);
sb.append(", walState=").append(state);
sb.append(", walLength=").append(walLength);
sb.append("]");
return sb.toString();
}
}
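A quick sketch of building a payload the way the WAL listener would; all values are hypothetical.

import org.apache.hadoop.hbase.namequeues.WALEventTrackerPayload;

public class WalEventPayloadExample {
  public static void main(String[] args) {
    WALEventTrackerPayload payload = new WALEventTrackerPayload(
      "rs1.example.com",                        // region server host (hypothetical)
      "rs1.example.com%2C16020.1667940350000",  // WAL file name (hypothetical)
      System.currentTimeMillis(),               // event timestamp
      "ROLLING",                                // one of WALEventTrackerListener.WalState names
      0L);                                      // WAL length at the time of the event
    System.out.println(payload); // toString() prints all fields
  }
}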

File: WALEventTrackerQueueService.java

@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
/*
This class provides the queue to save Wal events from backing RingBuffer.
*/
@InterfaceAudience.Private
public class WALEventTrackerQueueService implements NamedQueueService {
private EvictingQueue<WALEventTrackerPayload> queue;
private static final String WAL_EVENT_TRACKER_RING_BUFFER_SIZE =
"hbase.regionserver.wal.event.tracker.ringbuffer.size";
private final boolean walEventTrackerEnabled;
private int queueSize;
private MetricsWALEventTrackerSource source = null;
private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerQueueService.class);
public WALEventTrackerQueueService(Configuration conf) {
this(conf, null);
}
public WALEventTrackerQueueService(Configuration conf, MetricsWALEventTrackerSource source) {
this.walEventTrackerEnabled =
conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
if (!walEventTrackerEnabled) {
return;
}
this.queueSize = conf.getInt(WAL_EVENT_TRACKER_RING_BUFFER_SIZE, 256);
queue = EvictingQueue.create(queueSize);
if (source == null) {
this.source = CompatibilitySingletonFactory.getInstance(MetricsWALEventTrackerSource.class);
} else {
this.source = source;
}
}
@Override
public NamedQueuePayload.NamedQueueEvent getEvent() {
return NamedQueuePayload.NamedQueueEvent.WAL_EVENT_TRACKER;
}
@Override
public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
if (!walEventTrackerEnabled) {
return;
}
if (!(namedQueuePayload instanceof WALEventTrackerPayload)) {
LOG.warn("WALEventTrackerQueueService: NamedQueuePayload is not of type"
+ " WALEventTrackerPayload.");
return;
}
WALEventTrackerPayload payload = (WALEventTrackerPayload) namedQueuePayload;
if (LOG.isDebugEnabled()) {
LOG.debug("Adding wal event tracker payload " + payload);
}
addToQueue(payload);
}
/*
* Default (package-private) visibility so it can be used in testing.
*/
synchronized void addToQueue(WALEventTrackerPayload payload) {
queue.add(payload);
}
@Override
public boolean clearNamedQueue() {
if (!walEventTrackerEnabled) {
return false;
}
LOG.debug("Clearing wal event tracker queue");
queue.clear();
return true;
}
@Override
public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
return null;
}
@Override
public void persistAll(Connection connection) {
if (!walEventTrackerEnabled) {
return;
}
if (queue.isEmpty()) {
LOG.debug("Wal Event tracker queue is empty.");
return;
}
Queue<WALEventTrackerPayload> queue = getWALEventTrackerList();
try {
WALEventTrackerTableAccessor.addWalEventTrackerRows(queue, connection);
} catch (Exception ioe) {
// If we fail to persist the records with retries then just forget about them.
// This is a best effort service.
LOG.error("Failed while persisting wal tracker records", ioe);
// Increment metrics for failed puts
source.incrFailedPuts(queue.size());
}
}
private synchronized Queue<WALEventTrackerPayload> getWALEventTrackerList() {
Queue<WALEventTrackerPayload> retQueue = new ArrayDeque<>();
Iterator<WALEventTrackerPayload> iterator = queue.iterator();
while (iterator.hasNext()) {
retQueue.add(iterator.next());
}
queue.clear();
return retQueue;
}
}
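The queue service only buffers when the feature flag is on, and the EvictingQueue silently drops the oldest entries once the configured ring-buffer size is exceeded. A sketch, with an illustrative buffer size of 64:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.namequeues.WALEventTrackerQueueService;

public class WalEventQueueExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.regionserver.wal.event.tracker.enabled", true);
    // Bounded buffer: oldest payloads are evicted once 64 entries are queued.
    conf.setInt("hbase.regionserver.wal.event.tracker.ringbuffer.size", 64);
    WALEventTrackerQueueService service = new WALEventTrackerQueueService(conf);
    System.out.println(service.getEvent()); // WAL_EVENT_TRACKER
  }
}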

File: WALEventTrackerTableAccessor.java

@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_INFO_FAMILY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class WALEventTrackerTableAccessor {
public static final String RS_COLUMN = "region_server_name";
public static final String WAL_NAME_COLUMN = "wal_name";
public static final String TIMESTAMP_COLUMN = "timestamp";
public static final String WAL_STATE_COLUMN = "wal_state";
public static final String WAL_LENGTH_COLUMN = "wal_length";
public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts";
public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec";
public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec";
public static final int DEFAULT_MAX_ATTEMPTS = 3;
public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second
public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds
public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER";
public static final String DELIMITER = "_";
private WALEventTrackerTableAccessor() {
}
/**
* {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config -
* hbase.regionserver.wal.event.tracker.enabled
*/
public static final TableName WAL_EVENT_TRACKER_TABLE_NAME =
TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR);
private static void doPut(final Connection connection, final List<Put> puts) throws Exception {
RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create();
while (true) {
try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) {
table.put(puts);
return;
} catch (IOException ioe) {
retryOrThrow(retryCounter, ioe);
}
retryCounter.sleepUntilNextRetry();
}
}
private static RetryCounterFactory getRetryFactory(Configuration conf) {
int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS);
long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME);
RetryCounter.RetryConfig retryConfig =
new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs,
TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit());
return new RetryCounterFactory(retryConfig);
}
private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException {
if (retryCounter.shouldRetry()) {
return;
}
throw ioe;
}
/**
* Add wal event tracker rows to hbase:waleventtracker table
* @param walEventPayloads List of walevents to process
* @param connection Connection to use.
*/
public static void addWalEventTrackerRows(Queue<WALEventTrackerPayload> walEventPayloads,
final Connection connection) throws Exception {
List<Put> puts = new ArrayList<>(walEventPayloads.size());
for (WALEventTrackerPayload payload : walEventPayloads) {
final byte[] rowKey = getRowKey(payload);
final Put put = new Put(rowKey);
// TODO Do we need to SKIP_WAL ?
put.setPriority(HConstants.NORMAL_QOS);
put
.addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(RS_COLUMN),
Bytes.toBytes(payload.getRsName()))
.addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_NAME_COLUMN),
Bytes.toBytes(payload.getWalName()))
.addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(TIMESTAMP_COLUMN),
Bytes.toBytes(payload.getTimeStamp()))
.addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_STATE_COLUMN),
Bytes.toBytes(payload.getState()))
.addColumn(WAL_EVENT_TRACKER_INFO_FAMILY, Bytes.toBytes(WAL_LENGTH_COLUMN),
Bytes.toBytes(payload.getWalLength()));
puts.add(put);
}
doPut(connection, puts);
}
/**
* Create rowKey: 1. We want RS name to be the leading part of rowkey so that we can query by RS
* name filter. WAL name contains rs name as a leading part. 2. Timestamp when the event was
* generated. 3. Add state of the wal. Combination of 1 + 2 + 3 is definitely going to create a
* unique rowkey.
* @param payload payload to process
* @return rowKey byte[]
*/
public static byte[] getRowKey(final WALEventTrackerPayload payload) {
String walName = payload.getWalName();
// converting to string since this will help seeing the timestamp in string format using
// hbase shell commands.
String timestampStr = String.valueOf(payload.getTimeStamp());
String walState = payload.getState();
final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState;
return Bytes.toBytes(rowKeyStr);
}
}
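Since the rowkey is walName + "_" + timestamp + "_" + walState, and the WAL name itself leads with the region server name, rows for one server can be fetched with a prefix scan. A hedged sketch; the rsName value and helper are hypothetical.

import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor;
import org.apache.hadoop.hbase.util.Bytes;

public class WalEventTrackerScanExample {
  // Returns a scanner over all tracker rows whose WAL name starts with the given RS name.
  static ResultScanner scanByRegionServer(Connection connection, String rsName) throws Exception {
    Table table = connection.getTable(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME);
    Scan scan = new Scan().setRowPrefixFilter(Bytes.toBytes(rsName));
    return table.getScanner(scan);
  }
}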

File: BalancerDecisionQueueService.java

@@ -24,6 +24,7 @@ import java.util.Queue;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.BalancerDecision;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
 import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
@@ -141,7 +142,7 @@ public class BalancerDecisionQueueService implements NamedQueueService {
   }

   @Override
-  public void persistAll() {
+  public void persistAll(Connection connection) {
     // no-op for now
   }

File: BalancerRejectionQueueService.java

@@ -24,6 +24,7 @@ import java.util.Queue;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.BalancerRejection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
 import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
@@ -127,7 +128,7 @@ public class BalancerRejectionQueueService implements NamedQueueService {
   }

   @Override
-  public void persistAll() {
+  public void persistAll(Connection connection) {
     // no-op for now
   }

File: SlowLogQueueService.java

@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.SlowLogParams;
 import org.apache.hadoop.hbase.ipc.RpcCall;
 import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
@@ -223,12 +224,12 @@ public class SlowLogQueueService implements NamedQueueService {
    * table.
    */
   @Override
-  public void persistAll() {
+  public void persistAll(Connection connection) {
     if (!isOnlineLogProviderEnabled) {
       return;
     }
     if (slowLogPersistentService != null) {
-      slowLogPersistentService.addAllLogsToSysTable();
+      slowLogPersistentService.addAllLogsToSysTable(connection);
     }
   }

File: HRegionServer.java

@@ -19,8 +19,17 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
 import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
 import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
+import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
+import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_DEFAULT;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
 import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;

 import io.opentelemetry.api.trace.Span;
@@ -129,12 +138,11 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.LoadBalancer;
-import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.mob.MobFileCache;
 import org.apache.hadoop.hbase.mob.RSMobFileCleanerChore;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
-import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
+import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@@ -156,7 +164,10 @@ import org.apache.hadoop.hbase.regionserver.http.RSDumpServlet;
 import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
 import org.apache.hadoop.hbase.security.SecurityConstants;
@@ -457,7 +468,7 @@ public class HRegionServer extends Thread
   private final RegionServerAccounting regionServerAccounting;

-  private SlowLogTableOpsChore slowLogTableOpsChore = null;
+  private NamedQueueServiceChore namedQueueServiceChore = null;

   // Block cache
   private BlockCache blockCache;
@@ -590,6 +601,11 @@ public class HRegionServer extends Thread
   // A timer to shutdown the process if abort takes too long
   private Timer abortMonitor;

+  /*
+   * Chore that creates replication marker rows.
+   */
+  private ReplicationMarkerChore replicationMarkerChore;
+
   /**
    * Starts a HRegionServer at the default location.
    * <p/>
@@ -636,7 +652,7 @@ public class HRegionServer extends Thread
     this.abortRequested = new AtomicBoolean(false);
     this.stopped = false;
-    initNamedQueueRecorder(conf);
+    this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
     rpcServices = createRpcServices();
     useThisHostnameInstead = getUseThisHostnameInstead(conf);
@@ -725,26 +741,6 @@ public class HRegionServer extends Thread
     }
   }

-  private void initNamedQueueRecorder(Configuration conf) {
-    if (!(this instanceof HMaster)) {
-      final boolean isOnlineLogProviderEnabled = conf.getBoolean(
-        HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
-      if (isOnlineLogProviderEnabled) {
-        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
-      }
-    } else {
-      final boolean isBalancerDecisionRecording =
-        conf.getBoolean(BaseLoadBalancer.BALANCER_DECISION_BUFFER_ENABLED,
-          BaseLoadBalancer.DEFAULT_BALANCER_DECISION_BUFFER_ENABLED);
-      final boolean isBalancerRejectionRecording =
-        conf.getBoolean(BaseLoadBalancer.BALANCER_REJECTION_BUFFER_ENABLED,
-          BaseLoadBalancer.DEFAULT_BALANCER_REJECTION_BUFFER_ENABLED);
-      if (isBalancerDecisionRecording || isBalancerRejectionRecording) {
-        this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
-      }
-    }
-  }
-
   // HMaster should override this method to load the specific config for master
   protected String getUseThisHostnameInstead(Configuration conf) throws IOException {
     String hostname = conf.get(UNSAFE_RS_HOSTNAME_KEY);
@@ -1036,6 +1032,17 @@ public class HRegionServer extends Thread
       || (this.clusterStatusTracker != null && this.clusterStatusTracker.isClusterUp());
   }

+  private void initializeReplicationMarkerChore() {
+    boolean replicationMarkerEnabled =
+      conf.getBoolean(REPLICATION_MARKER_ENABLED_KEY, REPLICATION_MARKER_ENABLED_DEFAULT);
+    // If replication or replication marker is not enabled then return immediately.
+    if (replicationMarkerEnabled) {
+      int period = conf.getInt(REPLICATION_MARKER_CHORE_DURATION_KEY,
+        REPLICATION_MARKER_CHORE_DURATION_DEFAULT);
+      replicationMarkerChore = new ReplicationMarkerChore(this, this, period);
+    }
+  }
+
   /**
    * The HRegionServer sticks in this loop until closed.
    */
@@ -2049,9 +2056,23 @@ public class HRegionServer extends Thread
     }
     // Instantiate replication if replication enabled. Pass it the log directories.
     createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
+
+    WALActionsListener walEventListener = getWALEventTrackerListener(conf);
+    if (walEventListener != null && factory.getWALProvider() != null) {
+      factory.getWALProvider().addWALActionsListener(walEventListener);
+    }
     this.walFactory = factory;
   }

+  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
+    if (conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
+      WALEventTrackerListener listener =
+        new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
+      return listener;
+    }
+    return null;
+  }
+
   /**
    * Start up replication source and sink handlers.
    */
@@ -2205,16 +2226,19 @@ public class HRegionServer extends Thread
     if (this.fsUtilizationChore != null) {
       choreService.scheduleChore(fsUtilizationChore);
     }
-    if (this.slowLogTableOpsChore != null) {
-      choreService.scheduleChore(slowLogTableOpsChore);
+    if (this.namedQueueServiceChore != null) {
+      choreService.scheduleChore(namedQueueServiceChore);
     }
     if (this.brokenStoreFileCleaner != null) {
       choreService.scheduleChore(brokenStoreFileCleaner);
     }
     if (this.rsMobFileCleanerChore != null) {
       choreService.scheduleChore(rsMobFileCleanerChore);
     }
+    if (replicationMarkerChore != null) {
+      LOG.info("Starting replication marker chore");
+      choreService.scheduleChore(replicationMarkerChore);
+    }

     // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
@@ -2262,10 +2286,22 @@ public class HRegionServer extends Thread
     final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
       HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
-    if (isSlowLogTableEnabled) {
+    final boolean walEventTrackerEnabled =
+      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
+    if (isSlowLogTableEnabled || walEventTrackerEnabled) {
       // default chore duration: 10 min
-      final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
-      slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
+      // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
+      final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
+        DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
+      final int namedQueueChoreDuration =
+        conf.getInt(NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
+      // Considering min of slowLogChoreDuration and namedQueueChoreDuration
+      int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
+      namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
+        this.namedQueueRecorder, this.getConnection());
     }

     if (this.nonceManager != null) {
@@ -2315,6 +2351,7 @@ public class HRegionServer extends Thread
     this.rsMobFileCleanerChore = new RSMobFileCleanerChore(this);
     registerConfigurationObservers();
+    initializeReplicationMarkerChore();
   }

   private void registerConfigurationObservers() {
@@ -2795,7 +2832,8 @@ public class HRegionServer extends Thread
     shutdownChore(healthCheckChore);
     shutdownChore(storefileRefresher);
     shutdownChore(fsUtilizationChore);
-    shutdownChore(slowLogTableOpsChore);
+    shutdownChore(namedQueueServiceChore);
+    shutdownChore(replicationMarkerChore);
     shutdownChore(rsMobFileCleanerChore);
     // cancel the remaining scheduled chores (in case we missed out any)
     // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any

View File

@ -601,7 +601,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
return newPath; return newPath;
} }
Path getOldPath() { public Path getOldPath() {
long currentFilenum = this.filenum.get(); long currentFilenum = this.filenum.get();
Path oldPath = null; Path oldPath = null;
if (currentFilenum > 0) { if (currentFilenum > 0) {

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.namequeues.WALEventTrackerPayload;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class WALEventTrackerListener implements WALActionsListener {
private final Configuration conf;
private final NamedQueueRecorder namedQueueRecorder;
private final String serverName;
public enum WalState {
ROLLING,
ROLLED,
ACTIVE
}
public WALEventTrackerListener(Configuration conf, NamedQueueRecorder namedQueueRecorder,
ServerName serverName) {
this.conf = conf;
this.namedQueueRecorder = namedQueueRecorder;
this.serverName = serverName.getHostname();
}
@Override
public void preLogRoll(Path oldPath, Path newPath) {
if (oldPath != null) {
// oldPath can be null for the first wal.
// Persist only the last component of the path, not the whole wal name, which
// includes the filesystem scheme and walDir.
WALEventTrackerPayload payloadForOldPath =
getPayload(oldPath.getName(), WalState.ROLLING.name(), 0L);
this.namedQueueRecorder.addRecord(payloadForOldPath);
}
}
@Override
public void postLogRoll(Path oldPath, Path newPath) {
// Create 2 entries in the RingBuffer.
// 1. Change state to Rolled for oldPath
// 2. Change state to Active for newPath.
if (oldPath != null) {
// oldPath can be null for the first wal.
// Persist only the last component of the path, not the whole wal name, which
// includes the filesystem scheme and walDir.
long fileLength = 0L;
try {
FileSystem fs = oldPath.getFileSystem(this.conf);
fileLength = fs.getFileStatus(oldPath).getLen();
} catch (IOException ioe) {
// Saving the wal length is best effort. In case of any exception, just ignore it.
}
WALEventTrackerPayload payloadForOldPath =
getPayload(oldPath.getName(), WalState.ROLLED.name(), fileLength);
this.namedQueueRecorder.addRecord(payloadForOldPath);
}
WALEventTrackerPayload payloadForNewPath =
getPayload(newPath.getName(), WalState.ACTIVE.name(), 0L);
this.namedQueueRecorder.addRecord(payloadForNewPath);
}
private WALEventTrackerPayload getPayload(String path, String state, long walLength) {
long timestamp = EnvironmentEdgeManager.currentTime();
WALEventTrackerPayload payload =
new WALEventTrackerPayload(serverName, path, timestamp, state, walLength);
return payload;
}
}
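To make the listener's output concrete, here is a hedged walkthrough of the three payloads a single log roll produces, using the WALEventTrackerPayload constructor exactly as it is invoked above; the server name, wal names, and length below are made up:

import org.apache.hadoop.hbase.namequeues.WALEventTrackerPayload;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener.WalState;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

// Hypothetical walkthrough, not part of this patch.
public class WalRollPayloadSketch {
  public static void main(String[] args) {
    long now = EnvironmentEdgeManager.currentTime();
    // preLogRoll: the old wal transitions to ROLLING (length not yet known).
    WALEventTrackerPayload rolling =
      new WALEventTrackerPayload("rs1.example.com", "wal.1", now, WalState.ROLLING.name(), 0L);
    // postLogRoll: the old wal is ROLLED with its best-effort length ...
    WALEventTrackerPayload rolled =
      new WALEventTrackerPayload("rs1.example.com", "wal.1", now, WalState.ROLLED.name(), 128L);
    // ... and the new wal becomes ACTIVE.
    WALEventTrackerPayload active =
      new WALEventTrackerPayload("rs1.example.com", "wal.2", now, WalState.ACTIVE.name(), 0L);
  }
}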

View File

@ -17,10 +17,13 @@
*/ */
package org.apache.hadoop.hbase.regionserver.wal; package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function; import java.util.function.Function;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -28,6 +31,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
@ -217,4 +221,12 @@ public class WALUtil {
cells.trimToSize(); cells.trimToSize();
} }
} }
public static void writeReplicationMarkerAndSync(WAL wal, MultiVersionConcurrencyControl mvcc,
RegionInfo regionInfo, byte[] rowKey, long timestamp) throws IOException {
NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);
replicationScope.put(WALEdit.METAFAMILY, REPLICATION_SCOPE_GLOBAL);
writeMarker(wal, replicationScope, regionInfo,
WALEdit.createReplicationMarkerEdit(rowKey, timestamp), mvcc, null);
}
} }

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.master;
import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This will create the {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if the
 * hbase.regionserver.replication.sink.tracker.enabled config key is enabled and the table has
 * not already been created
**/
@InterfaceAudience.Private
public final class ReplicationSinkTrackerTableCreator {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationSinkTrackerTableCreator.class);
private static final long TTL = TimeUnit.DAYS.toSeconds(365); // 1 year in seconds
public static final byte[] RS_COLUMN = Bytes.toBytes("region_server_name");
public static final byte[] WAL_NAME_COLUMN = Bytes.toBytes("wal_name");
public static final byte[] TIMESTAMP_COLUMN = Bytes.toBytes("timestamp");
public static final byte[] OFFSET_COLUMN = Bytes.toBytes("offset");
/** Will create {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table if this conf is enabled **/
public static final String REPLICATION_SINK_TRACKER_ENABLED_KEY =
"hbase.regionserver.replication.sink.tracker.enabled";
public static final boolean REPLICATION_SINK_TRACKER_ENABLED_DEFAULT = false;
/** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family as a string */
private static final String REPLICATION_SINK_TRACKER_INFO_FAMILY_STR = "info";
/** The {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} info family in array of bytes */
public static final byte[] REPLICATION_SINK_TRACKER_INFO_FAMILY =
Bytes.toBytes(REPLICATION_SINK_TRACKER_INFO_FAMILY_STR);
public static final String REPLICATION_SINK_TRACKER_TABLE_NAME_STR = "REPLICATION.SINK_TRACKER";
/* Private default constructor */
private ReplicationSinkTrackerTableCreator() {
}
/**
* {@link #REPLICATION_SINK_TRACKER_TABLE_NAME_STR} table name - can be enabled with config -
* hbase.regionserver.replication.sink.tracker.enabled
*/
public static final TableName REPLICATION_SINK_TRACKER_TABLE_NAME =
TableName.valueOf(REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER = TableDescriptorBuilder
.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).setRegionReplication(1)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(REPLICATION_SINK_TRACKER_INFO_FAMILY)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false).setMaxVersions(1)
.setTimeToLive((int) TTL).build());
/*
 * We will create this table only if hbase.regionserver.replication.sink.tracker.enabled is
 * enabled and the table doesn't already exist.
*/
public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
throws IOException {
boolean replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
if (!replicationSinkTrackerEnabled) {
LOG.info("replication sink tracker requests logging to table {} is disabled." + " Quitting.",
REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
return;
}
if (!masterServices.getTableDescriptors().exists(REPLICATION_SINK_TRACKER_TABLE_NAME)) {
LOG.info("{} table not found. Creating.", REPLICATION_SINK_TRACKER_TABLE_NAME_STR);
masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE);
}
}
}
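As a quick, hedged sketch of how the sink side is enabled: the flag below is the constant defined in this class, and with it set the master is expected to create the table via createIfNeededAndNotExists(); the helper class itself is hypothetical:

import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

// Hypothetical helper, not part of this patch.
public class SinkTrackerConfigSketch {
  public static Configuration enableSinkTracker() {
    Configuration conf = HBaseConfiguration.create();
    // With this on, REPLICATION.SINK_TRACKER is created if it does not exist,
    // and ReplicationSink persists incoming marker rows into it.
    conf.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true);
    return conf;
  }
}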

View File

@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This chore is responsible for creating replication marker rows with a special WALEdit with
 * family as {@link org.apache.hadoop.hbase.wal.WALEdit#METAFAMILY} and column qualifier as
 * {@link WALEdit#REPLICATION_MARKER} and an empty value. If config key
 * {@link #REPLICATION_MARKER_ENABLED_KEY} is set to true, then we will create 1 marker row every
 * {@link #REPLICATION_MARKER_CHORE_DURATION_KEY} ms.
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader} will populate
* the Replication Marker edit with region_server_name, wal_name and wal_offset encoded in
* {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
* object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
* REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster,
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
* ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR table.
*/
@InterfaceAudience.Private
public class ReplicationMarkerChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationMarkerChore.class);
private static final MultiVersionConcurrencyControl MVCC = new MultiVersionConcurrencyControl();
public static final RegionInfo REGION_INFO =
RegionInfoBuilder.newBuilder(REPLICATION_SINK_TRACKER_TABLE_NAME).build();
private static final String DELIMITER = "_";
private final RegionServerServices rsServices;
private WAL wal;
public static final String REPLICATION_MARKER_ENABLED_KEY =
"hbase.regionserver.replication.marker.enabled";
public static final boolean REPLICATION_MARKER_ENABLED_DEFAULT = false;
public static final String REPLICATION_MARKER_CHORE_DURATION_KEY =
"hbase.regionserver.replication.marker.chore.duration";
public static final int REPLICATION_MARKER_CHORE_DURATION_DEFAULT = 30 * 1000; // 30 seconds
public ReplicationMarkerChore(final Stoppable stopper, final RegionServerServices rsServices,
int period) {
super("ReplicationTrackerChore", stopper, period);
this.rsServices = rsServices;
}
@Override
protected void chore() {
if (wal == null) {
try {
// TODO: We need to add support for multi WAL implementation.
wal = rsServices.getWAL(null);
} catch (IOException ioe) {
LOG.warn("Unable to get WAL ", ioe);
// Shouldn't happen. Ignore and wait for the next chore run.
return;
}
}
String serverName = rsServices.getServerName().getServerName();
long timeStamp = EnvironmentEdgeManager.currentTime();
// Only the timestamp is set here in the ReplicationMarkerDescriptor; the remaining properties
// (wal name, region server name and wal offset) are populated by ReplicationSourceWALReader.
byte[] rowKey = getRowKey(serverName, timeStamp);
if (LOG.isTraceEnabled()) {
LOG.trace("Creating replication marker edit.");
}
// This creates a new ArrayList of all the online regions for every call.
List<? extends Region> regions = rsServices.getRegions();
if (regions.isEmpty()) {
LOG.info("There are no online regions for this server, so skipping adding replication marker"
+ " rows for this regionserver");
return;
}
Region region = regions.get(ThreadLocalRandom.current().nextInt(regions.size()));
try {
WALUtil.writeReplicationMarkerAndSync(wal, MVCC, region.getRegionInfo(), rowKey, timeStamp);
} catch (IOException ioe) {
LOG.error("Exception while sync'ing replication tracker edit", ioe);
// TODO: Should we stop the region server, or add a metric and keep going?
}
}
/**
* Creates a rowkey with region server name and timestamp.
* @param serverName region server name
* @param timestamp timestamp
*/
public static byte[] getRowKey(String serverName, long timestamp) {
// Convert the timestamp to a string; this makes it easier to read using
// hbase shell commands.
String timestampStr = String.valueOf(timestamp);
final String rowKeyStr = serverName + DELIMITER + timestampStr;
return Bytes.toBytes(rowKeyStr);
}
}
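A short, hedged illustration of the rowkey these marker rows use. getRowKey is the public helper defined above; the server name and timestamp below are made up, and the expected output simply follows the serverName + "_" + timestamp rule it implements:

import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical usage, not part of this patch.
public class MarkerRowKeySketch {
  public static void main(String[] args) {
    // A full server name, as returned by ServerName#getServerName().
    String serverName = "rs1.example.com,16020,1699999999999";
    long timestamp = 1700000000000L;
    byte[] rowKey = ReplicationMarkerChore.getRowKey(serverName, timestamp);
    // Prints: rs1.example.com,16020,1699999999999_1700000000000
    System.out.println(Bytes.toString(rowKey));
  }
}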

View File

@ -17,6 +17,16 @@
*/ */
package org.apache.hadoop.hbase.replication.regionserver; package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_INFO_FAMILY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -61,6 +71,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@ -107,6 +118,7 @@ public class ReplicationSink {
* Row size threshold for multi requests above which a warning is logged * Row size threshold for multi requests above which a warning is logged
*/ */
private final int rowSizeWarnThreshold; private final int rowSizeWarnThreshold;
private boolean replicationSinkTrackerEnabled;
/** /**
* Create a sink for replication * Create a sink for replication
@ -117,6 +129,8 @@ public class ReplicationSink {
this.conf = HBaseConfiguration.create(conf); this.conf = HBaseConfiguration.create(conf);
rowSizeWarnThreshold = rowSizeWarnThreshold =
conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT); conf.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, HConstants.BATCH_ROWS_THRESHOLD_DEFAULT);
replicationSinkTrackerEnabled = conf.getBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY,
REPLICATION_SINK_TRACKER_ENABLED_DEFAULT);
decorateConf(); decorateConf();
this.metrics = new MetricsSink(); this.metrics = new MetricsSink();
this.walEntrySinkFilter = setupWALEntrySinkFilter(); this.walEntrySinkFilter = setupWALEntrySinkFilter();
@ -230,6 +244,18 @@ public class ReplicationSink {
bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>()); bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld); buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} }
} else if (CellUtil.matchingQualifier(cell, WALEdit.REPLICATION_MARKER)) {
Mutation put = processReplicationMarkerEntry(cell);
if (put == null) {
continue;
}
table = REPLICATION_SINK_TRACKER_TABLE_NAME;
List<UUID> clusterIds = new ArrayList<>();
for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
clusterIds.add(toUUID(clusterId));
}
put.setClusterIds(clusterIds);
addToHashMultiMap(rowMap, table, clusterIds, put);
} else { } else {
// Handle wal replication // Handle wal replication
if (isNewRowOrType(previousCell, cell)) { if (isNewRowOrType(previousCell, cell)) {
@ -292,6 +318,33 @@ public class ReplicationSink {
} }
} }
/*
 * First check whether the config key hbase.regionserver.replication.sink.tracker.enabled is set
 * to true. If false, ignore this cell. If true, de-serialize the value into a
 * ReplicationMarkerDescriptor and create a Put mutation with the regionserver name, wal name,
 * offset and timestamp from the ReplicationMarkerDescriptor.
*/
private Put processReplicationMarkerEntry(Cell cell) throws IOException {
// If source is emitting replication marker rows but sink is not accepting them,
// ignore the edits.
if (!replicationSinkTrackerEnabled) {
return null;
}
WALProtos.ReplicationMarkerDescriptor descriptor =
WALProtos.ReplicationMarkerDescriptor.parseFrom(new ByteArrayInputStream(cell.getValueArray(),
cell.getValueOffset(), cell.getValueLength()));
Put put = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, RS_COLUMN, cell.getTimestamp(),
(Bytes.toBytes(descriptor.getRegionServerName())));
put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, WAL_NAME_COLUMN, cell.getTimestamp(),
Bytes.toBytes(descriptor.getWalName()));
put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, TIMESTAMP_COLUMN, cell.getTimestamp(),
Bytes.toBytes(cell.getTimestamp()));
put.addColumn(REPLICATION_SINK_TRACKER_INFO_FAMILY, OFFSET_COLUMN, cell.getTimestamp(),
Bytes.toBytes(descriptor.getOffset()));
return put;
}
private void buildBulkLoadHFileMap( private void buildBulkLoadHFileMap(
final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table, final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
BulkLoadDescriptor bld) throws IOException { BulkLoadDescriptor bld) throws IOException {

View File

@ -827,4 +827,9 @@ public class ReplicationSource implements ReplicationSourceInterface {
public String logPeerId() { public String logPeerId() {
return "peerId=" + this.getPeerId() + ","; return "peerId=" + this.getPeerId() + ",";
} }
// Visible for testing purposes
public long getTotalReplicatedEdits() {
return totalReplicatedEdits.get();
}
} }

View File

@ -69,6 +69,10 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) { if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
return; return;
} }
// Allow replication marker row to pass through.
if (WALEdit.isReplicationMarkerEdit(logEdit)) {
return;
}
// For replay, or if all the cells are markers, do not need to store replication scope. // For replay, or if all the cells are markers, do not need to store replication scope.
if ( if (
logEdit.isReplay() || logEdit.getCells().stream().allMatch(c -> WALEdit.isMetaEditFamily(c)) logEdit.isReplay() || logEdit.getCells().stream().allMatch(c -> WALEdit.isMetaEditFamily(c))

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
@ -42,6 +44,9 @@ import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
@ -178,6 +183,7 @@ class ReplicationSourceWALReader extends Thread {
} }
LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}", LOG.trace("updating TimeStampOfLastAttempted to {}, from entry {}, for source queue: {}",
entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId()); entry.getKey().getWriteTime(), entry.getKey(), this.source.getQueueId());
updateReplicationMarkerEdit(entry, batch.getLastWalPosition());
long entrySize = getEntrySizeIncludeBulkLoad(entry); long entrySize = getEntrySizeIncludeBulkLoad(entry);
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry); long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize); batch.addEntry(entry, entrySize);
@ -341,6 +347,10 @@ class ReplicationSourceWALReader extends Thread {
} }
protected final Entry filterEntry(Entry entry) { protected final Entry filterEntry(Entry entry) {
// Always replicate if this edit is Replication Marker edit.
if (entry != null && WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
return entry;
}
Entry filtered = filter.filter(entry); Entry filtered = filter.filter(entry);
if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) { if (entry != null && (filtered == null || filtered.getEdit().size() == 0)) {
LOG.trace("Filtered entry for replication: {}", entry); LOG.trace("Filtered entry for replication: {}", entry);
@ -451,6 +461,36 @@ class ReplicationSourceWALReader extends Thread {
return totalStoreFilesSize; return totalStoreFilesSize;
} }
/*
 * Create a ReplicationMarkerDescriptor with region_server_name, wal_name and offset, and set it
 * as the cell's value.
*/
private void updateReplicationMarkerEdit(Entry entry, long offset) {
WALEdit edit = entry.getEdit();
// Return early if it is not a ReplicationMarker edit.
if (!WALEdit.isReplicationMarkerEdit(edit)) {
return;
}
List<Cell> cells = edit.getCells();
Preconditions.checkArgument(cells.size() == 1, "ReplicationMarker should have only 1 cell");
Cell cell = cells.get(0);
// Create a descriptor with region_server_name, wal_name and offset
WALProtos.ReplicationMarkerDescriptor.Builder builder =
WALProtos.ReplicationMarkerDescriptor.newBuilder();
builder.setRegionServerName(this.source.getServer().getServerName().getHostname());
builder.setWalName(getCurrentPath().getName());
builder.setOffset(offset);
WALProtos.ReplicationMarkerDescriptor descriptor = builder.build();
// Create a new KeyValue
KeyValue kv = new KeyValue(CellUtil.cloneRow(cell), CellUtil.cloneFamily(cell),
CellUtil.cloneQualifier(cell), cell.getTimestamp(), descriptor.toByteArray());
ArrayList<Cell> newCells = new ArrayList<>();
newCells.add(kv);
// Update edit with new cell.
edit.setCells(newCells);
}
/** /**
* @param size delta size for grown buffer * @param size delta size for grown buffer
* @return true if we should clear buffer and push all * @return true if we should clear buffer and push all

View File

@ -132,6 +132,22 @@ public class WALEdit implements HeapSize {
@InterfaceAudience.Private @InterfaceAudience.Private
public static final byte[] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD"); public static final byte[] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
/**
* Periodically {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore}
* will create marker edits with family as {@link WALEdit#METAFAMILY} and
* {@link WALEdit#REPLICATION_MARKER} as qualifier and an empty value.
* org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader will populate the
* Replication Marker edit with region_server_name, wal_name and wal_offset encoded in
* {@link org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.ReplicationMarkerDescriptor}
* object. {@link org.apache.hadoop.hbase.replication.regionserver.Replication} will change the
* REPLICATION_SCOPE for this edit to GLOBAL so that it can replicate. On the sink cluster,
* {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationSink} will convert the
* ReplicationMarkerDescriptor into a Put mutation to REPLICATION_SINK_TRACKER_TABLE_NAME_STR
* table.
*/
@InterfaceAudience.Private
public static final byte[] REPLICATION_MARKER = Bytes.toBytes("HBASE::REPLICATION_MARKER");
private final transient boolean replay; private final transient boolean replay;
private ArrayList<Cell> cells; private ArrayList<Cell> cells;
@ -454,4 +470,28 @@ public class WALEdit implements HeapSize {
this.cells.add(cell); this.cells.add(cell);
return this; return this;
} }
/**
 * Creates a replication marker edit with the {@link #METAFAMILY} family, the
 * {@link #REPLICATION_MARKER} qualifier, and an empty value.
* @param rowKey rowkey
* @param timestamp timestamp
*/
public static WALEdit createReplicationMarkerEdit(byte[] rowKey, long timestamp) {
KeyValue kv =
new KeyValue(rowKey, METAFAMILY, REPLICATION_MARKER, timestamp, KeyValue.Type.Put);
return new WALEdit().add(kv);
}
/**
* Checks whether this edit is a replication marker edit.
* @param edit edit
* @return true if the cell within an edit has column = METAFAMILY and qualifier =
* REPLICATION_MARKER, false otherwise
*/
public static boolean isReplicationMarkerEdit(WALEdit edit) {
// Check just the first cell from the edit. ReplicationMarker edit will have only 1 cell.
return edit.getCells().size() == 1
&& CellUtil.matchingColumn(edit.getCells().get(0), METAFAMILY, REPLICATION_MARKER);
}
} }

View File

@ -318,6 +318,13 @@ public class WALSplitter {
Entry entry; Entry entry;
startTS = EnvironmentEdgeManager.currentTime(); startTS = EnvironmentEdgeManager.currentTime();
while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) { while ((entry = getNextLogLine(walReader, wal, this.skipErrors)) != null) {
if (WALEdit.isReplicationMarkerEdit(entry.getEdit())) {
// Skip processing the replication marker edits.
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring Replication marker edits.");
}
continue;
}
byte[] region = entry.getKey().getEncodedRegionName(); byte[] region = entry.getKey().getEncodedRegionName();
String encodedRegionNameAsStr = Bytes.toString(region); String encodedRegionNameAsStr = Bytes.toString(region);
Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); Long lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);

View File

@ -0,0 +1,246 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore.NAMED_QUEUE_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestWALEventTracker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALEventTracker.class);
private static final Logger LOG = LoggerFactory.getLogger(TestWALEventTracker.class);
private static HBaseTestingUtility TEST_UTIL;
public static Configuration CONF;
@BeforeClass
public static void setup() throws Exception {
CONF = HBaseConfiguration.create();
CONF.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true);
// Set the chore for less than a second.
CONF.setInt(NAMED_QUEUE_CHORE_DURATION_KEY, 900);
CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
TEST_UTIL = new HBaseTestingUtility(CONF);
TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void teardown() throws Exception {
LOG.info("Calling teardown");
TEST_UTIL.shutdownMiniHBaseCluster();
}
@Before
public void waitForWalEventTrackerTableCreation() {
Waiter.waitFor(CONF, 10000,
(Waiter.Predicate) () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
}
@Test
public void testWALRolling() throws Exception {
Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection();
waitForWALEventTrackerTable(connection);
List<WAL> wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs();
assertEquals(1, wals.size());
AbstractFSWAL wal = (AbstractFSWAL) wals.get(0);
Path wal1Path = wal.getOldPath();
wal.rollWriter(true);
FileSystem fs = TEST_UTIL.getTestFileSystem();
long wal1Length = fs.getFileStatus(wal1Path).getLen();
Path wal2Path = wal.getOldPath();
String hostName =
TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname();
TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3);
List<WALEventTrackerPayload> walEventsList = getRows(hostName, connection);
// There should be at least 2 events for wal1Name, with ROLLING and ROLLED states. Most of the
// time we will lose the ACTIVE event for the first wal created, since hmaster takes some time
// to create the hbase:waleventtracker table and by then the RS has already created the first
// wal and tried to persist it.
compareEvents(hostName, wal1Path.getName(), walEventsList,
new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(),
WALEventTrackerListener.WalState.ROLLED.name())),
false);
// There should be only 1 event for wal2Name which is current wal, with ACTIVE state
compareEvents(hostName, wal2Path.getName(), walEventsList,
new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true);
// Check that event with wal1Path and state ROLLED has the wal length set.
checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length);
}
private void checkWALRolledEventHasSize(List<WALEventTrackerPayload> walEvents, String walName,
long actualSize) {
List<WALEventTrackerPayload> eventsFilteredByNameState = new ArrayList<>();
// Filter the list by walName and wal state.
for (WALEventTrackerPayload event : walEvents) {
if (
walName.equals(event.getWalName())
&& WALEventTrackerListener.WalState.ROLLED.name().equals(event.getState())
) {
eventsFilteredByNameState.add(event);
}
}
assertEquals(1, eventsFilteredByNameState.size());
// We are not comparing the size of the WAL in the tracker table with the actual size.
// For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length
// will always be incorrect.
// For FSHLog implementation, we close the WAL in an executor thread. So there will always be
// a difference of trailer size bytes.
// assertEquals(actualSize, eventsFilteredByNameState.get(0).getWalLength());
}
/**
* Compare the events from @{@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME}
* @param hostName hostname
* @param walName walname
* @param walEvents event from table
* @param expectedStates expected states for the hostname and wal name
 * @param strict whether to check strictly or not. Sometimes we lose the ACTIVE state
 *               event for the first wal since it takes some time for hmaster to create
 *               the table, and by that time the RS has already created the first WAL and
 *               tried to persist the ACTIVE event to the waleventtracker table.
*/
private void compareEvents(String hostName, String walName,
List<WALEventTrackerPayload> walEvents, List<String> expectedStates, boolean strict) {
List<WALEventTrackerPayload> eventsFilteredByWalName = new ArrayList<>();
// Assert that all the events have the same host name, i.e. they came from the same RS.
for (WALEventTrackerPayload event : walEvents) {
assertEquals(hostName, event.getRsName());
}
// Filter the list by walName.
for (WALEventTrackerPayload event : walEvents) {
if (walName.equals(event.getWalName())) {
eventsFilteredByWalName.add(event);
}
}
// Assert that the list of events after filtering by walName matches the expected states.
if (strict) {
assertEquals(expectedStates.size(), eventsFilteredByWalName.size());
}
for (WALEventTrackerPayload event : eventsFilteredByWalName) {
expectedStates.remove(event.getState());
}
assertEquals(0, expectedStates.size());
}
private void waitForWALEventTrackerTable(Connection connection) throws IOException {
TEST_UTIL.waitFor(5000, () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
}
private List<WALEventTrackerPayload> getRows(String rowKeyPrefix, Connection connection)
throws IOException {
List<WALEventTrackerPayload> list = new ArrayList<>();
Scan scan = new Scan();
scan.withStartRow(Bytes.toBytes(rowKeyPrefix));
Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
ResultScanner scanner = table.getScanner(scan);
Result r;
while ((r = scanner.next()) != null) {
List<Cell> cells = r.listCells();
list.add(getPayload(cells));
}
return list;
}
private WALEventTrackerPayload getPayload(List<Cell> cells) {
String rsName = null, walName = null, walState = null;
long timestamp = 0L, walLength = 0L;
for (Cell cell : cells) {
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
String qualifierStr = Bytes.toString(qualifier);
if (RS_COLUMN.equals(qualifierStr)) {
rsName = Bytes.toString(value);
} else if (WAL_NAME_COLUMN.equals(qualifierStr)) {
walName = Bytes.toString(value);
} else if (WAL_STATE_COLUMN.equals(qualifierStr)) {
walState = Bytes.toString(value);
} else if (TIMESTAMP_COLUMN.equals(qualifierStr)) {
timestamp = Bytes.toLong(value);
} else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) {
walLength = Bytes.toLong(value);
}
}
return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength);
}
private int getTableCount(Connection connection) throws Exception {
Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
int count = 0;
while (resultScanner.next() != null) {
count++;
}
LOG.info("Table count: " + count);
return count;
}
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestWALEventTrackerTableAccessor {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALEventTrackerTableAccessor.class);
/*
* Tests that rowkey is getting constructed correctly.
*/
@Test
public void testRowKey() {
String rsName = "test-region-server";
String walName = "test-wal-0";
long timeStamp = EnvironmentEdgeManager.currentTime();
String walState = WALEventTrackerListener.WalState.ACTIVE.name();
long walLength = 100L;
WALEventTrackerPayload payload =
new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength);
byte[] rowKeyBytes = WALEventTrackerTableAccessor.getRowKey(payload);
String rowKeyBytesStr = Bytes.toString(rowKeyBytes);
String[] fields = rowKeyBytesStr.split(WALEventTrackerTableAccessor.DELIMITER, -1);
// This is the format of rowkey: walName_timestamp_walState;
assertEquals(walName, fields[0]);
assertEquals(timeStamp, Long.valueOf(fields[1]).longValue());
assertEquals(walState, fields[2]);
}
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.namequeues;
import static org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator.WAL_EVENT_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category(SmallTests.class)
public class TestWalEventTrackerQueueService {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWalEventTrackerQueueService.class);
@Rule
public TestName name = new TestName();
/*
* Test whether wal event tracker metrics are being incremented.
*/
@Test
public void testMetrics() throws Exception {
String rsName = "test-region-server";
String walName = "test-wal-0";
long timeStamp = EnvironmentEdgeManager.currentTime();
String walState = WALEventTrackerListener.WalState.ACTIVE.name();
long walLength = 100L;
WALEventTrackerPayload payload =
new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength);
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, true);
conf.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
MetricsWALEventTrackerSourceImpl source = new MetricsWALEventTrackerSourceImpl(
name.getMethodName(), name.getMethodName(), name.getMethodName(), name.getMethodName());
WALEventTrackerQueueService service = new WALEventTrackerQueueService(conf, source);
service.addToQueue(payload);
Connection mockConnection = mock(Connection.class);
doReturn(conf).when(mockConnection).getConfiguration();
// Always throw IOException whenever mock connection is being used.
doThrow(new IOException()).when(mockConnection).getTable(WAL_EVENT_TRACKER_TABLE_NAME);
assertEquals(0L, source.getFailedPuts());
assertEquals(0L, source.getNumRecordsFailedPuts());
// Persist all the events.
service.persistAll(mockConnection);
assertEquals(1L, source.getFailedPuts());
assertEquals(1L, source.getNumRecordsFailedPuts());
// Verify that we made the initial attempt plus DEFAULT_MAX_ATTEMPTS retries to persist.
verify(mockConnection, times(1 + WALEventTrackerTableAccessor.DEFAULT_MAX_ATTEMPTS))
.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;
import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ SmallTests.class })
public class TestWALEdit {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestWALEdit.class);
private static final String RS_NAME = "test-region-server-name";
/**
* Tests that
* {@link org.apache.hadoop.hbase.wal.WALEdit#createReplicationMarkerEdit(byte[], long)} method is
* creating WALEdit with correct family and qualifier.
*/
@Test
public void testCreateReplicationMarkerEdit() {
long timestamp = EnvironmentEdgeManager.currentTime();
byte[] rowkey = ReplicationMarkerChore.getRowKey(RS_NAME, timestamp);
WALEdit edit = WALEdit.createReplicationMarkerEdit(rowkey, timestamp);
assertEquals(1, edit.getCells().size());
Cell cell = edit.getCells().get(0);
assertTrue(CellUtil.matchingFamily(cell, METAFAMILY));
assertTrue(CellUtil.matchingQualifier(cell, REPLICATION_MARKER));
assertTrue(WALEdit.isReplicationMarkerEdit(edit));
}
}

View File

@ -0,0 +1,265 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.OFFSET_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.RS_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.TIMESTAMP_COLUMN;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.WAL_NAME_COLUMN;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_CHORE_DURATION_KEY;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * This test creates 2 mini hbase clusters. The first cluster has the
 * "hbase.regionserver.replication.marker.enabled" conf key enabled. This will create
 * {@link org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore}, which will
 * create marker rows to be replicated to the sink cluster. The second cluster has the
 * "hbase.regionserver.replication.sink.tracker.enabled" conf key enabled. This will persist the
 * marker rows coming from the peer cluster to the REPLICATION.SINK_TRACKER table.
**/
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationMarker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationMarker.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationMarker.class);
private static Configuration conf1;
private static Configuration conf2;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
conf2 = new Configuration(conf1);
// Run the replication marker chore in cluster1.
conf1.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true);
conf1.setLong(REPLICATION_MARKER_CHORE_DURATION_KEY, 1000); // 1 sec
utility1 = new HBaseTestingUtility(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
// Enable the replication sink tracker for cluster 2
conf2.setBoolean(REPLICATION_SINK_TRACKER_ENABLED_KEY, true);
utility2 = new HBaseTestingUtility(conf2);
// Start cluster 2 first so that the REPLICATION.SINK_TRACKER table gets created first.
utility2.startMiniCluster(1);
waitForReplicationTrackerTableCreation();
// Start cluster1
utility1.startMiniCluster(1);
Admin admin1 = utility1.getAdmin();
ReplicationPeerConfigBuilder rpcBuilder = ReplicationPeerConfig.newBuilder();
rpcBuilder.setClusterKey(utility2.getClusterKey());
admin1.addReplicationPeer("1", rpcBuilder.build());
ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
.getReplicationSourceService().getReplicationManager();
// Wait until the peer gets established.
Waiter.waitFor(conf1, 10000, (Waiter.Predicate) () -> manager.getSources().size() == 1);
}
private static void waitForReplicationTrackerTableCreation() {
Waiter.waitFor(conf2, 10000, (Waiter.Predicate) () -> utility2.getAdmin()
.tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
}
@AfterClass
public static void tearDown() throws Exception {
utility1.shutdownMiniCluster();
utility2.shutdownMiniCluster();
}
@Test
public void testReplicationMarkerRow() throws Exception {
// We have configured ReplicationMarkerChore to run every second. Sleep so that it creates
// enough sentinel rows.
Thread.sleep(5000);
WAL wal1 = utility1.getHBaseCluster().getRegionServer(0).getWAL(null);
String walName1ForCluster1 = ((AbstractFSWAL<?>) wal1).getCurrentFileName().getName();
String rs1Name = utility1.getHBaseCluster().getRegionServer(0).getServerName().getHostname();
// Since we sync the marker edits while appending to the WAL, all the edits should be
// visible to the replication threads immediately.
assertTrue(getReplicatedEntries() >= 5);
// Force log roll.
wal1.rollWriter(true);
String walName2ForCluster1 = ((AbstractFSWAL<?>) wal1).getCurrentFileName().getName();
Connection connection2 = utility2.getMiniHBaseCluster().getRegionServer(0).getConnection();
// Sleep for 5 more seconds to get marker rows carrying the new WAL name.
Thread.sleep(5000);
// Wait for cluster 2 to have at least 8 tracker rows from cluster1.
utility2.waitFor(5000, () -> getTableCount(connection2) >= 8);
// Get replication marker rows from cluster2
List<ReplicationSinkTrackerRow> list = getRows(connection2);
for (ReplicationSinkTrackerRow desc : list) {
// All the tracker rows should have the same region server name, i.e. the region server
// of cluster1.
assertEquals(rs1Name, desc.getRegionServerName());
// Every tracker row will carry either wal1's or wal2's file name.
assertTrue(walName1ForCluster1.equals(desc.getWalName())
|| walName2ForCluster1.equals(desc.getWalName()));
}
// This table shouldn't exist on cluster1 since
// hbase.regionserver.replication.sink.tracker.enabled is not enabled on this cluster.
assertFalse(utility1.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
// This table should exist on cluster2 since
// hbase.regionserver.replication.sink.tracker.enabled is enabled on this cluster.
assertTrue(utility2.getAdmin().tableExists(REPLICATION_SINK_TRACKER_TABLE_NAME));
}
/*
 * Get rows from the replication sink tracker table.
 */
private List<ReplicationSinkTrackerRow> getRows(Connection connection) throws IOException {
  List<ReplicationSinkTrackerRow> list = new ArrayList<>();
  Scan scan = new Scan();
  // Close the table and scanner when done so the test doesn't leak resources.
  try (Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
    ResultScanner scanner = table.getScanner(scan)) {
    Result r;
    while ((r = scanner.next()) != null) {
      list.add(getPayload(r.listCells()));
    }
  }
  return list;
}
private ReplicationSinkTrackerRow getPayload(List<Cell> cells) {
String rsName = null;
String walName = null;
long timestamp = 0L;
long offset = 0L;
for (Cell cell : cells) {
byte[] qualifier = CellUtil.cloneQualifier(cell);
byte[] value = CellUtil.cloneValue(cell);
if (Bytes.equals(RS_COLUMN, qualifier)) {
rsName = Bytes.toString(value);
} else if (Bytes.equals(WAL_NAME_COLUMN, qualifier)) {
walName = Bytes.toString(value);
} else if (Bytes.equals(TIMESTAMP_COLUMN, qualifier)) {
timestamp = Bytes.toLong(value);
} else if (Bytes.equals(OFFSET_COLUMN, qualifier)) {
offset = Bytes.toLong(value);
}
}
return new ReplicationSinkTrackerRow(rsName, walName, timestamp, offset);
}
static class ReplicationSinkTrackerRow {
  private final String regionServerName;
  private final String walName;
  private final long timestamp;
  private final long offset;

  public ReplicationSinkTrackerRow(String regionServerName, String walName, long timestamp,
    long offset) {
    this.regionServerName = regionServerName;
    this.walName = walName;
    this.timestamp = timestamp;
    this.offset = offset;
  }

  public String getRegionServerName() {
    return regionServerName;
  }

  public String getWalName() {
    return walName;
  }

  public long getTimestamp() {
    return timestamp;
  }

  public long getOffset() {
    return offset;
  }

  @Override
  public String toString() {
    return "ReplicationSinkTrackerRow{" + "regionServerName='" + regionServerName + '\''
      + ", walName='" + walName + '\'' + ", timestamp=" + timestamp + ", offset=" + offset
      + '}';
  }
}
private int getTableCount(Connection connection) throws Exception {
  int count = 0;
  try (Table table = connection.getTable(REPLICATION_SINK_TRACKER_TABLE_NAME);
    ResultScanner resultScanner =
      table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM))) {
    while (resultScanner.next() != null) {
      count++;
    }
  }
  LOG.info("Table count: {}", count);
  return count;
}
/*
 * Return the number of entries replicated from cluster1.
*/
private long getReplicatedEntries() {
ReplicationSourceManager manager = utility1.getHBaseCluster().getRegionServer(0)
.getReplicationSourceService().getReplicationManager();
List<ReplicationSourceInterface> sources = manager.getSources();
assertEquals(1, sources.size());
ReplicationSource source = (ReplicationSource) sources.get(0);
return source.getTotalReplicatedEdits();
}
}
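// A minimal sketch (not part of this patch) of how the two sides exercised above would be
// enabled on real clusters, assuming the literal config key strings quoted in the class
// javadoc; the class and method names here are illustrative only.
class ReplicationObservabilitySetup {
  /** Source-side cluster: periodically append replication marker edits to the WAL. */
  static Configuration sourceClusterConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.regionserver.replication.marker.enabled", true);
    return conf;
  }

  /** Sink-side cluster: persist incoming marker rows to the REPLICATION.SINK_TRACKER table. */
  static Configuration sinkClusterConf() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.regionserver.replication.sink.tracker.enabled", true);
    return conf;
  }
}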

View File

@ -17,6 +17,10 @@
 */
package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.replication.master.ReplicationSinkTrackerTableCreator.REPLICATION_SINK_TRACKER_TABLE_NAME;
import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.getRowKey;
import static org.apache.hadoop.hbase.wal.WALEdit.METAFAMILY;
import static org.apache.hadoop.hbase.wal.WALEdit.REPLICATION_MARKER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -60,10 +64,12 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.master.SplitLogManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.FaultyProtobufLogReader;
import org.apache.hadoop.hbase.regionserver.wal.InstrumentedLogWriter;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@ -485,6 +491,39 @@ public class TestWALSplit {
assertEquals(11, countWAL(splitLog[0]));
}
/*
 * Tests that WALSplitter ignores replication marker edits.
 */
@Test(timeout = 30000)
public void testSplitRemovesReplicationMarkerEdits() throws IOException {
RegionInfo regionInfo = ReplicationMarkerChore.REGION_INFO;
Path path = new Path(WALDIR, WAL_FILE_PREFIX + "1");
generateReplicationMarkerEdits(path, regionInfo);
useDifferentDFSClient();
List<FileStatus> logFiles =
SplitLogManager.getFileList(conf, Collections.singletonList(WALDIR), null);
assertEquals(1, logFiles.size());
assertEquals(path, logFiles.get(0).getPath());
List<Path> splitPaths = WALSplitter.split(HBASELOGDIR, WALDIR, OLDLOGDIR, fs, conf, wals);
// The WAL contained only replication marker edits, so the split must succeed while
// producing no recovered-edits output.
assertEquals(0, splitPaths.size());
}
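// A hedged sketch of the check a splitter-side filter needs, written with stock CellUtil
// helpers against the METAFAMILY/REPLICATION_MARKER constants statically imported above;
// the real splitter may use its own predicate, and Cell/CellUtil imports are assumed.
private static boolean isReplicationMarkerCell(Cell cell) {
  // Marker edits carry the REPLICATION_MARKER qualifier in the WAL's METAFAMILY family,
  // matching what generateReplicationMarkerEdits appends below.
  return CellUtil.matchingColumn(cell, METAFAMILY, REPLICATION_MARKER);
}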
private void generateReplicationMarkerEdits(Path path, RegionInfo regionInfo) throws IOException {
long timestamp = EnvironmentEdgeManager.currentTime();
fs.mkdirs(WALDIR);
try (Writer writer = wals.createWALWriter(fs, path)) {
WALProtos.ReplicationMarkerDescriptor.Builder builder =
WALProtos.ReplicationMarkerDescriptor.newBuilder();
builder.setWalName("wal-name");
builder.setRegionServerName("rs-name");
builder.setOffset(0L);
WALProtos.ReplicationMarkerDescriptor desc = builder.build();
appendEntry(writer, REPLICATION_SINK_TRACKER_TABLE_NAME, regionInfo.getEncodedNameAsBytes(),
getRowKey(desc.getRegionServerName(), timestamp), METAFAMILY, REPLICATION_MARKER, VALUE, 1);
}
}
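// A hedged sketch (not part of this patch) of the reverse direction: decoding a marker
// cell's value back into its descriptor. It assumes a production marker cell's value is
// the serialized ReplicationMarkerDescriptor -- unlike this test, which appends the
// arbitrary VALUE bytes -- and that WALProtos is imported here.
private static WALProtos.ReplicationMarkerDescriptor decodeMarker(byte[] markerCellValue)
  throws IOException {
  // parseFrom is the standard generated-protobuf accessor; InvalidProtocolBufferException
  // extends IOException, so no extra handling is needed.
  return WALProtos.ReplicationMarkerDescriptor.parseFrom(markerCellValue);
}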
/**
 * @param expectedEntries -1 to not assert
 * @return the count across all regions