HBASE-20293 get_splits returns duplicate split points when region replication is on
Signed-off-by: Ted Yu <yuzhihong@gmail.com>
Signed-off-by: Huaxiang Sun <hsun@apache.org>
Signed-off-by: Sean Busbey <busbey@apache.org>
parent af172e0604
commit 59d9e0f407
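Context for reviewers (not part of the commit message): with region replication enabled, getAllRegionLocations returns one location per replica of every region, so mapping all of them to start keys repeats each split point once per replica. A hypothetical hbase shell session sketching the symptom, reusing the table name and split point from the new test below; the return values are illustrative, not captured output:

# hbase shell (JRuby) -- illustrative only
create 'tableWithRegionReplicas', 'f1', { REGION_REPLICATION => 3 }, SPLITS => ['10']

get_splits 'tableWithRegionReplicas'
# before this patch: => ["10", "10", "10"]   (one entry per replica)
# after this patch:  => ["10"]               (one entry, default replica only)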
hbase-shell/src/main/ruby/hbase/table.rb

@@ -20,6 +20,8 @@
 include Java
 
 java_import org.apache.hadoop.hbase.util.Bytes
+java_import org.apache.hadoop.hbase.client.RegionReplicaUtil
+java_import org.apache.hadoop.hbase.client.Scan
 
 # Wrapper for org.apache.hadoop.hbase.client.Table
 
@@ -48,8 +50,9 @@ module Hbase
       method = name.to_sym
       self.class_eval do
         define_method method do |*args|
-          @shell.internal_command(shell_command, internal_method_name, self, *args)
+          @shell.internal_command(shell_command, internal_method_name, self,
+                                  *args)
         end
       end
     end
 
@@ -143,7 +146,7 @@ EOF
       end
       #Case where attributes are specified without timestamp
       if timestamp.kind_of?(Hash)
         timestamp.each do |k, v|
           if k == 'ATTRIBUTES'
             set_attributes(p, v)
           elsif k == 'VISIBILITY'
@@ -185,12 +188,12 @@ EOF
         timestamp = org.apache.hadoop.hbase.HConstants::LATEST_TIMESTAMP
       end
       d = org.apache.hadoop.hbase.client.Delete.new(row.to_s.to_java_bytes, timestamp)
-      if temptimestamp.kind_of?(Hash)
-        temptimestamp.each do |k, v|
-          if v.kind_of?(String)
+      if temptimestamp.is_a?(Hash)
+        temptimestamp.each do |_, v|
+          if v.is_a?(String)
             set_cell_visibility(d, v) if v
           end
         end
       end
       if args.any?
         visibility = args[VISIBILITY]
@@ -262,9 +265,11 @@ EOF
 
     #----------------------------------------------------------------------------------------------
     # Count rows in a table
 
+    # rubocop:disable Metrics/AbcSize
     def _count_internal(interval = 1000, caching_rows = 10)
       # We can safely set scanner caching with the first key only filter
-      scan = org.apache.hadoop.hbase.client.Scan.new
+      scan = Scan.new
       scan.setCacheBlocks(false)
       scan.setCaching(caching_rows)
       scan.setFilter(org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter.new)
@@ -288,6 +293,7 @@ EOF
       # Return the counter
       return count
     end
+    # rubocop:enable Metrics/AbcSize
 
     #----------------------------------------------------------------------------------------------
     # Get from table
@@ -425,6 +431,8 @@ EOF
         org.apache.hadoop.hbase.util.Bytes::toLong(cell.getValue)
       end
 
+    # rubocop:disable Metrics/AbcSize
+    # rubocop:disable Metrics/MethodLength
     def _hash_to_scan(args)
       if args.any?
         enablemetrics = args["ALL_METRICS"].nil? ? false : args["ALL_METRICS"]
@@ -453,10 +461,10 @@ EOF
         end
 
         scan = if stoprow
-                 org.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes, stoprow.to_java_bytes)
+                 Scan.new(startrow.to_java_bytes, stoprow.to_java_bytes)
               else
-                 org.apache.hadoop.hbase.client.Scan.new(startrow.to_java_bytes)
+                 Scan.new(startrow.to_java_bytes)
               end
 
       # This will overwrite any startrow/stoprow settings
       scan.setRowPrefixFilter(rowprefixfilter.to_java_bytes) if rowprefixfilter
@@ -493,11 +501,13 @@ EOF
         set_authorizations(scan, authorizations) if authorizations
         scan.setConsistency(org.apache.hadoop.hbase.client.Consistency.valueOf(consistency)) if consistency
       else
-        scan = org.apache.hadoop.hbase.client.Scan.new
+        scan = Scan.new
       end
 
       scan
     end
+    # rubocop:enable Metrics/MethodLength
+    # rubocop:enable Metrics/AbcSize
 
     def _get_scanner(args)
       @table.getScanner(_hash_to_scan(args))
@@ -505,10 +515,11 @@ EOF
 
     #----------------------------------------------------------------------------------------------
     # Scans whole table or a range of keys and returns rows matching specific criteria
+    # rubocop:disable Metrics/AbcSize
     def _scan_internal(args = {}, scan = nil)
       raise(ArgumentError, "Args should be a Hash") unless args.kind_of?(Hash)
       raise(ArgumentError, "Scan argument should be org.apache.hadoop.hbase.client.Scan") \
-        unless scan == nil || scan.kind_of?(org.apache.hadoop.hbase.client.Scan)
+        unless scan.nil? || scan.is_a?(Scan)
 
       limit = args["LIMIT"] || -1
       maxlength = args.delete("MAXLENGTH") || -1
@@ -552,8 +563,9 @@ EOF
       scanner.close()
       return ((block_given?) ? [count, is_stale] : res)
     end
+    # rubocop:enable Metrics/AbcSize
 
     # Apply OperationAttributes to puts/scans/gets
     def set_attributes(oprattr, attributes)
       raise(ArgumentError, "Attributes must be a Hash type") unless attributes.kind_of?(Hash)
       for k,v in attributes
@@ -723,11 +735,13 @@ EOF
     # rubocop:disable Style/MultilineBlockChain
     def _get_splits_internal()
       locator = @table.getRegionLocator
-      locator.getAllRegionLocations.map do |i|
+      locator.getAllRegionLocations.select do |s|
+        RegionReplicaUtil.isDefaultReplica(s.getRegionInfo)
+      end.map do |i|
         Bytes.toStringBinary(i.getRegionInfo.getStartKey)
       end.delete_if { |k| k == '' }
     ensure
-      locator.close()
+      locator.close
     end
   end
   # rubocop:enable Style/MultilineBlockChain

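The hunk above is the whole fix: filter the region locations down to default (primary) replicas before mapping them to start keys. A minimal plain-Ruby sketch of the same select/map/delete_if pipeline, using a stand-in Struct instead of the HBase HRegionLocation/RegionInfo classes, with replica_id == 0 approximating RegionReplicaUtil.isDefaultReplica:

# Plain-Ruby sketch, no HBase dependencies; names are stand-ins.
Location = Struct.new(:start_key, :replica_id)

# Two regions ('' -> '10' and '10' -> end), each with three replicas.
locations = [
  Location.new('', 0),   Location.new('', 1),   Location.new('', 2),
  Location.new('10', 0), Location.new('10', 1), Location.new('10', 2)
]

# Old chain: every replica contributes its start key, so '10' shows up three times.
old_splits = locations.map(&:start_key).delete_if { |k| k == '' }
# => ["10", "10", "10"]

# New chain: keep only default replicas before mapping to start keys.
new_splits = locations.select { |l| l.replica_id.zero? }
                      .map(&:start_key)
                      .delete_if { |k| k == '' }
# => ["10"]
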
hbase-shell/src/test/ruby/hbase/table_test.rb

@@ -188,6 +188,7 @@ module Hbase
   end
 
   # Complex data management methods tests
+  # rubocop:disable Metrics/ClassLength
   class TableComplexMethodsTest < Test::Unit::TestCase
     include TestHelpers
 
@@ -302,7 +303,8 @@ module Hbase
       assert_not_nil(res['x:b'])
     end
 
-    define_test "get should work with hash columns spec and TIMESTAMP and AUTHORIZATIONS" do
+    define_test 'get should work with hash columns spec and TIMESTAMP and' \
+                ' AUTHORIZATIONS' do
       res = @test_table._get_internal('1', TIMESTAMP => 1234, AUTHORIZATIONS=>['PRIVATE'])
       assert_nil(res)
     end
@@ -635,5 +637,19 @@ module Hbase
       assert_equal(0, splits.size)
       assert_equal([], splits)
     end
+
+    define_test 'Split count for a table with region replicas' do
+      @test_table_name = 'tableWithRegionReplicas'
+      create_test_table_with_region_replicas(@test_table_name, 3,
+                                              SPLITS => ['10'])
+      @table = table(@test_table_name)
+      splits = @table._get_splits_internal
+      # In this case, total splits should be 1 even if the number of region
+      # replicas is 3.
+      assert_equal(1, splits.size)
+      assert_equal(['10'], splits)
+      drop_test_table(@test_table_name)
+    end
   end
+  # rubocop:enable Metrics/ClassLength
 end

hbase-shell/src/test/ruby/test_helper.rb

@@ -107,6 +107,17 @@ module Hbase
       end
     end
 
+    def create_test_table_with_region_replicas(name, num_of_replicas, splits)
+      # Create the table if needed
+      unless admin.exists?(name)
+        admin.create name, 'f1', { REGION_REPLICATION => num_of_replicas },
+                     splits
+      end
+
+      # Enable the table if needed
+      admin.enable(name) unless admin.enabled?(name)
+    end
+
     def drop_test_table(name)
       return unless admin.exists?(name)
       begin