HBASE-22380 break circle replication when doing bulkload (#566)
Signed-off-by: Josh Elser <elserj@apache.org>
parent 44cdf5f4ea
commit 3ae353cbf4
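The fix threads a list of cluster IDs through every bulk load request and its WAL marker: each cluster appends its own ID before forwarding, and a region server that finds its own ID in an incoming request answers loaded=true without re-applying the files, which breaks the loop in cyclic (active-active) replication topologies. A standalone sketch of that guard (hypothetical class and method names, not part of the patch; the real wiring is in the RSRpcServices hunk below):

import java.util.ArrayList;
import java.util.List;

public final class BulkLoadLoopGuard {
  // Mirrors the check added to RSRpcServices.bulkLoadHFile: if this cluster
  // already appears in the request's cluster ID list, the load was handled
  // here before and must not be applied (or re-replicated) again.
  static boolean shouldProcess(List<String> requestClusterIds, String localClusterId) {
    return !requestClusterIds.contains(localClusterId);
  }

  public static void main(String[] args) {
    List<String> ids = new ArrayList<>();
    ids.add("peer1");                                // originating cluster
    System.out.println(shouldProcess(ids, "peer2")); // true: first visit, process it
    ids.add("peer2");                                // peer2 appends itself before forwarding
    System.out.println(shouldProcess(ids, "peer1")); // false: back at the origin, loop broken
  }
}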
@@ -2565,13 +2565,23 @@ public final class ProtobufUtil {
    * name
    * @return The WAL log marker for bulk loads.
    */
+  public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
+      ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+    return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
+        storeFilesSize, bulkloadSeqId, null);
+  }
+
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
         .setTableName(ProtobufUtil.toProtoTableName(tableName))
         .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+    if (clusterIds != null) {
+      desc.addAllClusterIds(clusterIds);
+    }
 
     for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
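A hedged usage sketch of the new overload above. The shaded protobuf package names and the illustrative table, region, and file values are assumptions (they vary by HBase version); getClusterIdsList() is the standard generated accessor for the repeated field, also used by RSRpcServices further down:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;

public class BulkLoadDescriptorExample {
  public static void main(String[] args) {
    Map<byte[], List<Path>> storeFiles = new HashMap<>();
    storeFiles.put(Bytes.toBytes("f"), Arrays.asList(new Path("/bulk_dir/f/hfile1")));
    Map<String, Long> storeFilesSize = new HashMap<>();
    storeFilesSize.put("hfile1", 1024L); // illustrative store file name and size
    // Clusters that have already handled this bulk load event.
    List<String> clusterIds = Arrays.asList("peer1", "peer2");
    WALProtos.BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(
        TableName.valueOf("test_table"),
        ByteString.copyFromUtf8("region-enc-name"),
        storeFiles, storeFilesSize, 42L, clusterIds);
    // The WAL marker now carries the cluster IDs for the replication sink to inspect.
    System.out.println(desc.getClusterIdsList()); // [peer1, peer2]
  }
}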
@@ -568,7 +568,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-      false);
+      false, null);
   }
 
   /**
@@ -583,9 +583,9 @@ public final class RequestConverter {
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
-      final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName, boolean assignSeqNum,
-      final Token<?> userToken, final String bulkToken, boolean copyFiles) {
+      final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken, boolean copyFiles,
+      List<String> clusterIds) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -623,6 +623,9 @@ public final class RequestConverter {
       request.setBulkToken(bulkToken);
     }
     request.setCopyFile(copyFiles);
+    if (clusterIds != null) {
+      request.addAllClusterIds(clusterIds);
+    }
     return request.build();
   }
 
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
   optional DelegationToken fs_token = 4;
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
+  repeated string cluster_ids = 7;
 
   message FamilyPath {
     required bytes family = 1;
@@ -150,6 +150,7 @@ message BulkLoadDescriptor {
   required bytes encoded_region_name = 2;
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
+  repeated string cluster_ids = 5;
 }
 
 /**
@@ -75,12 +75,38 @@ public interface AsyncClusterConnection extends AsyncConnection {
   CompletableFuture<String> prepareBulkLoad(TableName tableName);
 
   /**
    * Securely bulk load a list of HFiles.
    * @param row used to locate the region
+   * @deprecated Use bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
+   *   byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken,
+   *   boolean copyFiles, List<String> clusterIds)
    */
+  @Deprecated
   CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
     byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles);
 
+  /**
+   * Securely bulk load a list of HFiles, passing an additional list of cluster ids tracking
+   * the clusters where the given bulk load has already been processed
+   * (important for bulk load replication).
+   *
+   * Defined as default here to avoid breaking callers that rely on the bulkLoad version
+   * without the additional clusterIds param.
+   *
+   * @param tableName the target table
+   * @param familyPaths hdfs path for the table family dirs containing files to be loaded
+   * @param row row key
+   * @param assignSeqNum seq num for the event on WAL
+   * @param userToken user token
+   * @param bulkToken bulk load token
+   * @param copyFiles flag for copying the loaded hfiles
+   * @param clusterIds list of cluster ids where the given bulk load has already been processed.
+   * @return a future for the boolean result of the bulk load
+   */
+  default CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[],
+      String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken,
+      boolean copyFiles, List<String> clusterIds) {
+    return null;
+  }
+
   /**
    * Clean up after finishing bulk load, no matter success or not.
    */
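A sketch of what a caller of the widened interface looks like. AsyncClusterConnection is HBase-internal, so the helper below is purely illustrative (hypothetical class BulkLoadCall, assumed package for the interface); it only shows how the clusterIds list rides along with the existing arguments:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.token.Token;

final class BulkLoadCall {
  static CompletableFuture<Boolean> loadTracked(AsyncClusterConnection conn,
      TableName table, List<Pair<byte[], String>> familyPaths, byte[] row,
      Token<?> userToken, String bulkToken, List<String> clusterIds) {
    // clusterIds lists every cluster that already handled this event, so the
    // receiving region server can short-circuit instead of looping forever.
    return conn.bulkLoad(table, familyPaths, row, true /* assignSeqNum */,
        userToken, bulkToken, false /* copyFiles */, clusterIds);
  }
}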
@@ -108,14 +108,22 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {
 
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
-      List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-      String bulkToken, boolean copyFiles) {
+      List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
+      String bulkToken, boolean copyFiles) {
+    return bulkLoad(tableName, familyPaths, row, assignSeqNum,
+      userToken, bulkToken, copyFiles, null);
+  }
+
+  @Override
+  public CompletableFuture<Boolean> bulkLoad(TableName tableName,
+      List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
+      String bulkToken, boolean copyFiles, List<String> clusterIds) {
     return callerFactory.<Boolean> single().table(tableName).row(row)
       .action((controller, loc, stub) -> ConnectionUtils
         .<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
           null,
           (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
-            userToken, bulkToken, copyFiles),
+            userToken, bulkToken, copyFiles, clusterIds),
           (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
       .call();
   }
 
@@ -6142,7 +6142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
    */
  public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
      BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
  }
 
  /**
@@ -6187,11 +6187,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
   * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
   *   file about to be bulk loaded
   * @param copyFile always copy hfiles if true
+  * @param clusterIds ids from clusters that had already handled the given bulkload event.
   * @return Map from family to List of store file paths if successful, null if failed recoverably
   * @throws IOException if failed unrecoverably.
   */
  public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
-     boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
+     boolean assignSeqId, BulkLoadListener bulkLoadListener,
+     boolean copyFile, List<String> clusterIds) throws IOException {
    long seqId = -1;
    Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6366,8 +6368,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
       WALProtos.BulkLoadDescriptor loadDescriptor =
         ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
           UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-          storeFiles,
-          storeFilesSizes, seqId);
+          storeFiles, storeFilesSizes, seqId, clusterIds);
       WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
         loadDescriptor, mvcc);
     } catch (IOException ioe) {
@@ -2386,6 +2386,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
+    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
+    if (clusterIds.contains(this.regionServer.clusterId)) {
+      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+    } else {
+      clusterIds.add(this.regionServer.clusterId);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2410,7 +2416,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       // secure bulk load
       Map<byte[], List<Path>> map =
-        regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
+        regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(map != null);
       if (map != null) {
@@ -214,7 +214,12 @@ public class SecureBulkLoadManager {
   }
 
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
-      final BulkLoadHFileRequest request) throws IOException {
+      final BulkLoadHFileRequest request) throws IOException {
+    return secureBulkLoadHFiles(region, request, null);
+  }
+
+  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
+      final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
     for (ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -290,7 +295,8 @@ public class SecureBulkLoadManager {
           //We call bulkLoadHFiles as requesting user
           //To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
-            new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+            new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
+            clusterIds);
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }
@@ -91,17 +91,19 @@ public class HFileReplicator {
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
+  private List<String> sourceClusterIds;
 
   public HFileReplicator(Configuration sourceClusterConf,
       String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
       Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
-      AsyncClusterConnection connection) throws IOException {
+      AsyncClusterConnection connection, List<String> sourceClusterIds) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
     this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
+    this.sourceClusterIds = sourceClusterIds;
 
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -156,6 +158,8 @@ public class HFileReplicator {
     BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
     // Set the staging directory which will be used by BulkLoadHFilesTool for loading the data
     loader.setBulkToken(stagingDir.toString());
+    // updating list of cluster ids where this bulkload event has already been processed
+    loader.setClusterIds(sourceClusterIds);
     for (int count = 0; !queue.isEmpty(); count++) {
       if (count != 0) {
         LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with " +
@@ -175,9 +175,7 @@ public class ReplicationSink {
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
 
-      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
-      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
       for (WALEntry entry : entries) {
         TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
         if (this.walEntrySinkFilter != null) {
@@ -204,10 +202,19 @@ public class ReplicationSink {
           Cell cell = cells.current();
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+            if (bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
+            }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
+            if (bulkLoadHFileMap == null) {
+              bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+            }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
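Note that the grouping key above is bld.getClusterIdsList(), a List<String>. This is sound because java.util.List specifies element-wise equals() and hashCode(), so WAL entries that travelled the same replication path collapse into one bucket. A minimal JDK-only illustration (names are mine, not from the patch):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ListKeyDemo {
  public static void main(String[] args) {
    Map<List<String>, Integer> loadsPerClusterPath = new HashMap<>();
    loadsPerClusterPath.merge(Arrays.asList("peer1", "peer2"), 1, Integer::sum);
    loadsPerClusterPath.merge(Arrays.asList("peer1", "peer2"), 1, Integer::sum);
    loadsPerClusterPath.merge(Arrays.asList("peer3"), 1, Integer::sum);
    // Equal lists hash to the same key, so the [peer1, peer2] entry counts 2
    // and the [peer3] entry counts 1.
    System.out.println(loadsPerClusterPath);
  }
}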
@@ -245,14 +252,26 @@ public class ReplicationSink {
         LOG.debug("Finished replicating mutations.");
       }
 
-      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
-        LOG.debug("Started replicating bulk loaded data.");
-        HFileReplicator hFileReplicator =
-          new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+      if (bulkLoadsPerClusters != null) {
+        for (Entry<List<String>, Map<String, List<Pair<byte[], List<String>>>>> entry :
+            bulkLoadsPerClusters.entrySet()) {
+          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
+                entry.getKey().toString());
+            }
+            HFileReplicator hFileReplicator =
+              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
-                getConnection());
-        hFileReplicator.replicate();
-        LOG.debug("Finished replicating bulk loaded data.");
+                getConnection(), entry.getKey());
+            hFileReplicator.replicate();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
+                entry.getKey().toString());
+            }
+          }
+        }
+      }
 
       int size = entries.size();
@@ -267,8 +286,7 @@ public class ReplicationSink {
 
   private void buildBulkLoadHFileMap(
       final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
-      Cell cell) throws IOException {
-    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+      BulkLoadDescriptor bld) throws IOException {
     List<StoreDescriptor> storesList = bld.getStoresList();
     int storesSize = storesList.size();
     for (int j = 0; j < storesSize; j++) {
@@ -131,6 +131,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {
   private final AtomicInteger numRetries = new AtomicInteger(0);
   private String bulkToken;
 
+  private List<String> clusterIds = new ArrayList<>();
+
   public BulkLoadHFilesTool(Configuration conf) {
     // make a copy, just to be sure we're not overriding someone else's config
     super(new Configuration(conf));
@@ -377,7 +379,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {
         .collect(Collectors.toList());
     CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
     FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
-      fsDelegationToken.getUserToken(), bulkToken, copyFiles), (loaded, error) -> {
+      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
       if (error != null) {
         LOG.error("Encountered unrecoverable error from region server", error);
         if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -997,6 +999,10 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {
     this.bulkToken = bulkToken;
   }
 
+  public void setClusterIds(List<String> clusterIds) {
+    this.clusterIds = clusterIds;
+  }
+
   private void usage() {
     System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
       + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
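In this patch the only in-tree caller of setClusterIds is HFileReplicator (see its hunk above), but the end-to-end flow can be sketched as follows. The table name, path, and single source cluster ID are illustrative; bulkLoad(TableName, Path) is the existing entry point, also used by the new test below:

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

public class TrackedBulkLoad {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
    // Clusters that already processed this load; replication sinks pass the
    // source list here so the event is not re-applied along a cycle.
    loader.setClusterIds(Arrays.asList("peer1"));
    loader.bulkLoad(TableName.valueOf("test_table"), new Path("/bulk_dir"));
  }
}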
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active
+ * between 2 and 3).
+ *
+ * For each of the defined test clusters, it performs a bulk load, asserting that values in the
+ * bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads, with
+ * the given replication topology all these bulk loads should get replicated only once on each
+ * peer. To assert this, the test installs a coprocessor with a postBulkLoadHFile hook on all
+ * test table regions, on each of the clusters. This CP counts the number of times bulk load
+ * actually gets invoked, certifying we are not entering the infinite loop condition addressed
+ * by HBASE-22380.
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestBulkLoadReplication extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
+
+  protected static final Logger LOG =
+    LoggerFactory.getLogger(TestBulkLoadReplication.class);
+
+  private static final String PEER1_CLUSTER_ID = "peer1";
+  private static final String PEER2_CLUSTER_ID = "peer2";
+  private static final String PEER3_CLUSTER_ID = "peer3";
+
+  private static final String PEER_ID1 = "1";
+  private static final String PEER_ID3 = "3";
+
+  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static CountDownLatch BULK_LOAD_LATCH;
+
+  private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
+  private static final Configuration CONF3 = UTIL3.getConfiguration();
+
+  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
+  private static Table htable3;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
+    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
+    setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
+    setupConfig(UTIL3, "/3");
+    TestReplicationBase.setUpBeforeClass();
+    startThirdCluster();
+  }
+
+  private static void startThirdCluster() throws Exception {
+    LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
+    UTIL3.setZkCluster(UTIL1.getZkCluster());
+    UTIL3.startMiniCluster(NUM_SLAVES1);
+
+    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+    Connection connection3 = ConnectionFactory.createConnection(CONF3);
+    try (Admin admin3 = connection3.getAdmin()) {
+      admin3.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    UTIL3.waitUntilAllRegionsAssigned(tableName);
+    htable3 = connection3.getTable(tableName);
+  }
+
+  @Before
+  @Override
+  public void setUpBase() throws Exception {
+    // "super.setUpBase()" already sets replication from 1->2,
+    // then on the subsequent lines, sets 2->1, 2->3 and 3->2.
+    // So we have the following topology: "1 <-> 2 <-> 3"
+    super.setUpBase();
+    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
+    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
+    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
+    // adds cluster1 as a remote peer on cluster2
+    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+    // adds cluster3 as a remote peer on cluster2
+    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+    // adds cluster2 as a remote peer on cluster3
+    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
+    setupCoprocessor(UTIL1, "cluster1");
+    setupCoprocessor(UTIL2, "cluster2");
+    setupCoprocessor(UTIL3, "cluster3");
+  }
+
+  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
+    return ReplicationPeerConfig.newBuilder()
+      .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
+  }
+
+  private void setupCoprocessor(HBaseTestingUtility cluster, String name) {
+    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
+      try {
+        r.getCoprocessorHost()
+          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+            cluster.getConfiguration());
+        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost()
+          .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+        cp.clusterName = cluster.getClusterKey();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    });
+  }
+
+  @After
+  @Override
+  public void tearDownBase() throws Exception {
+    super.tearDownBase();
+    UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
+    UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
+    UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
+  }
+
+  private static void setupBulkLoadConfigsForCluster(Configuration config,
+      String clusterReplicationId) throws Exception {
+    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
+      + "/hbase-site.xml");
+    config.writeXml(new FileOutputStream(sourceConfigFile));
+    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+  }
+
+  @Test
+  public void testBulkLoadReplicationActiveActive() throws Exception {
+    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
+    byte[] row = Bytes.toBytes("001");
+    byte[] value = Bytes.toBytes("v1");
+    assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable);
+    row = Bytes.toBytes("002");
+    value = Bytes.toBytes("v2");
+    assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable);
+    row = Bytes.toBytes("003");
+    value = Bytes.toBytes("v3");
+    assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable);
+    // Additional wait to make sure no extra bulk load happens
+    Thread.sleep(400);
+    // We have 3 bulk load events (1 initiated on each cluster).
+    // Each event gets 3 counts (the originator cluster, plus the two peers),
+    // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
+    assertEquals(9, BULK_LOADS_COUNT.get());
+  }
+
+  private void assertBulkLoadConditions(byte[] row, byte[] value,
+      HBaseTestingUtility utility, Table... tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(3);
+    bulkLoadOnCluster(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableHasValue(tables[1], row, value);
+    assertTableHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnCluster(byte[] row, byte[] value,
+      HBaseTestingUtility cluster) throws Exception {
+    String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
+    copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
+    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
+    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
+  }
+
+  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertTrue(result.advance());
+    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
+
+  private String createHFileForFamilies(byte[] row, byte[] value,
+      Configuration clusterConfig) throws IOException {
+    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row)
+      .setFamily(TestReplicationBase.famName)
+      .setQualifier(Bytes.toBytes("1"))
+      .setValue(value)
+      .setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out =
+      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContext());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  public static class BulkReplicationTestObserver implements RegionCoprocessor {
+
+    String clusterName;
+    AtomicInteger bulkLoadCounts = new AtomicInteger();
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(new RegionObserver() {
+
+        @Override
+        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+            List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
+            throws IOException {
+          BULK_LOAD_LATCH.countDown();
+          BULK_LOADS_COUNT.incrementAndGet();
+          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+            bulkLoadCounts.addAndGet(1));
+        }
+      });
+    }
+  }
+}
@@ -170,7 +170,7 @@ public class TestReplicationBase {
     htable1.put(puts);
   }
 
-  private static void setupConfig(HBaseTestingUtility util, String znodeParent) {
+  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
     Configuration conf = util.getConfiguration();
     conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
     // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger