HBASE-19926 Use a separated class to implement the WALActionListener for Replication

This commit is contained in:
zhangduo 2018-02-04 10:42:33 +08:00
parent 0ca7a2e916
commit 3b603d2c08
4 changed files with 108 additions and 89 deletions

View File

@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService; import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
@ -44,8 +43,6 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
@ -130,23 +127,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId, replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty()); walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
if (walProvider != null) { if (walProvider != null) {
walProvider.addWALActionsListener(new WALActionsListener() { walProvider
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
replicationManager.preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
replicationManager.postLogRoll(newPath);
}
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
replicationManager.scopeWALEdits(logKey, logEdit);
}
});
} }
this.statsThreadPeriod = this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60); this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);

View File

@ -40,12 +40,9 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
@ -64,21 +61,16 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues; import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationTracker; import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/** /**
* This class is responsible to manage all the replication * This class is responsible to manage all the replication
* sources. There are two classes of sources: * sources. There are two classes of sources:
@ -471,43 +463,6 @@ public class ReplicationSourceManager implements ReplicationListener {
return totalBufferUsed; return totalBufferUsed;
} }
void scopeWALEdits(WALKey logKey, WALEdit logEdit) throws IOException {
scopeWALEdits(logKey, logEdit, this.conf);
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
* @throws IOException If failed to parse the WALEdit
*/
@VisibleForTesting
static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException {
boolean replicationForBulkLoadEnabled =
ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
break;
}
}
if (!foundOtherEdits && logEdit.getCells().size() > 0) {
WALProtos.RegionEventDescriptor maybeEvent =
WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
if (maybeEvent != null &&
(maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
// In serially replication, we use scopes when reading close marker.
foundOtherEdits = true;
}
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
((WALKeyImpl) logKey).serializeReplicationScope(false);
}
}
/** /**
* Factory method to create a replication source * Factory method to create a replication source
* @param conf the configuration to use * @param conf the configuration to use

View File

@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
/**
* Used to receive new wals.
*/
@InterfaceAudience.Private
class ReplicationSourceWALActionListener implements WALActionsListener {
private final Configuration conf;
private final ReplicationSourceManager manager;
public ReplicationSourceWALActionListener(Configuration conf, ReplicationSourceManager manager) {
this.conf = conf;
this.manager = manager;
}
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
manager.preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
manager.postLogRoll(newPath);
}
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
scopeWALEdits(logKey, logEdit, conf);
}
/**
* Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
* compaction WAL edits and if the scope is local.
* @param logKey Key that may get scoped according to its edits
* @param logEdit Edits used to lookup the scopes
* @throws IOException If failed to parse the WALEdit
*/
@VisibleForTesting
static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException {
boolean replicationForBulkLoadEnabled =
ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
boolean foundOtherEdits = false;
for (Cell cell : logEdit.getCells()) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
foundOtherEdits = true;
break;
}
}
if (!foundOtherEdits && logEdit.getCells().size() > 0) {
WALProtos.RegionEventDescriptor maybeEvent =
WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
if (maybeEvent != null &&
(maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
// In serially replication, we use scopes when reading close marker.
foundOtherEdits = true;
}
}
if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
((WALKeyImpl) logKey).serializeReplicationScope(false);
}
}
}

View File

@ -40,7 +40,6 @@ import java.util.TreeSet;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@ -84,7 +82,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@ -100,8 +97,10 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets; import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
@ -270,23 +269,8 @@ public abstract class TestReplicationSourceManager {
WALFactory wals = WALFactory wals =
new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
ReplicationSourceManager replicationManager = replication.getReplicationManager(); ReplicationSourceManager replicationManager = replication.getReplicationManager();
wals.getWALProvider().addWALActionsListener(new WALActionsListener() { wals.getWALProvider()
.addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
@Override
public void preLogRoll(Path oldPath, Path newPath) throws IOException {
replicationManager.preLogRoll(newPath);
}
@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
replicationManager.postLogRoll(newPath);
}
@Override
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
replicationManager.scopeWALEdits(logKey, logEdit);
}
});
final WAL wal = wals.getWAL(hri); final WAL wal = wals.getWAL(hri);
manager.init(); manager.init();
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame")); HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
@ -459,7 +443,7 @@ public abstract class TestReplicationSourceManager {
RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW) RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
.setEndKey(HConstants.EMPTY_END_ROW).build(); .setEndKey(HConstants.EMPTY_END_ROW).build();
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
ReplicationSourceManager.scopeWALEdits(new WALKeyImpl(), edit, conf); ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
} }
@Test @Test
@ -471,7 +455,7 @@ public abstract class TestReplicationSourceManager {
WALKeyImpl logKey = new WALKeyImpl(scope); WALKeyImpl logKey = new WALKeyImpl(scope);
// 3. Get the scopes for the key // 3. Get the scopes for the key
ReplicationSourceManager.scopeWALEdits(logKey, logEdit, conf); ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
// 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
@ -490,7 +474,7 @@ public abstract class TestReplicationSourceManager {
bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
// 4. Get the scopes for the key // 4. Get the scopes for the key
ReplicationSourceManager.scopeWALEdits(logKey, logEdit, bulkLoadConf); ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes(); NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
// Assert family with replication scope global is present in the key scopes // Assert family with replication scope global is present in the key scopes