HBASE-1699 Remove hbrep example as it's too out of date
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@797512 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
926a17e351
commit
3e7c6a6e4f
|
@ -508,6 +508,7 @@ Release 0.20.0 - Unreleased
|
||||||
HBASE-1688 Improve javadocs in Result and KeyValue
|
HBASE-1688 Improve javadocs in Result and KeyValue
|
||||||
HBASE-1694 Add TOC to 'Getting Started', add references to THBase and
|
HBASE-1694 Add TOC to 'Getting Started', add references to THBase and
|
||||||
ITHBase
|
ITHBase
|
||||||
|
HBASE-1699 Remove hbrep example as it's too out of date
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
HBASE-1412 Change values for delete column and column family in KeyValue
|
HBASE-1412 Change values for delete column and column family in KeyValue
|
||||||
|
|
|
@ -1,39 +0,0 @@
|
||||||
import sys, os
|
|
||||||
|
|
||||||
from Hbase.ttypes import *
|
|
||||||
from Hbase import Hbase
|
|
||||||
|
|
||||||
from thrift import Thrift
|
|
||||||
from thrift.transport import TSocket, TTransport
|
|
||||||
from thrift.protocol import TBinaryProtocol
|
|
||||||
|
|
||||||
class HBaseConnection:
    """Thin wrapper around a Thrift connection to an HBase server.

    Owns the socket/transport/protocol stack and exposes the generated
    ``Hbase.Client`` as the ``client`` attribute, plus two helpers that
    validate table and column names against the live server.
    """

    def __init__(self, hostname, port):
        # Make socket
        self.transport = TSocket.TSocket(hostname, port)
        # Buffering is critical. Raw sockets are very slow
        self.transport = TTransport.TBufferedTransport(self.transport)
        # Wrap in a protocol
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        # Create a client to use the protocol encoder
        self.client = Hbase.Client(self.protocol)

    def connect(self):
        """Open the underlying transport."""
        self.transport.open()

    def disconnect(self):
        """Close the underlying transport."""
        self.transport.close()

    def validate_column_descriptors(self, table_name, column_descriptors):
        """Raise if any ``family:column`` descriptor names an unknown family.

        Bug fix: the original ``family, column = col_desc.split(":")``
        raised a bare ValueError for descriptors without a colon (or with
        more than one); such descriptors are now reported with the same
        "Invalid column descriptor" exception as an unknown family, and
        only the first colon separates family from column.
        """
        hbase_families = self.client.getColumnDescriptors(table_name)
        for col_desc in column_descriptors:
            if ":" not in col_desc:
                raise Exception("Invalid column descriptor \"%s\" for hbase table \"%s\"" % (col_desc,table_name))
            family = col_desc.split(":", 1)[0]
            if not family in hbase_families:
                raise Exception("Invalid column descriptor \"%s\" for hbase table \"%s\"" % (col_desc,table_name))

    def validate_table_name(self, table_name):
        """Raise if *table_name* is not known to the HBase server."""
        if not table_name in self.client.getTableNames():
            raise Exception("hbase table '%s' not found." % (table_name))
|
|
@ -1,90 +0,0 @@
|
||||||
import sys, os, pgq, skytools, ConfigParser
|
|
||||||
|
|
||||||
from thrift import Thrift
|
|
||||||
from thrift.transport import TSocket, TTransport
|
|
||||||
from thrift.protocol import TBinaryProtocol
|
|
||||||
|
|
||||||
from HBaseConnection import *
|
|
||||||
import tablemapping
|
|
||||||
|
|
||||||
INSERT = 'I'
|
|
||||||
UPDATE = 'U'
|
|
||||||
DELETE = 'D'
|
|
||||||
|
|
||||||
class HBaseConsumer(pgq.Consumer):
|
|
||||||
"""HBaseConsumer is a pgq.Consumer that sends processed events to hbase as mutations."""
|
|
||||||
|
|
||||||
def __init__(self, service_name, args):
|
|
||||||
pgq.Consumer.__init__(self, service_name, "postgresql_db", args)
|
|
||||||
|
|
||||||
config_file = self.args[0]
|
|
||||||
if len(self.args) < 2:
|
|
||||||
print "need table names"
|
|
||||||
sys.exit(1)
|
|
||||||
else:
|
|
||||||
self.table_names = self.args[1:]
|
|
||||||
|
|
||||||
#just to check this option exists
|
|
||||||
self.cf.get("postgresql_db")
|
|
||||||
|
|
||||||
self.max_batch_size = int(self.cf.get("max_batch_size", "10000"))
|
|
||||||
self.hbase_hostname = self.cf.get("hbase_hostname", "localhost")
|
|
||||||
self.hbase_port = int(self.cf.get("hbase_port", "9090"))
|
|
||||||
self.row_limit = int(self.cf.get("bootstrap_row_limit", 0))
|
|
||||||
self.table_mappings = tablemapping.load_table_mappings(config_file, self.table_names)
|
|
||||||
|
|
||||||
def process_batch(self, source_db, batch_id, event_list):
|
|
||||||
try:
|
|
||||||
self.log.debug("processing batch %s" % (batch_id))
|
|
||||||
hbase = HBaseConnection(self.hbase_hostname, self.hbase_port)
|
|
||||||
try:
|
|
||||||
self.log.debug("Connecting to HBase")
|
|
||||||
hbase.connect()
|
|
||||||
|
|
||||||
i = 0L
|
|
||||||
for event in event_list:
|
|
||||||
i = i+1
|
|
||||||
self.process_event(event, hbase)
|
|
||||||
print "%i events processed" % (i)
|
|
||||||
|
|
||||||
except Exception, e:
|
|
||||||
#self.log.info(e)
|
|
||||||
sys.exit(e)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
hbase.disconnect()
|
|
||||||
|
|
||||||
def process_event(self, event, hbase):
|
|
||||||
if event.ev_extra1 in self.table_mappings:
|
|
||||||
table_mapping = self.table_mappings[event.ev_extra1]
|
|
||||||
else:
|
|
||||||
self.log.info("table name not found in config, skipping event")
|
|
||||||
return
|
|
||||||
#hbase.validate_table_name(table_mapping.hbase_table_name)
|
|
||||||
#hbase.validate_column_descriptors(table_mapping.hbase_table_name, table_mapping.hbase_column_descriptors)
|
|
||||||
event_data = skytools.db_urldecode(event.data)
|
|
||||||
event_type = event.type.split(':')[0]
|
|
||||||
|
|
||||||
batch = BatchMutation()
|
|
||||||
batch.row = table_mapping.hbase_row_prefix + str(event_data[table_mapping.psql_key_column])
|
|
||||||
|
|
||||||
batch.mutations = []
|
|
||||||
for psql_column, hbase_column in zip(table_mapping.psql_columns, table_mapping.hbase_column_descriptors):
|
|
||||||
if event_type == INSERT or event_type == UPDATE:
|
|
||||||
m = Mutation()
|
|
||||||
m.column = hbase_column
|
|
||||||
m.value = str(event_data[psql_column])
|
|
||||||
elif event_type == DELETE:
|
|
||||||
# delete this column entry
|
|
||||||
m = Mutation()
|
|
||||||
m.isDelete = True
|
|
||||||
m.column = hbase_column
|
|
||||||
else:
|
|
||||||
raise Exception("Invalid event type: %s, event data was: %s" % (event_type, str(event_data)))
|
|
||||||
batch.mutations.append(m)
|
|
||||||
hbase.client.mutateRow(table_mapping.hbase_table_name, batch.row, batch.mutations)
|
|
||||||
event.tag_done()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
script = HBaseConsumer("HBaseReplic",sys.argv[1:])
|
|
||||||
script.start()
|
|
|
@ -1,247 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
#
|
|
||||||
# Autogenerated by Thrift
|
|
||||||
#
|
|
||||||
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
|
||||||
#
|
|
||||||
|
|
||||||
import sys
|
|
||||||
import pprint
|
|
||||||
from urlparse import urlparse
|
|
||||||
from thrift.transport import TTransport
|
|
||||||
from thrift.transport import TSocket
|
|
||||||
from thrift.transport import THttpClient
|
|
||||||
from thrift.protocol import TBinaryProtocol
|
|
||||||
|
|
||||||
import Hbase
|
|
||||||
from ttypes import *
|
|
||||||
|
|
||||||
if len(sys.argv) <= 1 or sys.argv[1] == '--help':
|
|
||||||
print ''
|
|
||||||
print 'Usage: ' + sys.argv[0] + ' [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]'
|
|
||||||
print ''
|
|
||||||
print 'Functions:'
|
|
||||||
print ' getTableNames()'
|
|
||||||
print ' getColumnDescriptors(Text tableName)'
|
|
||||||
print ' getTableRegions(Text tableName)'
|
|
||||||
print ' void createTable(Text tableName, columnFamilies)'
|
|
||||||
print ' void deleteTable(Text tableName)'
|
|
||||||
print ' Bytes get(Text tableName, Text row, Text column)'
|
|
||||||
print ' getVer(Text tableName, Text row, Text column, i32 numVersions)'
|
|
||||||
print ' getVerTs(Text tableName, Text row, Text column, i64 timestamp, i32 numVersions)'
|
|
||||||
print ' getRow(Text tableName, Text row)'
|
|
||||||
print ' getRowTs(Text tableName, Text row, i64 timestamp)'
|
|
||||||
print ' void put(Text tableName, Text row, Text column, Bytes value)'
|
|
||||||
print ' void mutateRow(Text tableName, Text row, mutations)'
|
|
||||||
print ' void mutateRowTs(Text tableName, Text row, mutations, i64 timestamp)'
|
|
||||||
print ' void mutateRows(Text tableName, rowBatches)'
|
|
||||||
print ' void mutateRowsTs(Text tableName, rowBatches, i64 timestamp)'
|
|
||||||
print ' void deleteAll(Text tableName, Text row, Text column)'
|
|
||||||
print ' void deleteAllTs(Text tableName, Text row, Text column, i64 timestamp)'
|
|
||||||
print ' void deleteAllRow(Text tableName, Text row)'
|
|
||||||
print ' void deleteAllRowTs(Text tableName, Text row, i64 timestamp)'
|
|
||||||
print ' ScannerID scannerOpen(Text tableName, Text startRow, columns)'
|
|
||||||
print ' ScannerID scannerOpenWithStop(Text tableName, Text startRow, Text stopRow, columns)'
|
|
||||||
print ' ScannerID scannerOpenTs(Text tableName, Text startRow, columns, i64 timestamp)'
|
|
||||||
print ' ScannerID scannerOpenWithStopTs(Text tableName, Text startRow, Text stopRow, columns, i64 timestamp)'
|
|
||||||
print ' ScanEntry scannerGet(ScannerID id)'
|
|
||||||
print ' void scannerClose(ScannerID id)'
|
|
||||||
print ''
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
pp = pprint.PrettyPrinter(indent = 2)
|
|
||||||
host = 'localhost'
|
|
||||||
port = 9090
|
|
||||||
uri = ''
|
|
||||||
framed = False
|
|
||||||
http = False
|
|
||||||
argi = 1
|
|
||||||
|
|
||||||
if sys.argv[argi] == '-h':
|
|
||||||
parts = sys.argv[argi+1].split(':')
|
|
||||||
host = parts[0]
|
|
||||||
port = int(parts[1])
|
|
||||||
argi += 2
|
|
||||||
|
|
||||||
if sys.argv[argi] == '-u':
|
|
||||||
url = urlparse(sys.argv[argi+1])
|
|
||||||
parts = url[1].split(':')
|
|
||||||
host = parts[0]
|
|
||||||
if len(parts) > 1:
|
|
||||||
port = int(parts[1])
|
|
||||||
else:
|
|
||||||
port = 80
|
|
||||||
uri = url[2]
|
|
||||||
http = True
|
|
||||||
argi += 2
|
|
||||||
|
|
||||||
if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed':
|
|
||||||
framed = True
|
|
||||||
argi += 1
|
|
||||||
|
|
||||||
cmd = sys.argv[argi]
|
|
||||||
args = sys.argv[argi+1:]
|
|
||||||
|
|
||||||
if http:
|
|
||||||
transport = THttpClient.THttpClient(host, port, uri)
|
|
||||||
else:
|
|
||||||
socket = TSocket.TSocket(host, port)
|
|
||||||
if framed:
|
|
||||||
transport = TTransport.TFramedTransport(socket)
|
|
||||||
else:
|
|
||||||
transport = TTransport.TBufferedTransport(socket)
|
|
||||||
protocol = TBinaryProtocol.TBinaryProtocol(transport)
|
|
||||||
client = Hbase.Client(protocol)
|
|
||||||
transport.open()
|
|
||||||
|
|
||||||
if cmd == 'getTableNames':
|
|
||||||
if len(args) != 0:
|
|
||||||
print 'getTableNames requires 0 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.getTableNames())
|
|
||||||
|
|
||||||
elif cmd == 'getColumnDescriptors':
|
|
||||||
if len(args) != 1:
|
|
||||||
print 'getColumnDescriptors requires 1 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.getColumnDescriptors(eval(args[0]),))
|
|
||||||
|
|
||||||
elif cmd == 'getTableRegions':
|
|
||||||
if len(args) != 1:
|
|
||||||
print 'getTableRegions requires 1 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.getTableRegions(eval(args[0]),))
|
|
||||||
|
|
||||||
elif cmd == 'createTable':
|
|
||||||
if len(args) != 2:
|
|
||||||
print 'createTable requires 2 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.createTable(eval(args[0]),eval(args[1]),))
|
|
||||||
|
|
||||||
elif cmd == 'deleteTable':
|
|
||||||
if len(args) != 1:
|
|
||||||
print 'deleteTable requires 1 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.deleteTable(eval(args[0]),))
|
|
||||||
|
|
||||||
elif cmd == 'get':
|
|
||||||
if len(args) != 3:
|
|
||||||
print 'get requires 3 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.get(eval(args[0]),eval(args[1]),eval(args[2]),))
|
|
||||||
|
|
||||||
elif cmd == 'getVer':
|
|
||||||
if len(args) != 4:
|
|
||||||
print 'getVer requires 4 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.getVer(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
|
|
||||||
|
|
||||||
elif cmd == 'getVerTs':
|
|
||||||
if len(args) != 5:
|
|
||||||
print 'getVerTs requires 5 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.getVerTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),eval(args[4]),))
|
|
||||||
|
|
||||||
elif cmd == 'getRow':
|
|
||||||
if len(args) != 2:
|
|
||||||
print 'getRow requires 2 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.getRow(eval(args[0]),eval(args[1]),))
|
|
||||||
|
|
||||||
elif cmd == 'getRowTs':
|
|
||||||
if len(args) != 3:
|
|
||||||
print 'getRowTs requires 3 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.getRowTs(eval(args[0]),eval(args[1]),eval(args[2]),))
|
|
||||||
|
|
||||||
elif cmd == 'put':
|
|
||||||
if len(args) != 4:
|
|
||||||
print 'put requires 4 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.put(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
|
|
||||||
|
|
||||||
elif cmd == 'mutateRow':
|
|
||||||
if len(args) != 3:
|
|
||||||
print 'mutateRow requires 3 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.mutateRow(eval(args[0]),eval(args[1]),eval(args[2]),))
|
|
||||||
|
|
||||||
elif cmd == 'mutateRowTs':
|
|
||||||
if len(args) != 4:
|
|
||||||
print 'mutateRowTs requires 4 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.mutateRowTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
|
|
||||||
|
|
||||||
elif cmd == 'mutateRows':
|
|
||||||
if len(args) != 2:
|
|
||||||
print 'mutateRows requires 2 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.mutateRows(eval(args[0]),eval(args[1]),))
|
|
||||||
|
|
||||||
elif cmd == 'mutateRowsTs':
|
|
||||||
if len(args) != 3:
|
|
||||||
print 'mutateRowsTs requires 3 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.mutateRowsTs(eval(args[0]),eval(args[1]),eval(args[2]),))
|
|
||||||
|
|
||||||
elif cmd == 'deleteAll':
|
|
||||||
if len(args) != 3:
|
|
||||||
print 'deleteAll requires 3 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.deleteAll(eval(args[0]),eval(args[1]),eval(args[2]),))
|
|
||||||
|
|
||||||
elif cmd == 'deleteAllTs':
|
|
||||||
if len(args) != 4:
|
|
||||||
print 'deleteAllTs requires 4 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.deleteAllTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
|
|
||||||
|
|
||||||
elif cmd == 'deleteAllRow':
|
|
||||||
if len(args) != 2:
|
|
||||||
print 'deleteAllRow requires 2 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.deleteAllRow(eval(args[0]),eval(args[1]),))
|
|
||||||
|
|
||||||
elif cmd == 'deleteAllRowTs':
|
|
||||||
if len(args) != 3:
|
|
||||||
print 'deleteAllRowTs requires 3 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.deleteAllRowTs(eval(args[0]),eval(args[1]),eval(args[2]),))
|
|
||||||
|
|
||||||
elif cmd == 'scannerOpen':
|
|
||||||
if len(args) != 3:
|
|
||||||
print 'scannerOpen requires 3 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.scannerOpen(eval(args[0]),eval(args[1]),eval(args[2]),))
|
|
||||||
|
|
||||||
elif cmd == 'scannerOpenWithStop':
|
|
||||||
if len(args) != 4:
|
|
||||||
print 'scannerOpenWithStop requires 4 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.scannerOpenWithStop(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
|
|
||||||
|
|
||||||
elif cmd == 'scannerOpenTs':
|
|
||||||
if len(args) != 4:
|
|
||||||
print 'scannerOpenTs requires 4 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.scannerOpenTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),))
|
|
||||||
|
|
||||||
elif cmd == 'scannerOpenWithStopTs':
|
|
||||||
if len(args) != 5:
|
|
||||||
print 'scannerOpenWithStopTs requires 5 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.scannerOpenWithStopTs(eval(args[0]),eval(args[1]),eval(args[2]),eval(args[3]),eval(args[4]),))
|
|
||||||
|
|
||||||
elif cmd == 'scannerGet':
|
|
||||||
if len(args) != 1:
|
|
||||||
print 'scannerGet requires 1 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.scannerGet(eval(args[0]),))
|
|
||||||
|
|
||||||
elif cmd == 'scannerClose':
|
|
||||||
if len(args) != 1:
|
|
||||||
print 'scannerClose requires 1 args'
|
|
||||||
sys.exit(1)
|
|
||||||
pp.pprint(client.scannerClose(eval(args[0]),))
|
|
||||||
|
|
||||||
transport.close()
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1 +0,0 @@
|
||||||
# Public API of the generated Hbase package: the thrift type definitions,
# the service constants, and the Hbase client/service module.
__all__ = ['ttypes', 'constants', 'Hbase']
|
|
|
@ -1,9 +0,0 @@
|
||||||
#
|
|
||||||
# Autogenerated by Thrift
|
|
||||||
#
|
|
||||||
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
|
||||||
#
|
|
||||||
|
|
||||||
from thrift.Thrift import *
|
|
||||||
from ttypes import *
|
|
||||||
|
|
|
@ -1,708 +0,0 @@
|
||||||
#
|
|
||||||
# Autogenerated by Thrift
|
|
||||||
#
|
|
||||||
# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
|
|
||||||
#
|
|
||||||
|
|
||||||
from thrift.Thrift import *
|
|
||||||
|
|
||||||
from thrift.transport import TTransport
|
|
||||||
from thrift.protocol import TBinaryProtocol
|
|
||||||
try:
|
|
||||||
from thrift.protocol import fastbinary
|
|
||||||
except:
|
|
||||||
fastbinary = None
|
|
||||||
|
|
||||||
|
|
||||||
class ColumnDescriptor:
    """Generated thrift struct describing one HBase column family
    (name, versioning, compression, bloom filter, caching, TTL)."""

    thrift_spec = (
        None, # 0
        (1, TType.STRING, 'name', None, None, ), # 1
        (2, TType.I32, 'maxVersions', None, None, ), # 2
        (3, TType.STRING, 'compression', None, None, ), # 3
        (4, TType.BOOL, 'inMemory', None, None, ), # 4
        (5, TType.I32, 'maxValueLength', None, None, ), # 5
        (6, TType.STRING, 'bloomFilterType', None, None, ), # 6
        (7, TType.I32, 'bloomFilterVectorSize', None, None, ), # 7
        (8, TType.I32, 'bloomFilterNbHashes', None, None, ), # 8
        (9, TType.BOOL, 'blockCacheEnabled', None, None, ), # 9
        (10, TType.I32, 'timeToLive', None, None, ), # 10
    )

    # (attribute, server-side default) pairs, in thrift_spec field order.
    _FIELD_DEFAULTS = (
        ('name', None),
        ('maxVersions', 3),
        ('compression', 'NONE'),
        ('inMemory', False),
        ('maxValueLength', 2147483647),
        ('bloomFilterType', 'NONE'),
        ('bloomFilterVectorSize', 0),
        ('bloomFilterNbHashes', 0),
        ('blockCacheEnabled', False),
        ('timeToLive', -1),
    )

    def __init__(self, d=None):
        # Start from the defaults, then overlay any values supplied in *d*.
        for attr, default in self._FIELD_DEFAULTS:
            setattr(self, attr, default)
        if isinstance(d, dict):
            for attr, default in self._FIELD_DEFAULTS:
                if attr in d:
                    setattr(self, attr, d[attr])

    def read(self, iprot):
        """Populate this struct from *iprot*, driven by thrift_spec."""
        # Fast path: C-accelerated decode when the thrift runtime offers it.
        if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        readers = {TType.STRING: 'readString', TType.I32: 'readI32', TType.BOOL: 'readBool'}
        expected = dict((spec[0], (spec[1], spec[2])) for spec in self.thrift_spec if spec is not None)
        iprot.readStructBegin()
        while True:
            (fname, ftype, fid) = iprot.readFieldBegin()
            if ftype == TType.STOP:
                break
            known = expected.get(fid)
            if known is not None and ftype == known[0]:
                setattr(self, known[1], getattr(iprot, readers[ftype])())
            else:
                # Unknown field id, or wire type disagrees with the spec.
                iprot.skip(ftype)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize this struct to *oprot*; None-valued fields are omitted."""
        if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        writers = {TType.STRING: 'writeString', TType.I32: 'writeI32', TType.BOOL: 'writeBool'}
        oprot.writeStructBegin('ColumnDescriptor')
        for spec in self.thrift_spec:
            if spec is None:
                continue
            fid, ftype, attr = spec[0], spec[1], spec[2]
            value = getattr(self, attr)
            if value != None:
                oprot.writeFieldBegin(attr, ftype, fid)
                getattr(oprot, writers[ftype])(value)
                oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return repr(self.__dict__)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
||||||
class RegionDescriptor:
    """Generated thrift struct: one table region, identified by its start key."""

    thrift_spec = (
        None, # 0
        (1, TType.STRING, 'startKey', None, None, ), # 1
    )

    def __init__(self, d=None):
        self.startKey = None
        if isinstance(d, dict) and 'startKey' in d:
            self.startKey = d['startKey']

    def read(self, iprot):
        """Populate this struct from *iprot*."""
        # Fast path: C-accelerated decode when the thrift runtime offers it.
        if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        iprot.readStructBegin()
        while True:
            (fname, ftype, fid) = iprot.readFieldBegin()
            if ftype == TType.STOP:
                break
            if fid == 1 and ftype == TType.STRING:
                self.startKey = iprot.readString()
            else:
                # Unknown field id, or wire type disagrees with the spec.
                iprot.skip(ftype)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize this struct to *oprot*; a None startKey is omitted."""
        if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        oprot.writeStructBegin('RegionDescriptor')
        if self.startKey != None:
            oprot.writeFieldBegin('startKey', TType.STRING, 1)
            oprot.writeString(self.startKey)
            oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return repr(self.__dict__)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|
||||||
class Mutation:
    """Generated thrift struct: one cell mutation (a put, or a delete
    when isDelete is set)."""

    thrift_spec = (
        None, # 0
        (1, TType.BOOL, 'isDelete', None, None, ), # 1
        (2, TType.STRING, 'column', None, None, ), # 2
        (3, TType.STRING, 'value', None, None, ), # 3
    )

    def __init__(self, d=None):
        self.isDelete = False
        self.column = None
        self.value = None
        if isinstance(d, dict):
            for attr in ('isDelete', 'column', 'value'):
                if attr in d:
                    setattr(self, attr, d[attr])

    def read(self, iprot):
        """Populate this struct from *iprot*."""
        # Fast path: C-accelerated decode when the thrift runtime offers it.
        if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        iprot.readStructBegin()
        while True:
            (fname, ftype, fid) = iprot.readFieldBegin()
            if ftype == TType.STOP:
                break
            if fid == 1 and ftype == TType.BOOL:
                self.isDelete = iprot.readBool()
            elif fid == 2 and ftype == TType.STRING:
                self.column = iprot.readString()
            elif fid == 3 and ftype == TType.STRING:
                self.value = iprot.readString()
            else:
                # Unknown field id, or wire type disagrees with the spec.
                iprot.skip(ftype)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize this struct to *oprot*; None-valued fields are omitted."""
        if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        oprot.writeStructBegin('Mutation')
        if self.isDelete != None:
            oprot.writeFieldBegin('isDelete', TType.BOOL, 1)
            oprot.writeBool(self.isDelete)
            oprot.writeFieldEnd()
        if self.column != None:
            oprot.writeFieldBegin('column', TType.STRING, 2)
            oprot.writeString(self.column)
            oprot.writeFieldEnd()
        if self.value != None:
            oprot.writeFieldBegin('value', TType.STRING, 3)
            oprot.writeString(self.value)
            oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return repr(self.__dict__)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|
||||||
class BatchMutation:
    """Generated thrift struct: a row key together with the list of
    Mutations to apply to that row."""

    thrift_spec = (
        None, # 0
        (1, TType.STRING, 'row', None, None, ), # 1
        (2, TType.LIST, 'mutations', (TType.STRUCT,(Mutation, Mutation.thrift_spec)), None, ), # 2
    )

    def __init__(self, d=None):
        self.row = None
        self.mutations = None
        if isinstance(d, dict):
            for attr in ('row', 'mutations'):
                if attr in d:
                    setattr(self, attr, d[attr])

    def read(self, iprot):
        """Populate this struct from *iprot*."""
        # Fast path: C-accelerated decode when the thrift runtime offers it.
        if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        iprot.readStructBegin()
        while True:
            (fname, ftype, fid) = iprot.readFieldBegin()
            if ftype == TType.STOP:
                break
            if fid == 1 and ftype == TType.STRING:
                self.row = iprot.readString()
            elif fid == 2 and ftype == TType.LIST:
                self.mutations = []
                (elem_type, count) = iprot.readListBegin()
                for _ in xrange(count):
                    entry = Mutation()
                    entry.read(iprot)
                    self.mutations.append(entry)
                iprot.readListEnd()
            else:
                # Unknown field id, or wire type disagrees with the spec.
                iprot.skip(ftype)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize this struct to *oprot*; None-valued fields are omitted."""
        if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        oprot.writeStructBegin('BatchMutation')
        if self.row != None:
            oprot.writeFieldBegin('row', TType.STRING, 1)
            oprot.writeString(self.row)
            oprot.writeFieldEnd()
        if self.mutations != None:
            oprot.writeFieldBegin('mutations', TType.LIST, 2)
            oprot.writeListBegin(TType.STRUCT, len(self.mutations))
            for entry in self.mutations:
                entry.write(oprot)
            oprot.writeListEnd()
            oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        return str(self.__dict__)

    def __repr__(self):
        return repr(self.__dict__)

    def __eq__(self, other):
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|
||||||
class ScanEntry:
    """One row returned by a scanner: a row key plus a column->value map.

    Thrift-generated struct. Field ids: 1 = row (string),
    2 = columns (map<string, string>).
    """

    thrift_spec = (
        None, # 0
        (1, TType.STRING, 'row', None, None, ), # 1
        (2, TType.MAP, 'columns', (TType.STRING,None,TType.STRING,None), None, ), # 2
    )

    def __init__(self, d=None):
        """Create an entry; an optional dict d can seed row/columns."""
        self.row = None
        self.columns = None
        if isinstance(d, dict):
            if 'row' in d:
                self.row = d['row']
            if 'columns' in d:
                self.columns = d['columns']

    def read(self, iprot):
        """Deserialize this struct from the given thrift input protocol."""
        # Fast path: C-accelerated decoder when transport and spec allow it.
        if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        # Slow path: walk fields until STOP, skipping unknown ids/types.
        iprot.readStructBegin()
        while True:
            (fname, ftype, fid) = iprot.readFieldBegin()
            if ftype == TType.STOP:
                break
            if fid == 1:
                if ftype == TType.STRING:
                    self.row = iprot.readString();
                else:
                    iprot.skip(ftype)
            elif fid == 2:
                if ftype == TType.MAP:
                    self.columns = {}
                    (_ktype8, _vtype9, _size7 ) = iprot.readMapBegin()
                    for _i11 in xrange(_size7):
                        _key12 = iprot.readString();
                        _val13 = iprot.readString();
                        self.columns[_key12] = _val13
                    iprot.readMapEnd()
                else:
                    iprot.skip(ftype)
            else:
                iprot.skip(ftype)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize this struct to the given thrift output protocol."""
        # Fast path: C-accelerated encoder.
        if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        # Slow path: emit only the fields that are set (non-None).
        oprot.writeStructBegin('ScanEntry')
        if self.row != None:
            oprot.writeFieldBegin('row', TType.STRING, 1)
            oprot.writeString(self.row)
            oprot.writeFieldEnd()
        if self.columns != None:
            oprot.writeFieldBegin('columns', TType.MAP, 2)
            oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.columns))
            for kiter14,viter15 in self.columns.items():
                oprot.writeString(kiter14)
                oprot.writeString(viter15)
            oprot.writeMapEnd()
            oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        """Human-readable form: the instance attribute dict."""
        return str(self.__dict__)

    def __repr__(self):
        """Debug form: the repr of the instance attribute dict."""
        return repr(self.__dict__)

    def __eq__(self, other):
        # Structural equality: same class and identical attribute dicts.
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|
|
||||||
|
|
||||||
class IOError(Exception):
    """Thrift exception carrying a single human-readable message.

    NOTE: shadows Python's builtin IOError; the name comes from the
    thrift IDL. Field ids: 1 = message (string).
    """

    thrift_spec = (
        None, # field id 0 is never used
        (1, TType.STRING, 'message', None, None, ), # 1: message
    )

    def __init__(self, d=None):
        """Create the exception, optionally seeding message from dict d."""
        self.message = None
        if isinstance(d, dict) and 'message' in d:
            self.message = d['message']

    def read(self, iprot):
        """Deserialize from iprot, preferring the C-accelerated decoder."""
        accelerated = (iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated
                       and isinstance(iprot.trans, TTransport.CReadableTransport)
                       and self.thrift_spec is not None
                       and fastbinary is not None)
        if accelerated:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        # Generic decoding: read fields until STOP, skipping anything unknown.
        iprot.readStructBegin()
        while True:
            _name, _type, _id = iprot.readFieldBegin()
            if _type == TType.STOP:
                break
            if _id == 1 and _type == TType.STRING:
                self.message = iprot.readString()
            else:
                iprot.skip(_type)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize to oprot, preferring the C-accelerated encoder."""
        accelerated = (oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated
                       and self.thrift_spec is not None
                       and fastbinary is not None)
        if accelerated:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        oprot.writeStructBegin('IOError')
        if self.message is not None:
            oprot.writeFieldBegin('message', TType.STRING, 1)
            oprot.writeString(self.message)
            oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        """Human-readable form: the instance attribute dict."""
        return str(self.__dict__)

    def __repr__(self):
        """Debug form: the repr of the instance attribute dict."""
        return repr(self.__dict__)

    def __eq__(self, other):
        same_type = isinstance(other, self.__class__)
        return same_type and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|
|
||||||
|
|
||||||
class IllegalArgument(Exception):
    """Thrift exception raised when a caller supplies an invalid argument.

    Field ids: 1 = message (string).
    """

    thrift_spec = (
        None, # field id 0 is never used
        (1, TType.STRING, 'message', None, None, ), # 1: message
    )

    def __init__(self, d=None):
        """Create the exception, optionally seeding message from dict d."""
        self.message = None
        if isinstance(d, dict) and 'message' in d:
            self.message = d['message']

    def read(self, iprot):
        """Deserialize from iprot, preferring the C-accelerated decoder."""
        accelerated = (iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated
                       and isinstance(iprot.trans, TTransport.CReadableTransport)
                       and self.thrift_spec is not None
                       and fastbinary is not None)
        if accelerated:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        # Generic decoding: read fields until STOP, skipping anything unknown.
        iprot.readStructBegin()
        while True:
            _name, _type, _id = iprot.readFieldBegin()
            if _type == TType.STOP:
                break
            if _id == 1 and _type == TType.STRING:
                self.message = iprot.readString()
            else:
                iprot.skip(_type)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize to oprot, preferring the C-accelerated encoder."""
        accelerated = (oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated
                       and self.thrift_spec is not None
                       and fastbinary is not None)
        if accelerated:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        oprot.writeStructBegin('IllegalArgument')
        if self.message is not None:
            oprot.writeFieldBegin('message', TType.STRING, 1)
            oprot.writeString(self.message)
            oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        """Human-readable form: the instance attribute dict."""
        return str(self.__dict__)

    def __repr__(self):
        """Debug form: the repr of the instance attribute dict."""
        return repr(self.__dict__)

    def __eq__(self, other):
        same_type = isinstance(other, self.__class__)
        return same_type and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|
|
||||||
|
|
||||||
class NotFound(Exception):
    """Thrift exception raised when a requested item does not exist.

    Field ids: 1 = message (string).
    """

    thrift_spec = (
        None, # field id 0 is never used
        (1, TType.STRING, 'message', None, None, ), # 1: message
    )

    def __init__(self, d=None):
        """Create the exception, optionally seeding message from dict d."""
        self.message = None
        if isinstance(d, dict) and 'message' in d:
            self.message = d['message']

    def read(self, iprot):
        """Deserialize from iprot, preferring the C-accelerated decoder."""
        accelerated = (iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated
                       and isinstance(iprot.trans, TTransport.CReadableTransport)
                       and self.thrift_spec is not None
                       and fastbinary is not None)
        if accelerated:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        # Generic decoding: read fields until STOP, skipping anything unknown.
        iprot.readStructBegin()
        while True:
            _name, _type, _id = iprot.readFieldBegin()
            if _type == TType.STOP:
                break
            if _id == 1 and _type == TType.STRING:
                self.message = iprot.readString()
            else:
                iprot.skip(_type)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize to oprot, preferring the C-accelerated encoder."""
        accelerated = (oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated
                       and self.thrift_spec is not None
                       and fastbinary is not None)
        if accelerated:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        oprot.writeStructBegin('NotFound')
        if self.message is not None:
            oprot.writeFieldBegin('message', TType.STRING, 1)
            oprot.writeString(self.message)
            oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        """Human-readable form: the instance attribute dict."""
        return str(self.__dict__)

    def __repr__(self):
        """Debug form: the repr of the instance attribute dict."""
        return repr(self.__dict__)

    def __eq__(self, other):
        same_type = isinstance(other, self.__class__)
        return same_type and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|
|
||||||
|
|
||||||
class AlreadyExists(Exception):
    """Thrift exception raised when creating something that already exists.

    Field ids: 1 = message (string).
    """

    thrift_spec = (
        None, # field id 0 is never used
        (1, TType.STRING, 'message', None, None, ), # 1: message
    )

    def __init__(self, d=None):
        """Create the exception, optionally seeding message from dict d."""
        self.message = None
        if isinstance(d, dict) and 'message' in d:
            self.message = d['message']

    def read(self, iprot):
        """Deserialize from iprot, preferring the C-accelerated decoder."""
        accelerated = (iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated
                       and isinstance(iprot.trans, TTransport.CReadableTransport)
                       and self.thrift_spec is not None
                       and fastbinary is not None)
        if accelerated:
            fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
            return
        # Generic decoding: read fields until STOP, skipping anything unknown.
        iprot.readStructBegin()
        while True:
            _name, _type, _id = iprot.readFieldBegin()
            if _type == TType.STOP:
                break
            if _id == 1 and _type == TType.STRING:
                self.message = iprot.readString()
            else:
                iprot.skip(_type)
            iprot.readFieldEnd()
        iprot.readStructEnd()

    def write(self, oprot):
        """Serialize to oprot, preferring the C-accelerated encoder."""
        accelerated = (oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated
                       and self.thrift_spec is not None
                       and fastbinary is not None)
        if accelerated:
            oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
            return
        oprot.writeStructBegin('AlreadyExists')
        if self.message is not None:
            oprot.writeFieldBegin('message', TType.STRING, 1)
            oprot.writeString(self.message)
            oprot.writeFieldEnd()
        oprot.writeFieldStop()
        oprot.writeStructEnd()

    def __str__(self):
        """Human-readable form: the instance attribute dict."""
        return str(self.__dict__)

    def __repr__(self):
        """Debug form: the repr of the instance attribute dict."""
        return repr(self.__dict__)

    def __eq__(self, other):
        same_type = isinstance(other, self.__class__)
        return same_type and self.__dict__ == other.__dict__

    def __ne__(self, other):
        return not (self == other)
|
|
||||||
|
|
|
@ -1,107 +0,0 @@
|
||||||
hbrep is a tool for replicating data from postgresql tables to hbase tables.
|
|
||||||
|
|
||||||
Dependencies:
|
|
||||||
- python 2.4
|
|
||||||
- hbase 0.2.0
|
|
||||||
- skytools 2.1.7
|
|
||||||
- postgresql
|
|
||||||
|
|
||||||
It has two main functions.
|
|
||||||
- bootstrap, which bootstraps all the data from specified columns of a table
|
|
||||||
- play, which processes incoming insert, update and delete events and applies them to hbase.
|
|
||||||
|
|
||||||
Example usage:
|
|
||||||
install triggers:
|
|
||||||
./hbrep.py hbrep.ini install schema1.table1 schema2.table2
|
|
||||||
now that future updates are queuing, bootstrap the tables.
|
|
||||||
./hbrep.py hbrep.ini bootstrap schema1.table1 schema2.table2
|
|
||||||
start pgq ticker
|
|
||||||
pgqadm.py pgq.ini ticker
|
|
||||||
play our queue consumer
|
|
||||||
./hbrep.py hbrep.ini play schema1.table1 schema2.table2
|
|
||||||
|
|
||||||
|
|
||||||
More details follow.
|
|
||||||
|
|
||||||
|
|
||||||
All functions require an ini file (say hbrep.ini) with a HBaseReplic section, and a section for each postgresql table you wish to replicate containing the table mapping. Note the table mapping section names should match the name of the postgresql table.
|
|
||||||
|
|
||||||
eg. ini file:
|
|
||||||
####################
|
|
||||||
[HBaseReplic]
|
|
||||||
job_name = hbase_replic_job
|
|
||||||
logfile = %(job_name)s.log
|
|
||||||
pidfile = %(job_name)s.pid
|
|
||||||
postgresql_db = dbname=source_database user=dbuser
|
|
||||||
pgq_queue_name = hbase_replic_queue
|
|
||||||
hbase_hostname = localhost
|
|
||||||
hbase_port = 9090
|
|
||||||
# If omitted, default is 10000
|
|
||||||
max_batch_size = 10000
|
|
||||||
# file to use when copying a table; if omitted, a SELECT of the columns will be done instead.
|
|
||||||
bootstrap_tmpfile = tabledump.dat
|
|
||||||
|
|
||||||
# For each table mapping, there must be the same number of psql_columns as hbase_column_descriptors
|
|
||||||
[public.users]
|
|
||||||
psql_schema = public
|
|
||||||
psql_table_name = users
|
|
||||||
psql_key_column = user_id
|
|
||||||
psql_columns = dob
|
|
||||||
hbase_table_name = stuff
|
|
||||||
hbase_column_descriptors = users:dob
|
|
||||||
hbase_row_prefix = user_id:
|
|
||||||
####################
|
|
||||||
|
|
||||||
Bootstrapping:
|
|
||||||
To bootstrap the public.users table from postgresql to hbase,
|
|
||||||
|
|
||||||
./hbrep.py hbrep.ini bootstrap public.users
|
|
||||||
|
|
||||||
you can specify multiple tables as arguments.
|
|
||||||
|
|
||||||
|
|
||||||
Play:
|
|
||||||
This mode uses pgq from the skytools package to create and manage event queues on postgresql.
|
|
||||||
You need to have pgq installed on the database you are replicating.
|
|
||||||
|
|
||||||
With a pgq.ini file like this:
|
|
||||||
####################
|
|
||||||
[pgqadm]
|
|
||||||
job_name = sourcedb_ticker
|
|
||||||
db = dbname=source_database user=dbuser
|
|
||||||
# how often to run maintenance [minutes]
|
|
||||||
maint_delay_min = 1
|
|
||||||
# how often to check for activity [secs]
|
|
||||||
loop_delay = 0.2
|
|
||||||
logfile = %(job_name)s.log
|
|
||||||
pidfile = %(job_name)s.pid
|
|
||||||
use_skylog = 0
|
|
||||||
####################
|
|
||||||
|
|
||||||
You install pgq on the database by,
|
|
||||||
|
|
||||||
pgqadm.py pgq.ini install
|
|
||||||
|
|
||||||
Next you install hbrep.
|
|
||||||
|
|
||||||
hbrep.py hbrep.ini install public.users
|
|
||||||
|
|
||||||
This creates a queue using pgq, which in this case will be called hbase_replic_queue. It also registers the hbrep consumer (called HBaseReplic) with that queue. Then finally it creates triggers on each table specified to add an event for each insert, update or delete.
|
|
||||||
|
|
||||||
Start the pgq event ticker,
|
|
||||||
|
|
||||||
pgqadm.py pgq.ini ticker
|
|
||||||
|
|
||||||
Finally, run the hbrep consumer
|
|
||||||
./hbrep.py hbrep.ini play public.users
|
|
||||||
|
|
||||||
Now any inserts, updates or deletes on the postgresql users table will be processed and sent to the
|
|
||||||
hbase table.
|
|
||||||
|
|
||||||
|
|
||||||
uninstall:
|
|
||||||
You can remove the triggers from a table by
|
|
||||||
./hbrep.py hbrep.ini uninstall public.users
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,190 +0,0 @@
|
||||||
import sys, os
|
|
||||||
|
|
||||||
import pgq, pgq.producer
|
|
||||||
import skytools
|
|
||||||
|
|
||||||
from HBaseConnection import *
|
|
||||||
import tablemapping
|
|
||||||
|
|
||||||
class HBaseBootstrap(skytools.DBScript):
    """Bootstrapping script for loading columns from a table in postgresql to hbase."""

    def __init__(self, service_name, args):
        # This will process any options eg -k -v -d
        skytools.DBScript.__init__(self, service_name, args)

        # First positional arg is the ini file; the rest are table names.
        config_file = self.args[0]
        if len(self.args) < 2:
            print "need table names"
            sys.exit(1)
        else:
            self.table_names = self.args[1:]

        #just to check this option exists
        self.cf.get("postgresql_db")

        self.max_batch_size = int(self.cf.get("max_batch_size", "10000"))
        self.hbase_hostname = self.cf.get("hbase_hostname", "localhost")
        self.hbase_port = int(self.cf.get("hbase_port", "9090"))
        self.table_mappings = tablemapping.load_table_mappings(config_file, self.table_names)

    def startup(self):
        """Run once before work()."""
        # make sure the script loops only once.
        self.set_single_loop(1)
        self.log.info("Starting " + self.job_name)

    def work(self):
        """Bootstrap every table named on the command line."""
        for t in self.table_names:
            self.bootstrap_table(t)

    def bootstrap_table(self, table_name):
        """Copy all mapped columns of one postgresql table into hbase."""
        try:
            self.log.info("Bootstrapping table %s" % table_name)
            hbase = HBaseConnection(self.hbase_hostname, self.hbase_port)
            try:
                table_mapping = self.table_mappings[table_name]

                self.log.debug("Connecting to HBase")
                hbase.connect()

                # Fetch postgresql cursor
                self.log.debug("Getting postgresql cursor")
                db = self.get_database("postgresql_db")
                curs = db.cursor()

                # Fail early if the target hbase table/columns are wrong.
                hbase.validate_table_name(table_mapping.hbase_table_name)
                hbase.validate_column_descriptors(table_mapping.hbase_table_name, table_mapping.hbase_column_descriptors)

                # Bare except is deliberate best-effort: a missing
                # bootstrap_tmpfile option means "use SELECT, not COPY".
                try:
                    dump_file = self.cf.get("bootstrap_tmpfile")
                except:
                    dump_file = None

                if dump_file != None:
                    row_source = CopiedRows(self.log, curs, dump_file)
                else:
                    row_source = SelectedRows(self.log, curs)

                table_name = table_mapping.psql_schema+"."+table_mapping.psql_table_name
                # we are careful to make sure that the first column will be the key.
                column_list = [table_mapping.psql_key_column] + table_mapping.psql_columns

                # Load the rows either via a select or via a table copy to file.
                # Either way, it does not load it all into memory.
                # copy is faster, but may incorrectly handle data with tabs in it.
                row_source.load_rows(table_name, column_list)

                # max number of rows to fetch at once
                batch_size = self.max_batch_size
                total_rows = 0L

                self.log.debug("Starting puts to hbase")
                rows = row_source.get_rows(batch_size)
                while rows != []:
                    batches = []
                    for row in rows:
                        batches.append(self.createRowBatch(table_mapping, row))

                    # One thrift round-trip per batch of mutations.
                    hbase.client.mutateRows(table_mapping.hbase_table_name, batches)
                    total_rows = total_rows + len(batches)
                    self.log.debug("total rows put = %d" % (total_rows))
                    # get next batch of rows
                    rows = row_source.get_rows(batch_size)

                self.log.info("total rows put = %d" % (total_rows))
                self.log.info("Bootstrapping table %s complete" % table_name)

            except Exception, e:
                #self.log.info(e)
                sys.exit(e)

        finally:
            hbase.disconnect()

    def createRowBatch(self, table_mapping, row):
        """Build a BatchMutation for one row; row[0] is the key column."""
        batch = BatchMutation()
        batch.row = table_mapping.hbase_row_prefix + str(row[0])
        batch.mutations = []
        for column, value in zip(table_mapping.hbase_column_descriptors, row[1:]):
            # Skip SQL NULLs: the literal string 'NULL' comes from COPY
            # dumps (see CopiedRows), real None from SELECT fetches.
            if value != 'NULL' and value != None:
                m = Mutation()
                m.column = column
                m.value = str(value)
                batch.mutations.append(m)
        return batch
|
|
||||||
|
|
||||||
|
|
||||||
## Helper classes to fetch rows from a select, or from a table dumped by copy
|
|
||||||
|
|
||||||
class RowSource:
    """Base class for fetching rows from somewhere.

    Subclasses provide load_rows(table_name, column_list) and
    get_rows(no_of_rows); this base supplies column-list formatting.
    """

    def __init__(self, log):
        # Logger used by subclasses for progress/debug messages.
        self.log = log

    def make_column_str(self, column_list):
        """Return the column names double-quoted and comma-separated.

        Fix: the previous implementation rewrote column_list in place,
        embedding quote characters into the caller's list (so a second
        call would double-quote). The input list is now left unmodified.
        """
        return ",".join('"%s"' % name for name in column_list)
|
|
||||||
|
|
||||||
|
|
||||||
class CopiedRows(RowSource):
    """
    Class for fetching rows from a postgresql database;
    rows are dumped with COPY to a file first, then read back.
    """
    def __init__(self, log, curs, dump_file):
        RowSource.__init__(self, log)
        # Path of the temp file the table will be dumped to and read from.
        self.dump_file = dump_file
        # Set DBAPI-2.0 cursor
        self.curs = curs

    def load_rows(self, table_name, column_list):
        """Dump the table to dump_file via COPY and open it for reading."""
        columns = self.make_column_str(column_list)
        self.log.debug("starting dump to file:%s. table:%s. columns:%s" % (self.dump_file, table_name, columns))
        dump_out = open(self.dump_file, 'w')
        # copy_to: tab-delimited dump with NULLs written as the literal
        # string 'NULL' (presumably psycopg's COPY TO — confirm driver).
        self.curs.copy_to(dump_out, table_name + "(%s)" % columns, '\t', 'NULL')
        dump_out.close()
        self.log.debug("table %s dump complete" % table_name)

        # Reader handle consumed incrementally by get_rows().
        self.dump_in = open(self.dump_file, 'r')

    def get_rows(self, no_of_rows):
        """Return up to no_of_rows rows (lists of fields) from the dump.

        Returns [] once the dump is exhausted; the file is closed on the
        first empty read and stays closed afterwards.
        """
        rows = []
        if not self.dump_in.closed:
            for line in self.dump_in:
                rows.append(line.split())
                if len(rows) >= no_of_rows:
                    break
            if rows == []:
                self.dump_in.close()
        return rows
|
|
||||||
|
|
||||||
|
|
||||||
class SelectedRows(RowSource):
    """Row source that streams a whole table through a single SELECT.

    Rows are fetched lazily from the database cursor, so the entire
    table is never held in memory at once.
    """

    def __init__(self, log, curs):
        RowSource.__init__(self, log)
        # DBAPI-2.0 cursor used for both the SELECT and the fetches.
        self.curs = curs

    def load_rows(self, table_name, column_list):
        """Issue the SELECT that get_rows() will subsequently drain."""
        q = "SELECT %s FROM %s" % (self.make_column_str(column_list), table_name)
        self.log.debug("Executing query %s" % q)
        self.curs.execute(q)
        self.log.debug("query finished")

    def get_rows(self, no_of_rows):
        """Return up to no_of_rows rows from the pending SELECT."""
        return self.curs.fetchmany(no_of_rows)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
    # Run the bootstrap standalone; "HBaseReplic" is the skytools job name.
    bootstrap = HBaseBootstrap("HBaseReplic",sys.argv[1:])
    bootstrap.start()
