HBASE-22939 SpaceQuotas - Bulkload from different hdfs failed when space quotas are turned on. (#4750)
Signed-off-by: Sakthi <sakthi@apache.org>
parent fd27d68b18
commit acc657aa59
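The fix in brief: with space quotas enabled, the pre-load quota check in RSRpcServices sized the staged HFiles against the RegionServer's own filesystem, so a bulk load whose source files live on a different HDFS failed. The check now resolves the filesystem from the staged paths themselves, and the sync-up test gains a load path that stages HFiles on a second test filesystem. Below is a minimal sketch of the resolution mechanism the patch relies on, using Hadoop's Path/FileSystem API; class and method names here are illustrative, not from the patch:

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Path#getFileSystem honors the scheme and authority embedded in the path,
// so a fully qualified path like hdfs://other-nn:8020/staging/f resolves to
// that cluster's FileSystem, while an unqualified path falls back to the
// configured fs.defaultFS.
final class SourceFsResolver {
  static FileSystem sourceFs(List<String> filePaths, Configuration conf) throws IOException {
    if (filePaths.isEmpty()) {
      return FileSystem.get(conf); // no staged files: use the default filesystem
    }
    return new Path(filePaths.get(0)).getFileSystem(conf);
  }
}
```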
TestReplicationSyncUpToolWithBulkLoadedData.java:

@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.quotas.QuotaUtil;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -63,6 +64,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
   protected void customizeClusterConf(Configuration conf) {
     conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+    conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
     conf.set("hbase.replication.source.fs.conf.provider",
       TestSourceFSConfigurationProvider.class.getCanonicalName());
     String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
@@ -81,11 +83,11 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     setupReplication();

     /**
-     * Prepare 16 random hfile ranges required for creating hfiles
+     * Prepare 24 random hfile ranges required for creating hfiles
      */
     Iterator<String> randomHFileRangeListIterator = null;
-    Set<String> randomHFileRanges = new HashSet<>(16);
-    for (int i = 0; i < 16; i++) {
+    Set<String> randomHFileRanges = new HashSet<>(24);
+    for (int i = 0; i < 24; i++) {
       randomHFileRanges.add(UTIL1.getRandomUUID().toString());
     }
     List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
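The arithmetic behind the 16 to 24 bump: every bulk load draws two range endpoints from the iterator, and the patch adds a third load per table (the one staged on the other HDFS) next to the existing cf1 and norep loads. With two tables and, by the numbers, two load rounds, that is 2 x 2 x 3 x 2 = 24 endpoints where 2 x 2 x 2 x 2 = 16 sufficed before.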
@@ -93,8 +95,9 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     randomHFileRangeListIterator = randomHFileRangeList.iterator();

     /**
-     * at Master: t1_syncup: Load 100 rows into cf1, and 3 rows into norep t2_syncup: Load 200 rows
-     * into cf1, and 3 rows into norep verify correctly replicated to slave
+     * at Master: t1_syncup: Load 50 rows into cf1, and 50 rows from other hdfs into cf1, and 3 rows
+     * into norep t2_syncup: Load 100 rows into cf1, and 100 rows from other hdfs into cf1, and 3
+     * rows into norep verify correctly replicated to slave
      */
     loadAndReplicateHFiles(true, randomHFileRangeListIterator);

@@ -175,11 +178,17 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     Iterator<String> randomHFileRangeListIterator) throws Exception {
     LOG.debug("loadAndReplicateHFiles");

-    // Load 100 + 3 hfiles to t1_syncup.
+    // Load 50 + 50 + 3 hfiles to t1_syncup.
     byte[][][] hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
         Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);

+    hfileRanges =
+      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
+      hfileRanges, 50);
+
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -187,11 +196,17 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
       hfileRanges, 3);

-    // Load 200 + 3 hfiles to t2_syncup.
+    // Load 100 + 100 + 3 hfiles to t2_syncup.
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
         Bytes.toBytes(randomHFileRangeListIterator.next()) } };
-    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);
+    loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);

+    hfileRanges =
+      new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+        Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
+      hfileRanges, 100);
+
     hfileRanges =
       new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
@@ -229,6 +244,26 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
     loader.bulkLoad(tableName, dir);
   }

+  private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+    Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+    Path dir = UTIL2.getDataTestDirOnTestFS(testName);
+    FileSystem fs = UTIL2.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
+        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    final TableName tableName = source.getName();
+    BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
+    loader.bulkLoad(tableName, dir);
+  }
+
   private void wait(Table target, int expectedCount, String msg)
     throws IOException, InterruptedException {
     for (int i = 0; i < NB_RETRIES; i++) {
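The new helper stages HFiles with UTIL2's filesystem but drives the load with UTIL1's configuration, so the RegionServer receives fully qualified paths that point at a filesystem other than its own. A sketch of the same pattern from a client's point of view; the cluster configuration and staging URI below are hypothetical, not from the patch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

// Client-side shape of a cross-filesystem bulk load. The only requirement is
// that the staging path is fully qualified, so the server can resolve the
// source filesystem from the path itself.
final class CrossFsBulkLoad {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create(); // target cluster's conf
    Path staged = new Path("hdfs://source-nn:8020/staging/bulk"); // hypothetical URI
    BulkLoadHFiles.create(conf).bulkLoad(TableName.valueOf("t1_syncup"), staged);
  }
}
```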
RSRpcServices.java:

@@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -2376,7 +2377,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
         filePaths.add(familyPath.getPath());
       }
       // Check if the batch of files exceeds the current quota
-      sizeToBeLoaded = enforcement.computeBulkLoadSize(regionServer.getFileSystem(), filePaths);
+      sizeToBeLoaded = enforcement.computeBulkLoadSize(getFileSystem(filePaths), filePaths);
     }
   }

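This one-line change is the core of the fix: the quota check no longer stats the staged files against the RegionServer's own filesystem, which is what failed for paths on a different HDFS, but against whichever filesystem the paths resolve to. As a rough illustration of what sizing a batch against an arbitrary filesystem involves; this is a stand-in sketch, not the enforcement code itself:

```java
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sum the on-disk lengths of the staged files using the filesystem that
// actually hosts them; getFileStatus throws if a file is missing, so a bad
// path fails the check early.
final class BatchSize {
  static long totalBytes(FileSystem fs, List<String> filePaths) throws IOException {
    long total = 0;
    for (String p : filePaths) {
      total += fs.getFileStatus(new Path(p)).getLen();
    }
    return total;
  }
}
```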
@@ -2481,6 +2482,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
     }
   }

+  private FileSystem getFileSystem(List<String> filePaths) throws IOException {
+    if (filePaths.isEmpty()) {
+      // local hdfs
+      return regionServer.getFileSystem();
+    }
+    // source hdfs
+    return new Path(filePaths.get(0)).getFileSystem(regionServer.getConfiguration());
+  }
+
   private com.google.protobuf.Message execServiceOnRegion(HRegion region,
     final ClientProtos.CoprocessorServiceCall serviceCall) throws IOException {
     // ignore the passed in controller (from the serialized call)
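A design note on the new getFileSystem helper: it resolves the filesystem from the first path only, which implicitly assumes every file in a bulk-load batch lives on the same filesystem, and when the batch is empty it falls back to the RegionServer's own filesystem, preserving the pre-patch behavior.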