HBASE-23136 (#712)

* HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated (addressing buklload replication related issue raised in HBASE-22380)

Signed-off-by: Josh Elser <elserj@apache.org>
This commit is contained in:
Wellington Ramos Chevreuil 2019-10-18 15:07:11 +01:00 committed by GitHub
parent 946f1e9e25
commit 4d414020bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 157 additions and 42 deletions

View File

@ -2570,16 +2570,19 @@ public final class ProtobufUtil {
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId) {
return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
storeFilesSize, bulkloadSeqId, null);
storeFilesSize, bulkloadSeqId, null, true);
}
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
Map<String, Long> storeFilesSize, long bulkloadSeqId,
List<String> clusterIds, boolean replicate) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
.setEncodedRegionName(encodedRegionName)
.setBulkloadSeqNum(bulkloadSeqId)
.setReplicate(replicate);
if(clusterIds != null) {
desc.addAllClusterIds(clusterIds);
}

View File

@ -568,7 +568,7 @@ public final class RequestConverter {
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
false, null);
false, null, true);
}
/**
@ -585,7 +585,7 @@ public final class RequestConverter {
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken, boolean copyFiles,
List<String> clusterIds) {
List<String> clusterIds, boolean replicate) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@ -626,6 +626,7 @@ public final class RequestConverter {
if (clusterIds != null) {
request.addAllClusterIds(clusterIds);
}
request.setReplicate(replicate);
return request.build();
}

View File

@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
repeated string cluster_ids = 7;
optional bool replicate = 8 [default = true];
message FamilyPath {
required bytes family = 1;

View File

@ -151,6 +151,7 @@ message BulkLoadDescriptor {
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
repeated string cluster_ids = 5;
optional bool replicate = 6 [default = true];
}
/**

View File

@ -81,17 +81,18 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
* not expect additional clusterIds param.
* @param tableName the target table
* @param familyPaths hdfs path for the the table family dirs containg files to be loaded
* @param row row key
* @param assignSeqNum seq num for the event on WAL
* @param userToken user token
* @param bulkToken bulk load token
* @param copyFiles flag for copying the loaded hfiles
* @param familyPaths hdfs path for the the table family dirs containg files to be loaded.
* @param row row key.
* @param assignSeqNum seq num for the event on WAL.
* @param userToken user token.
* @param bulkToken bulk load token.
* @param copyFiles flag for copying the loaded hfiles.
* @param clusterIds list of cluster ids where the given bulk load has already been processed.
* @param replicate flags if the bulkload is targeted for replication.
*/
CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
List<String> clusterIds);
List<String> clusterIds, boolean replicate);
/**
* Clean up after finishing bulk load, no matter success or not.

View File

@ -109,13 +109,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
String bulkToken, boolean copyFiles, List<String> clusterIds) {
String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
return callerFactory.<Boolean> single().table(tableName).row(row)
.action((controller, loc, stub) -> ConnectionUtils
.<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
null,
(rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
userToken, bulkToken, copyFiles, clusterIds),
userToken, bulkToken, copyFiles, clusterIds, replicate),
(s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
.call();
}

View File

@ -827,7 +827,9 @@ public class PartitionedMobCompactor extends MobCompactor {
throws IOException {
// bulkload the ref file
try {
BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
bulkLoader.disableReplication();
bulkLoader.bulkLoad(tableName, bulkloadDirectory);
} catch (Exception e) {
throw new IOException(e);
}

View File

@ -6146,7 +6146,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
null, true);
}
/**
@ -6197,7 +6198,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
boolean assignSeqId, BulkLoadListener bulkLoadListener,
boolean copyFile, List<String> clusterIds) throws IOException {
boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
@ -6372,7 +6373,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
storeFiles, storeFilesSizes, seqId, clusterIds);
storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {

View File

@ -296,7 +296,7 @@ public class SecureBulkLoadManager {
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
clusterIds);
clusterIds, request.getReplicate());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}

View File

@ -203,7 +203,8 @@ public class ReplicationSink {
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
if(bulkLoadsPerClusters == null) {
if(bld.getReplicate()) {
if (bulkLoadsPerClusters == null) {
bulkLoadsPerClusters = new HashMap<>();
}
// Map of table name Vs list of pair of family and list of
@ -215,6 +216,7 @@ public class ReplicationSink {
bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
}
buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
}
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {

View File

@ -84,6 +84,18 @@ public interface BulkLoadHFiles {
Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Map<byte[], List<Path>> family2Files)
throws TableNotFoundException, IOException;
/**
* Disables replication for all bulkloads done via this instance,
* when bulkload replication is configured.
*/
void disableReplication();
/**
*
* @return true if replication has been disabled.
*/
boolean isReplicationDisabled();
/**
* Perform a bulk load of the given directory into the given pre-existing table.
* @param tableName the table to load into
@ -97,4 +109,6 @@ public interface BulkLoadHFiles {
static BulkLoadHFiles create(Configuration conf) {
return new BulkLoadHFilesTool(conf);
}
}

View File

@ -132,6 +132,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
private String bulkToken;
private List<String> clusterIds = new ArrayList<>();
private boolean replicate = true;
public BulkLoadHFilesTool(Configuration conf) {
// make a copy, just to be sure we're not overriding someone else's config
@ -379,7 +380,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
.collect(Collectors.toList());
CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
(loaded, error) -> {
if (error != null) {
LOG.error("Encountered unrecoverable error from region server", error);
if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@ -1052,4 +1054,14 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
System.exit(ret);
}
@Override
public void disableReplication(){
this.replicate = false;
}
@Override
public boolean isReplicationDisabled(){
return !this.replicate;
}
}

View File

@ -144,7 +144,7 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
String bulkToken, boolean copyFiles, List<String> clusterIds) {
String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
return null;
}

View File

@ -24,10 +24,16 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -54,14 +61,21 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.mob.MobFileName;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
@ -105,7 +119,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
private static final String PEER_ID1 = "1";
private static final String PEER_ID3 = "3";
private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
private static AtomicInteger BULK_LOADS_COUNT;
private static CountDownLatch BULK_LOAD_LATCH;
private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
@ -137,7 +151,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
UTIL3.startMiniCluster(NUM_SLAVES1);
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setMobEnabled(true)
.setMobThreshold(4000)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
@ -168,6 +184,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
setupCoprocessor(UTIL1, "cluster1");
setupCoprocessor(UTIL2, "cluster2");
setupCoprocessor(UTIL3, "cluster3");
BULK_LOADS_COUNT = new AtomicInteger(0);
}
private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
@ -178,12 +195,16 @@ public class TestBulkLoadReplication extends TestReplicationBase {
private void setupCoprocessor(HBaseTestingUtility cluster, String name){
cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
try {
r.getCoprocessorHost()
.load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
if(cp == null) {
r.getCoprocessorHost().
load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
cluster.getConfiguration());
TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost()
.findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
cp = r.getCoprocessorHost().
findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
cp.clusterName = cluster.getClusterKey();
}
} catch (Exception e){
LOG.error(e.getMessage(), e);
}
@ -232,6 +253,31 @@ public class TestBulkLoadReplication extends TestReplicationBase {
assertEquals(9, BULK_LOADS_COUNT.get());
}
@Test
public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
Path path = createMobFiles(UTIL3);
ColumnFamilyDescriptor descriptor =
new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
ExecutorService pool = null;
try {
pool = Executors.newFixedThreadPool(1);
PartitionedMobCompactor compactor =
new PartitionedMobCompactor(UTIL3.getConfiguration(), UTIL3.getTestFileSystem(), tableName,
descriptor, pool);
BULK_LOAD_LATCH = new CountDownLatch(1);
BULK_LOADS_COUNT.set(0);
compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
Thread.sleep(400);
assertEquals(1, BULK_LOADS_COUNT.get());
} finally {
if(pool != null && !pool.isTerminated()) {
pool.shutdownNow();
}
}
}
private void assertBulkLoadConditions(byte[] row, byte[] value,
HBaseTestingUtility utility, Table...tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(3);
@ -292,6 +338,36 @@ public class TestBulkLoadReplication extends TestReplicationBase {
return hFileLocation.getAbsoluteFile().getAbsolutePath();
}
private Path createMobFiles(HBaseTestingUtility util) throws IOException {
Path testDir = FSUtils.getRootDir(util.getConfiguration());
Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
MobFileName mobFileName = null;
byte[] mobFileStartRow = new byte[32];
for (byte rowKey : Bytes.toBytes("01234")) {
mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
UUID.randomUUID().toString().replaceAll("-", ""));
StoreFileWriter mobFileWriter =
new StoreFileWriter.Builder(util.getConfiguration(),
new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
.withFilePath(new Path(basePath, mobFileName.getFileName())).build();
long now = System.currentTimeMillis();
try {
for (int i = 0; i < 10; i++) {
byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
byte[] dummyData = new byte[5000];
new Random().nextBytes(dummyData);
mobFileWriter.append(
new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
}
} finally {
mobFileWriter.close();
}
}
return basePath;
}
public static class BulkReplicationTestObserver implements RegionCoprocessor {
String clusterName;

View File

@ -267,7 +267,8 @@ public class TestBulkLoadHFilesSplitRecovery {
private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
AsyncClusterConnection errConn = spy(conn);
doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
.bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList());
.bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
anyBoolean());
return errConn;
}