|
|
|
@ -1,22 +0,0 @@
|
||||||
[HBaseReplic]
|
|
||||||
job_name = hbase_replic_job
|
|
||||||
logfile = %(job_name)s.log
|
|
||||||
pidfile = %(job_name)s.pid
|
|
||||||
postgresql_db = dbname=source_database user=dbuser
|
|
||||||
pgq_queue_name = hbase_replic_queue
|
|
||||||
hbase_hostname = localhost
|
|
||||||
hbase_port = 9090
|
|
||||||
# If omitted, default is 10000
|
|
||||||
max_batch_size = 10000
|
|
||||||
# file to use when copying a table; if omitted, a SELECT of the columns will be done instead.
|
|
||||||
bootstrap_tmpfile = tabledump.dat
|
|
||||||
|
|
||||||
# For each table mapping, there must be the same number of psql_columns as hbase_column_descriptors
|
|
||||||
[public.users]
|
|
||||||
psql_schema = public
|
|
||||||
psql_table_name = users
|
|
||||||
psql_key_column = user_id
|
|
||||||
psql_columns = dob
|
|
||||||
hbase_table_name = stuff
|
|
||||||
hbase_column_descriptors = users:dob
|
|
||||||
hbase_row_prefix = user_id:
|
|
|
@ -1,126 +0,0 @@
|
||||||
#!/usr/bin/env python
|
|
||||||
import sys, os
|
|
||||||
|
|
||||||
import pgq, pgq.producer
|
|
||||||
import skytools, skytools._pyquoting
|
|
||||||
|
|
||||||
from bootstrap import HBaseBootstrap
|
|
||||||
from HBaseConsumer import HBaseConsumer
|
|
||||||
|
|
||||||
command_usage = """
|
|
||||||
%prog [options] inifile command [tablenames]
|
|
||||||
|
|
||||||
commands:
|
|
||||||
play Run event consumer to update specified tables with hbase.
|
|
||||||
bootstrap Bootstrap specified tables args into hbase.
|
|
||||||
install Setup the pgq queue, and install trigger on each table.
|
|
||||||
uninstall Remove the triggers from each specified table.
|
|
||||||
"""
|
|
||||||
|
|
||||||
class HBaseReplic(skytools.DBScript):
    """Command-line front end for hbrep.

    Parses "inifile command tablenames..." and either delegates to
    HBaseConsumer (play) / HBaseBootstrap (bootstrap), or performs the
    pgq install/uninstall work itself.
    """

    def __init__(self, service_name, args):
        try:
            # 0 means "no delegate script"; replaced below for play/bootstrap.
            self.run_script = 0

            # This will process any options eg -k -v -d
            skytools.DBScript.__init__(self, service_name, args)

            self.config_file = self.args[0]

            if len(self.args) < 2:
                self.print_usage()
                print "need command"
                sys.exit(0)
            cmd = self.args[1]

            if not cmd in ["play","bootstrap","install", "uninstall"]:
                self.print_usage()
                print "unknown command"
                sys.exit(0)

            if len(self.args) < 3:
                self.print_usage()
                print "need table names"
                sys.exit(0)
            else:
                self.table_names = self.args[2:]

            # play/bootstrap delegate to another DBScript; install/uninstall
            # run in-process by rebinding work().
            if cmd == "play":
                self.run_script = HBaseConsumer(service_name, [self.config_file] + self.table_names)
            elif cmd == "bootstrap":
                self.run_script = HBaseBootstrap(service_name, [self.config_file] + self.table_names)
            elif cmd == "install":
                self.work = self.do_install
            elif cmd == "uninstall":
                self.work = self.do_uninstall

        except Exception, e:
            sys.exit(e)

    def print_usage(self):
        """Print the command-line usage text."""
        print "Usage: " + command_usage

    def init_optparse(self, parser=None):
        """Extend the base option parser with our usage string."""
        p = skytools.DBScript.init_optparse(self, parser)
        p.set_usage(command_usage.strip())
        return p

    def start(self):
        """Run the delegate script if one was chosen, else run ourselves."""
        if self.run_script:
            self.run_script.start()
        else:
            skytools.DBScript.start(self)

    def startup(self):
        # make sure the script loops only once.
        self.set_single_loop(1)

    def do_install(self):
        """Create the pgq queue, register the consumer and add triggers."""
        try:
            queue_name = self.cf.get("pgq_queue_name")
            consumer = self.job_name

            self.log.info('Creating queue: %s' % queue_name)
            self.exec_sql("select pgq.create_queue(%s)", [queue_name])

            self.log.info('Registering consumer %s on queue %s' % (consumer, queue_name))
            self.exec_sql("select pgq.register_consumer(%s, %s)", [queue_name, consumer])

            for table_name in self.table_names:
                self.log.info('Creating trigger hbase_replic on table %s' % (table_name))
                q = """
                CREATE TRIGGER hbase_replic
                AFTER INSERT OR UPDATE OR DELETE
                ON %s
                FOR EACH ROW
                EXECUTE PROCEDURE pgq.logutriga('%s')"""
                self.exec_sql(q % (table_name, queue_name), [])
        except Exception, e:
            sys.exit(e)

    def do_uninstall(self):
        """Drop the hbase_replic trigger from every named table."""
        try:
            queue_name = self.cf.get("pgq_queue_name")
            # Kept for the commented-out unregister call below.
            consumer = "HBaseReplic"

            #self.log.info('Unregistering consumer %s on queue %s' % (consumer, queue_name))
            #self.exec_sql("select pgq.unregister_consumer(%s, %s)", [queue_name, consumer])

            for table_name in self.table_names:
                self.log.info('Dropping trigger hbase_replic on table %s' % (table_name))
                q = "DROP TRIGGER hbase_replic ON %s" % table_name
                self.exec_sql(q, [])

        except Exception, e:
            sys.exit(e)

    def exec_sql(self, q, args):
        """Execute one statement against postgresql_db and commit."""
        self.log.debug(q)
        db = self.get_database('postgresql_db')
        curs = db.cursor()
        curs.execute(q, args)
        db.commit()
|
|
||||||
|
|
||||||
if __name__ == '__main__':
    # Dispatch on the command line (play/bootstrap/install/uninstall).
    script = HBaseReplic("HBaseReplic",sys.argv[1:])
    script.start()
|
|
|
@ -1,10 +0,0 @@
|
||||||
[pgqadm]
|
|
||||||
job_name = sourcedb_ticker
|
|
||||||
db = dbname=source_database user=dbuser
|
|
||||||
# how often to run maintenance [minutes]
|
|
||||||
maint_delay_min = 1
|
|
||||||
# how often to check for activity [secs]
|
|
||||||
loop_delay = 0.2
|
|
||||||
logfile = %(job_name)s.log
|
|
||||||
pidfile = %(job_name)s.pid
|
|
||||||
use_skylog = 0
|
|
|
@ -1,33 +0,0 @@
|
||||||
import sys, os
|
|
||||||
from skytools.config import *
|
|
||||||
|
|
||||||
# Keys of the per-table ini-file options read by PSqlHBaseTableMapping.
PSQL_SCHEMA = "psql_schema"
PSQL_TABLENAME = "psql_table_name"
PSQL_KEYCOL = "psql_key_column"
PSQL_COLUMNS = "psql_columns"
HBASE_TABLENAME = "hbase_table_name"
HBASE_COLUMNDESCS = "hbase_column_descriptors"
HBASE_ROWPREFIX = "hbase_row_prefix"
|
|
||||||
|
|
||||||
def load_table_mappings(config_file, table_names):
    """Build a PSqlHBaseTableMapping for every named table.

    Each table name must have a matching section in config_file.
    Returns a dict keyed by table name.
    """
    mappings = {}
    for name in table_names:
        section = Config(name, config_file)
        mappings[name] = PSqlHBaseTableMapping(section)
    return mappings
|
|
||||||
|
|
||||||
class PSqlHBaseTableMapping:
    """Describes how one postgresql table maps onto an hbase table."""
    # conf can be anything with a get function eg, a dictionary
    def __init__(self, conf):
        """Read the mapping options from conf.

        Raises Exception if the source column list and the hbase column
        descriptor list do not pair up one-to-one.
        """
        self.psql_schema = conf.get(PSQL_SCHEMA)
        self.psql_table_name = conf.get(PSQL_TABLENAME)
        # Single column whose value becomes (part of) the hbase row key.
        self.psql_key_column = conf.get(PSQL_KEYCOL)
        # Whitespace-separated lists in the ini file.
        self.psql_columns = conf.get(PSQL_COLUMNS).split()
        self.hbase_table_name = conf.get(HBASE_TABLENAME)
        self.hbase_column_descriptors = conf.get(HBASE_COLUMNDESCS).split()
        # Literal prefix prepended to the key value to form the row key.
        self.hbase_row_prefix = conf.get(HBASE_ROWPREFIX)

        if len(self.psql_columns) != len(self.hbase_column_descriptors):
            raise Exception("psql_columns and hbase_column_descriptors must have same length")
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue