From 38b06c75170830f3f92eac3f03b3870e22d2c9b0 Mon Sep 17 00:00:00 2001
From: Wellington Chevreuil
Date: Fri, 11 Oct 2019 14:45:29 +0100
Subject: [PATCH] Revert "HBASE-23136 PartionedMobFileCompactor bulkloaded
 files shouldn't get replicated"

This reverts commit fec4c5249968e112b5ab6f4154d0e6c1fe428abc.
---
 .../hbase/shaded/protobuf/ProtobufUtil.java   | 11 ++--
 .../shaded/protobuf/RequestConverter.java     |  5 +-
 .../src/main/protobuf/Client.proto            |  1 -
 .../src/main/protobuf/WAL.proto               |  1 -
 .../hbase/client/AsyncClusterConnection.java  | 15 ++---
 .../client/AsyncClusterConnectionImpl.java    |  4 +-
 .../compactions/PartitionedMobCompactor.java  |  4 +-
 .../hadoop/hbase/regionserver/HRegion.java    |  7 +-
 .../regionserver/SecureBulkLoadManager.java   |  2 +-
 .../regionserver/ReplicationSink.java         | 23 ++++---
 .../hadoop/hbase/tool/BulkLoadHFiles.java     |  6 --
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java |  9 +--
 .../client/DummyAsyncClusterConnection.java   |  2 +-
 .../regionserver/TestBulkLoadReplication.java | 65 +------------------
 .../tool/TestBulkLoadHFilesSplitRecovery.java |  3 +-
 15 files changed, 35 insertions(+), 123 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 81082174bc0..52e3bf1bfbe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2570,19 +2570,16 @@ public final class ProtobufUtil {
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
       Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
-      storeFilesSize, bulkloadSeqId, null, true);
+      storeFilesSize, bulkloadSeqId, null);
   }
 
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId,
-      List<String> clusterIds, boolean replicate) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
-        .setTableName(ProtobufUtil.toProtoTableName(tableName))
-        .setEncodedRegionName(encodedRegionName)
-        .setBulkloadSeqNum(bulkloadSeqId)
-        .setReplicate(replicate);
+          .setTableName(ProtobufUtil.toProtoTableName(tableName))
+          .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
     if(clusterIds != null) {
       desc.addAllClusterIds(clusterIds);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index d45423c95b0..ae3cd3f746b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -568,7 +568,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-      false, null, true);
+      false, null);
   }
 
   /**
@@ -585,7 +585,7 @@ public final class RequestConverter {
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths, final byte[] regionName,
       boolean assignSeqNum, final Token<?> userToken, final String bulkToken, boolean copyFiles,
-      List<String> clusterIds, boolean replicate) {
+      List<String> clusterIds) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -626,7 +626,6 @@ public final class RequestConverter {
     if (clusterIds != null) {
       request.addAllClusterIds(clusterIds);
     }
-    request.setReplicate(replicate);
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index a22c6237bc7..07d8d711a0a 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,7 +379,6 @@ message BulkLoadHFileRequest {
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
   repeated string cluster_ids = 7;
-  optional bool replicate = 8 [default = true];
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index fd622cfc5ba..c103075c447 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,7 +151,6 @@ message BulkLoadDescriptor {
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
   repeated string cluster_ids = 5;
-  optional bool replicate = 6 [default = true];
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 92118ac444f..5c57817f5a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -81,18 +81,17 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
    * not expect additional clusterIds param.
    * @param tableName the target table
-   * @param familyPaths hdfs path for the the table family dirs containg files to be loaded.
-   * @param row row key.
-   * @param assignSeqNum seq num for the event on WAL.
-   * @param userToken user token.
-   * @param bulkToken bulk load token.
-   * @param copyFiles flag for copying the loaded hfiles.
+   * @param familyPaths hdfs path for the the table family dirs containg files to be loaded
+   * @param row row key
+   * @param assignSeqNum seq num for the event on WAL
+   * @param userToken user token
+   * @param bulkToken bulk load token
+   * @param copyFiles flag for copying the loaded hfiles
    * @param clusterIds list of cluster ids where the given bulk load has already been processed.
-   * @param replicate flags if the bulkload is targeted for replication.
    */
   CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
       byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
-      List<String> clusterIds, boolean replicate);
+      List<String> clusterIds);
 
   /**
    * Clean up after finishing bulk load, no matter success or not.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 046ef41ca0a..746c3b81f07 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -109,13 +109,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
       List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-      String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
+      String bulkToken, boolean copyFiles, List<String> clusterIds) {
     return callerFactory.<Boolean> single().table(tableName).row(row)
       .action((controller, loc, stub) -> ConnectionUtils
         .<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
           null, (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
-            userToken, bulkToken, copyFiles, clusterIds, replicate),
+            userToken, bulkToken, copyFiles, clusterIds),
           (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
       .call();
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index dba591d6ad2..a5823ec7d7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -827,9 +827,7 @@ public class PartitionedMobCompactor extends MobCompactor {
       throws IOException {
     // bulkload the ref file
     try {
-      BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
-      bulkLoader.disableReplication();
-      bulkLoader.bulkLoad(tableName, bulkloadDirectory);
+      BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ccfc69d7044..fd8235d6529 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6146,8 +6146,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
-      null, true);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
   }
 
   /**
@@ -6198,7 +6197,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener,
-      boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
+      boolean copyFile, List<String> clusterIds) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6373,7 +6372,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
                 UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
+                storeFiles, storeFilesSizes, seqId, clusterIds);
             WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
               loadDescriptor, mvcc);
           } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index bccc8fed459..ad19473c7ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -296,7 +296,7 @@ public class SecureBulkLoadManager {
           //To enable access prior to staging
           return region.bulkLoadHFiles(familyPaths, true,
               new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-              clusterIds, request.getReplicate());
+              clusterIds);
         } catch (Exception e) {
           LOG.error("Failed to complete bulk load", e);
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 51cbea8147b..6c3f70ca478 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,19 +203,18 @@ public class ReplicationSink {
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-            if(bld.getReplicate()) {
-              if (bulkLoadsPerClusters == null) {
-                bulkLoadsPerClusters = new HashMap<>();
-              }
-              // Map of table name Vs list of pair of family and list of
-              // hfile paths from its namespace
-              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = bulkLoadsPerClusters.get(bld.getClusterIdsList());
-              if (bulkLoadHFileMap == null) {
-                bulkLoadHFileMap = new HashMap<>();
-                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
-              }
-              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
+            if(bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
             }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
+            if (bulkLoadHFileMap == null) {
+              bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+            }
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index 1cffe05aea3..f3d627ab2b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -84,10 +84,6 @@ public interface BulkLoadHFiles {
   Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
       Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException;
 
-  /**
-   * Disable replication for this bulkload, if bulkload replication is configured.
-   */
-  void disableReplication();
   /**
    * Perform a bulk load of the given directory into the given pre-existing table.
    * @param tableName the table to load into
@@ -101,6 +97,4 @@ public interface BulkLoadHFiles {
   static BulkLoadHFiles create(Configuration conf) {
     return new BulkLoadHFilesTool(conf);
   }
-
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 294d94bb0fa..0e2e029fe96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -132,7 +132,6 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   private String bulkToken;
 
   private List<String> clusterIds = new ArrayList<>();
-  private boolean replicate = true;
 
   public BulkLoadHFilesTool(Configuration conf) {
     // make a copy, just to be sure we're not overriding someone else's config
@@ -380,8 +379,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
       .collect(Collectors.toList());
     CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
     FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
-      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
-      (loaded, error) -> {
+      fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
       if (error != null) {
         LOG.error("Encountered unrecoverable error from region server", error);
         if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -1054,9 +1052,4 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
     System.exit(ret);
   }
-
-  @Override
-  public void disableReplication(){
-    this.replicate = false;
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 87557499f7e..5a344578c9c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -144,7 +144,7 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
       List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-      String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
+      String bulkToken, boolean copyFiles, List<String> clusterIds) {
     return null;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index b2274039d31..7a49989776e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -24,15 +24,10 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Random;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -47,7 +42,6 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -60,22 +54,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mob.MobConstants;
-import org.apache.hadoop.hbase.mob.MobFileName;
-import org.apache.hadoop.hbase.mob.MobUtils;
-import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
-import org.apache.hadoop.hbase.mob.compactions.TestPartitionedMobCompactor;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -151,9 +137,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL3.startMiniCluster(NUM_SLAVES1);
 
     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
-        .setMobEnabled(true)
-        .setMobThreshold(4000)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
 
@@ -248,23 +232,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     assertEquals(9, BULK_LOADS_COUNT.get());
   }
 
-  @Test
-  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
-    Path path = createMobFiles(UTIL3);
-    ColumnFamilyDescriptor descriptor =
-      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
-    PartitionedMobCompactor compactor = new PartitionedMobCompactor(UTIL3.getConfiguration(),
-      UTIL3.getTestFileSystem(), tableName, descriptor, Executors.newFixedThreadPool(1));
-    BULK_LOAD_LATCH = new CountDownLatch(1);
-    BULK_LOADS_COUNT.set(0);
-    compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
-    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
-    Thread.sleep(400);
-    assertEquals(1, BULK_LOADS_COUNT.get());
-
-  }
-
-
   private void assertBulkLoadConditions(byte[] row, byte[] value, HBaseTestingUtility utility,
       Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -325,36 +292,6 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }
 
-  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
-    Path testDir = FSUtils.getRootDir(util.getConfiguration());
-    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
-    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
-    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
-    MobFileName mobFileName = null;
-    byte[] mobFileStartRow = new byte[32];
-    for (byte rowKey : Bytes.toBytes("01234")) {
-      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
-        UUID.randomUUID().toString().replaceAll("-", ""));
-      StoreFileWriter mobFileWriter =
-        new StoreFileWriter.Builder(util.getConfiguration(),
-          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
-          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
-      long now = System.currentTimeMillis();
-      try {
-        for (int i = 0; i < 10; i++) {
-          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
-          byte[] dummyData = new byte[5000];
-          new Random().nextBytes(dummyData);
-          mobFileWriter.append(
-            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
-        }
-      } finally {
-        mobFileWriter.close();
-      }
-    }
-    return basePath;
-  }
-
   public static class BulkReplicationTestObserver implements RegionCoprocessor {
 
     String clusterName;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
index b626fe85921..1326564b543 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
@@ -267,8 +267,7 @@ public class TestBulkLoadHFilesSplitRecovery {
   private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
     AsyncClusterConnection errConn = spy(conn);
     doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
-      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
-        anyBoolean());
+      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList());
     return errConn;
   }
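
Reviewer note, appended after the diff (trailing text is not applied by git apply):
a minimal sketch of the caller-visible effect of this revert, reusing the
identifiers from the PartitionedMobCompactor hunk above.

    // Before this revert (HBASE-23136), a caller could opt a bulk load out of
    // replication before loading:
    BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
    bulkLoader.disableReplication();
    bulkLoader.bulkLoad(tableName, bulkloadDirectory);

    // After this revert, disableReplication() and the "replicate" fields in
    // BulkLoadHFileRequest and BulkLoadDescriptor are gone, so every bulk load
    // is once again eligible for replication on peers that accept it:
    BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);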