Revert "HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated"

This reverts commit fec4c52499.
Wellington Chevreuil 2019-10-11 14:45:29 +01:00
parent abc38d80e4
commit 38b06c7517
15 changed files with 35 additions and 123 deletions
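
In effect, this revert removes the per-bulk-load replication switch that HBASE-23136 had introduced: the replicate flag disappears from the client request and WAL proto messages, from the internal bulk-load call chain, and from the BulkLoadHFiles API, so every bulk load into a replicated table is shipped to peer clusters again. Below is a minimal before/after sketch of the caller-visible behaviour, reconstructed from the hunks that follow (the table name and staging path are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

    public class BulkLoadReplicationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName table = TableName.valueOf("my_table");   // hypothetical table
        Path hfileDir = new Path("/staging/my_table");     // hypothetical staging dir

        // Before this revert, a caller such as PartitionedMobCompactor could opt
        // its bulk-loaded ref files out of replication:
        //   BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
        //   loader.disableReplication();
        //   loader.bulkLoad(table, hfileDir);

        // After this revert, disableReplication() no longer exists, and the bulk
        // load always writes a WAL bulk-load marker that peer clusters process.
        BulkLoadHFiles.create(conf).bulkLoad(table, hfileDir);
      }
    }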

ProtobufUtil.java

@@ -2570,19 +2570,16 @@ public final class ProtobufUtil {
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
       Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
-        storeFilesSize, bulkloadSeqId, null, true);
+        storeFilesSize, bulkloadSeqId, null);
   }
 
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId,
-      List<String> clusterIds, boolean replicate) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
-        .setTableName(ProtobufUtil.toProtoTableName(tableName))
-        .setEncodedRegionName(encodedRegionName)
-        .setBulkloadSeqNum(bulkloadSeqId)
-        .setReplicate(replicate);
+        .setTableName(ProtobufUtil.toProtoTableName(tableName))
+        .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
     if(clusterIds != null) {
       desc.addAllClusterIds(clusterIds);
     }

RequestConverter.java

@@ -568,7 +568,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-      false, null, true);
+      false, null);
   }
 
   /**
@@ -585,7 +585,7 @@ public final class RequestConverter {
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken, boolean copyFiles,
-      List<String> clusterIds, boolean replicate) {
+      List<String> clusterIds) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
@@ -626,7 +626,6 @@ public final class RequestConverter {
     if (clusterIds != null) {
       request.addAllClusterIds(clusterIds);
     }
-    request.setReplicate(replicate);
     return request.build();
   }

Client.proto

@@ -379,7 +379,6 @@ message BulkLoadHFileRequest {
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
   repeated string cluster_ids = 7;
-  optional bool replicate = 8 [default = true];
 
   message FamilyPath {
     required bytes family = 1;

WAL.proto

@@ -151,7 +151,6 @@ message BulkLoadDescriptor {
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
   repeated string cluster_ids = 5;
-  optional bool replicate = 6 [default = true];
 }
 
 /**

AsyncClusterConnection.java

@@ -81,18 +81,17 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
    * not expect additional clusterIds param.
    * @param tableName the target table
-   * @param familyPaths hdfs path for the the table family dirs containg files to be loaded.
-   * @param row row key.
-   * @param assignSeqNum seq num for the event on WAL.
-   * @param userToken user token.
-   * @param bulkToken bulk load token.
-   * @param copyFiles flag for copying the loaded hfiles.
+   * @param familyPaths hdfs path for the the table family dirs containg files to be loaded
+   * @param row row key
+   * @param assignSeqNum seq num for the event on WAL
+   * @param userToken user token
+   * @param bulkToken bulk load token
+   * @param copyFiles flag for copying the loaded hfiles
    * @param clusterIds list of cluster ids where the given bulk load has already been processed.
-   * @param replicate flags if the bulkload is targeted for replication.
    */
   CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
       byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
-      List<String> clusterIds, boolean replicate);
+      List<String> clusterIds);
 
   /**
    * Clean up after finishing bulk load, no matter success or not.
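
For reference, a hedged sketch of a call through the restored eight-argument signature. The wrapper method and all argument values here are hypothetical, and AsyncClusterConnection instances are created internally by HBase rather than by user code:

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncClusterConnection;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.hadoop.security.token.Token;

    final class BulkLoadCallSketch {
      // Hypothetical wrapper showing the restored parameter list: the trailing
      // "boolean replicate" argument is gone after this revert.
      static CompletableFuture<Boolean> load(AsyncClusterConnection conn, TableName table,
          List<Pair<byte[], String>> familyPaths, byte[] firstRow, Token<?> userToken,
          String bulkToken) {
        return conn.bulkLoad(table, familyPaths, firstRow, /* assignSeqNum */ true,
            userToken, bulkToken, /* copyFiles */ false, /* clusterIds */ null);
      }
    }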

AsyncClusterConnectionImpl.java

@@ -109,13 +109,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
       List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-      String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
+      String bulkToken, boolean copyFiles, List<String> clusterIds) {
     return callerFactory.<Boolean> single().table(tableName).row(row)
       .action((controller, loc, stub) -> ConnectionUtils
         .<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
           null,
           (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
-            userToken, bulkToken, copyFiles, clusterIds, replicate),
+            userToken, bulkToken, copyFiles, clusterIds),
           (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
       .call();
   }

PartitionedMobCompactor.java

@@ -827,9 +827,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       throws IOException {
     // bulkload the ref file
     try {
-      BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
-      bulkLoader.disableReplication();
-      bulkLoader.bulkLoad(tableName, bulkloadDirectory);
+      BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
     } catch (Exception e) {
       throw new IOException(e);
     }

HRegion.java

@@ -6146,8 +6146,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
-      null, true);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
   }
 
   /**
@@ -6198,7 +6197,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener,
-      boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
+      boolean copyFile, List<String> clusterIds) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6373,7 +6372,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       WALProtos.BulkLoadDescriptor loadDescriptor =
         ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
           UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-          storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
+          storeFiles, storeFilesSizes, seqId, clusterIds);
       WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
         loadDescriptor, mvcc);
     } catch (IOException ioe) {

SecureBulkLoadManager.java

@@ -296,7 +296,7 @@ public class SecureBulkLoadManager {
         //To enable access prior to staging
         return region.bulkLoadHFiles(familyPaths, true,
           new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-          clusterIds, request.getReplicate());
+          clusterIds);
       } catch (Exception e) {
         LOG.error("Failed to complete bulk load", e);
       }

ReplicationSink.java

@@ -203,19 +203,18 @@ public class ReplicationSink {
         // Handle bulk load hfiles replication
         if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
           BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-          if(bld.getReplicate()) {
-            if (bulkLoadsPerClusters == null) {
-              bulkLoadsPerClusters = new HashMap<>();
-            }
-            // Map of table name Vs list of pair of family and list of
-            // hfile paths from its namespace
-            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = bulkLoadsPerClusters.get(bld.getClusterIdsList());
-            if (bulkLoadHFileMap == null) {
-              bulkLoadHFileMap = new HashMap<>();
-              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
-            }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
-          }
+          if(bulkLoadsPerClusters == null) {
+            bulkLoadsPerClusters = new HashMap<>();
+          }
+          // Map of table name Vs list of pair of family and list of
+          // hfile paths from its namespace
+          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+            bulkLoadsPerClusters.get(bld.getClusterIdsList());
+          if (bulkLoadHFileMap == null) {
+            bulkLoadHFileMap = new HashMap<>();
+            bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+          }
+          buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
         } else {
           // Handle wal replication
           if (isNewRowOrType(previousCell, cell)) {

BulkLoadHFiles.java

@@ -84,10 +84,6 @@ public interface BulkLoadHFiles {
   Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Map<byte[], List<Path>> family2Files)
      throws TableNotFoundException, IOException;
 
-  /**
-   * Disable replication for this bulkload, if bulkload replication is configured.
-   */
-  void disableReplication();
-
   /**
    * Perform a bulk load of the given directory into the given pre-existing table.
    * @param tableName the table to load into
@@ -101,6 +97,4 @@ public interface BulkLoadHFiles {
   static BulkLoadHFiles create(Configuration conf) {
     return new BulkLoadHFilesTool(conf);
   }
 }

BulkLoadHFilesTool.java

@@ -132,7 +132,6 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   private String bulkToken;
   private List<String> clusterIds = new ArrayList<>();
-  private boolean replicate = true;
 
   public BulkLoadHFilesTool(Configuration conf) {
     // make a copy, just to be sure we're not overriding someone else's config
@@ -380,8 +379,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       .collect(Collectors.toList());
     CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
     FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
-      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
-      (loaded, error) -> {
+      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
        if (error != null) {
          LOG.error("Encountered unrecoverable error from region server", error);
          if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -1054,9 +1052,4 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
     System.exit(ret);
   }
-
-  @Override
-  public void disableReplication(){
-    this.replicate = false;
-  }
 }
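
BulkLoadHFilesTool also runs as a command-line tool through ToolRunner, as the main method context above shows, and after this revert nothing on that path can switch replication off either. A hedged sketch of a programmatic invocation mirroring the tool's own main() (the staging directory and table name are hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
    import org.apache.hadoop.util.ToolRunner;

    public class RunBulkLoadToolSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same call shape as the tool's own main(): args are the HDFS directory
        // holding the prepared hfiles, then the target table name.
        int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf),
            new String[] { "/staging/my_table", "my_table" });
        System.exit(ret);
      }
    }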

DummyAsyncClusterConnection.java

@@ -144,7 +144,7 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
       List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-      String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
+      String bulkToken, boolean copyFiles, List<String> clusterIds) {
     return null;
   }

TestBulkLoadReplication.java

@@ -24,15 +24,10 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -47,7 +42,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -60,22 +54,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobFileName;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
-import org.apache.hadoop.hbase.mob.compactions.TestPartitionedMobCompactor;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -151,9 +137,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL3.startMiniCluster(NUM_SLAVES1);
     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
-        .setMobEnabled(true)
-        .setMobThreshold(4000)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
@@ -248,23 +232,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     assertEquals(9, BULK_LOADS_COUNT.get());
   }
 
-  @Test
-  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
-    Path path = createMobFiles(UTIL3);
-    ColumnFamilyDescriptor descriptor =
-      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
-    PartitionedMobCompactor compactor = new PartitionedMobCompactor(UTIL3.getConfiguration(),
-      UTIL3.getTestFileSystem(), tableName, descriptor, Executors.newFixedThreadPool(1));
-    BULK_LOAD_LATCH = new CountDownLatch(1);
-    BULK_LOADS_COUNT.set(0);
-    compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
-    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
-    Thread.sleep(400);
-    assertEquals(1, BULK_LOADS_COUNT.get());
-  }
-
   private void assertBulkLoadConditions(byte[] row, byte[] value,
       HBaseTestingUtility utility, Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -325,36 +292,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }
 
-  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
-    Path testDir = FSUtils.getRootDir(util.getConfiguration());
-    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
-    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
-    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
-    MobFileName mobFileName = null;
-    byte[] mobFileStartRow = new byte[32];
-    for (byte rowKey : Bytes.toBytes("01234")) {
-      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
-        UUID.randomUUID().toString().replaceAll("-", ""));
-      StoreFileWriter mobFileWriter =
-        new StoreFileWriter.Builder(util.getConfiguration(),
-          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
-            .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
-      long now = System.currentTimeMillis();
-      try {
-        for (int i = 0; i < 10; i++) {
-          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
-          byte[] dummyData = new byte[5000];
-          new Random().nextBytes(dummyData);
-          mobFileWriter.append(
-            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
-        }
-      } finally {
-        mobFileWriter.close();
-      }
-    }
-    return basePath;
-  }
-
   public static class BulkReplicationTestObserver implements RegionCoprocessor {
 
     String clusterName;

TestBulkLoadHFilesSplitRecovery.java

@@ -267,8 +267,7 @@ public class TestBulkLoadHFilesSplitRecovery {
   private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
     AsyncClusterConnection errConn = spy(conn);
     doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
-      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
-        anyBoolean());
+      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList());
     return errConn;
   }