HBASE-19926 Use a separated class to implement the WALActionListener for Replication

This commit is contained in:
zhangduo 2018-02-04 10:42:33 +08:00
parent 397d34736e
commit 14420e1b41
4 changed files with 105 additions and 87 deletions

View File

@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@ -42,8 +41,6 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
@ -127,23 +124,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
if (walProvider != null) {
walProvider.addWALActionsListener(new WALActionsListener() {
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
replicationManager.preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
replicationManager.postLogRoll(newPath);
}
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
replicationManager.scopeWALEdits(logKey, logEdit);
}
});
walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
}
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);

View File

@ -43,8 +43,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
@ -60,13 +58,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -74,8 +68,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
* This class is responsible to manage all the replication sources. There are two classes of
* sources:
@ -609,43 +601,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
void scopeWALEdits(WALKey logKey, WALEdit logEdit) throws IOException {
scopeWALEdits(logKey, logEdit, this.conf);
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
* @throws IOException If failed to parse the WALEdit
*/
@VisibleForTesting
static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException {
boolean replicationForBulkLoadEnabled =
ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
break;
}
}
if (!foundOtherEdits && logEdit.getCells().size() > 0) {
WALProtos.RegionEventDescriptor maybeEvent =
WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
if (maybeEvent != null &&
(maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
// In serially replication, we use scopes when reading close marker.
foundOtherEdits = true;
}
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
((WALKeyImpl) logKey).serializeReplicationScope(false);
}
}
@Override
public void regionServerRemoved(String regionserver) {
transferQueues(ServerName.valueOf(regionserver));

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
* Used to receive new wals.
*/
@InterfaceAudience.Private
class ReplicationSourceWALActionListener implements WALActionsListener {
private final Configuration conf;
private final ReplicationSourceManager manager;
public ReplicationSourceWALActionListener(Configuration conf, ReplicationSourceManager manager) {
this.conf = conf;
this.manager = manager;
}
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
manager.preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
manager.postLogRoll(newPath);
}
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
scopeWALEdits(logKey, logEdit, conf);
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
* @throws IOException If failed to parse the WALEdit
*/
@VisibleForTesting
static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException {
boolean replicationForBulkLoadEnabled =
ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
break;
}
}
if (!foundOtherEdits && logEdit.getCells().size() > 0) {
WALProtos.RegionEventDescriptor maybeEvent =
WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
if (maybeEvent != null &&
(maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
// In serially replication, we use scopes when reading close marker.
foundOtherEdits = true;
}
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
((WALKeyImpl) logKey).serializeReplicationScope(false);
}
}
}

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -82,7 +81,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@ -276,23 +274,8 @@ public abstract class TestReplicationSourceManager {
WALFactory wals =
new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
ReplicationSourceManager replicationManager = replication.getReplicationManager();
wals.getWALProvider().addWALActionsListener(new WALActionsListener() {
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
replicationManager.preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
replicationManager.postLogRoll(newPath);
}
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
replicationManager.scopeWALEdits(logKey, logEdit);
}
});
wals.getWALProvider()
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
final WAL wal = wals.getWAL(hri);
manager.init();
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
@ -450,7 +433,7 @@ public abstract class TestReplicationSourceManager {
RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
.setEndKey(HConstants.EMPTY_END_ROW).build();
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
ReplicationSourceManager.scopeWALEdits(new WALKeyImpl(), edit, conf);
ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
}
@Test
@ -462,7 +445,7 @@ public abstract class TestReplicationSourceManager {
WALKeyImpl logKey = new WALKeyImpl(scope);
// 3. Get the scopes for the key
ReplicationSourceManager.scopeWALEdits(logKey, logEdit, conf);
ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
// 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
@ -481,7 +464,7 @@ public abstract class TestReplicationSourceManager {
bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
// 4. Get the scopes for the key
ReplicationSourceManager.scopeWALEdits(logKey, logEdit, bulkLoadConf);
ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
// Assert family with replication scope global is present in the key scopes