HBASE-12904 Threading issues in region_mover.rb
This commit is contained in:
parent
418ea93774
commit
e370baf8a5
@ -43,41 +43,6 @@ import org.apache.hadoop.hbase.HRegionInfo
|
|||||||
# Name of this script
|
# Name of this script
|
||||||
NAME = "region_mover"
|
NAME = "region_mover"
|
||||||
|
|
||||||
# Get meta table reference
|
|
||||||
def getMetaTable(config)
|
|
||||||
# Keep meta reference in ruby global
|
|
||||||
if not $META
|
|
||||||
$META = HTable.new(config, HConstants::META_TABLE_NAME)
|
|
||||||
end
|
|
||||||
return $META
|
|
||||||
end
|
|
||||||
|
|
||||||
# Get table instance.
|
|
||||||
# Maintains cache of table instances.
|
|
||||||
def getTable(config, name)
|
|
||||||
# Keep dictionary of tables in ruby global
|
|
||||||
if not $TABLES
|
|
||||||
$TABLES = {}
|
|
||||||
end
|
|
||||||
key = name.toString()
|
|
||||||
if not $TABLES[key]
|
|
||||||
$TABLES[key] = HTable.new(config, name)
|
|
||||||
end
|
|
||||||
return $TABLES[key]
|
|
||||||
end
|
|
||||||
|
|
||||||
def closeTables()
|
|
||||||
if not $TABLES
|
|
||||||
return
|
|
||||||
end
|
|
||||||
|
|
||||||
$LOG.info("Close all tables")
|
|
||||||
$TABLES.each do |name, table|
|
|
||||||
$TABLES.delete(name)
|
|
||||||
table.close()
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
# Returns true if passed region is still on 'original' when we look at hbase:meta.
|
# Returns true if passed region is still on 'original' when we look at hbase:meta.
|
||||||
def isSameServer(admin, r, original)
|
def isSameServer(admin, r, original)
|
||||||
server = getServerNameForRegion(admin, r)
|
server = getServerNameForRegion(admin, r)
|
||||||
@ -111,17 +76,20 @@ def getServerNameForRegion(admin, r)
|
|||||||
zkw.close()
|
zkw.close()
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
table = nil
|
table = HTable.new(admin.getConfiguration(), HConstants::META_TABLE_NAME)
|
||||||
table = getMetaTable(admin.getConfiguration())
|
begin
|
||||||
g = Get.new(r.getRegionName())
|
g = Get.new(r.getRegionName())
|
||||||
g.addColumn(HConstants::CATALOG_FAMILY, HConstants::SERVER_QUALIFIER)
|
g.addColumn(HConstants::CATALOG_FAMILY, HConstants::SERVER_QUALIFIER)
|
||||||
g.addColumn(HConstants::CATALOG_FAMILY, HConstants::STARTCODE_QUALIFIER)
|
g.addColumn(HConstants::CATALOG_FAMILY, HConstants::STARTCODE_QUALIFIER)
|
||||||
result = table.get(g)
|
result = table.get(g)
|
||||||
return nil unless result
|
return nil unless result
|
||||||
server = result.getValue(HConstants::CATALOG_FAMILY, HConstants::SERVER_QUALIFIER)
|
server = result.getValue(HConstants::CATALOG_FAMILY, HConstants::SERVER_QUALIFIER)
|
||||||
startcode = result.getValue(HConstants::CATALOG_FAMILY, HConstants::STARTCODE_QUALIFIER)
|
startcode = result.getValue(HConstants::CATALOG_FAMILY, HConstants::STARTCODE_QUALIFIER)
|
||||||
return nil unless server
|
return nil unless server
|
||||||
return java.lang.String.new(Bytes.toString(server)).replaceFirst(":", ",") + "," + Bytes.toLong(startcode).to_s
|
return java.lang.String.new(Bytes.toString(server)).replaceFirst(":", ",") + "," + Bytes.toLong(startcode).to_s
|
||||||
|
ensure
|
||||||
|
table.close()
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
# Trys to scan a row from passed region
|
# Trys to scan a row from passed region
|
||||||
@ -132,23 +100,22 @@ def isSuccessfulScan(admin, r)
|
|||||||
scan.setCaching(1)
|
scan.setCaching(1)
|
||||||
scan.setFilter(FirstKeyOnlyFilter.new())
|
scan.setFilter(FirstKeyOnlyFilter.new())
|
||||||
begin
|
begin
|
||||||
table = getTable(admin.getConfiguration(), r.getTableName())
|
table = HTable.new(admin.getConfiguration(), r.getTableName())
|
||||||
scanner = table.getScanner(scan)
|
scanner = table.getScanner(scan)
|
||||||
|
begin
|
||||||
|
results = scanner.next()
|
||||||
|
# We might scan into next region, this might be an empty table.
|
||||||
|
# But if no exception, presume scanning is working.
|
||||||
|
ensure
|
||||||
|
scanner.close()
|
||||||
|
end
|
||||||
rescue org.apache.hadoop.hbase.TableNotFoundException,
|
rescue org.apache.hadoop.hbase.TableNotFoundException,
|
||||||
org.apache.hadoop.hbase.TableNotEnabledException => e
|
org.apache.hadoop.hbase.TableNotEnabledException => e
|
||||||
$LOG.warn("Region " + r.getEncodedName() + " belongs to recently " +
|
$LOG.warn("Region " + r.getEncodedName() + " belongs to recently " +
|
||||||
"deleted/disabled table. Skipping... " + e.message)
|
"deleted/disabled table. Skipping... " + e.message)
|
||||||
return
|
return
|
||||||
end
|
|
||||||
begin
|
|
||||||
results = scanner.next()
|
|
||||||
# We might scan into next region, this might be an empty table.
|
|
||||||
# But if no exception, presume scanning is working.
|
|
||||||
ensure
|
ensure
|
||||||
scanner.close()
|
table.close() unless table.nil?
|
||||||
# Do not close the htable. It is cached in $TABLES and
|
|
||||||
# may be reused in moving another region of same table.
|
|
||||||
# table.close()
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
@ -517,5 +484,3 @@ case ARGV[0]
|
|||||||
puts optparse
|
puts optparse
|
||||||
exit 3
|
exit 3
|
||||||
end
|
end
|
||||||
|
|
||||||
closeTables()
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user