From 8a2cef3315516501627c7a30bdcf989b12a32303 Mon Sep 17 00:00:00 2001
From: g-bhardwaj
Date: Sun, 25 Oct 2015 17:29:35 +0530
Subject: [PATCH] HBASE-13867: Add endpoint coprocessor guide to HBase book.

---
 src/main/asciidoc/_chapters/cp.adoc | 897 +++++++++++++++++++++++-----
 1 file changed, 753 insertions(+), 144 deletions(-)

diff --git a/src/main/asciidoc/_chapters/cp.adoc b/src/main/asciidoc/_chapters/cp.adoc
index a99e903c3ee..06930d1b423 100644
--- a/src/main/asciidoc/_chapters/cp.adoc
+++ b/src/main/asciidoc/_chapters/cp.adoc
@@ -27,203 +27,812 @@
 :icons: font
 :experimental:
 
-HBase coprocessors are modeled after the coprocessors which are part of Google's BigTable (http://www.scribd.com/doc/21631448/Dean-Keynote-Ladis2009, pages 66-67.). Coprocessors function in a similar way to Linux kernel modules.
-They provide a way to run server-level code against locally-stored data.
-The functionality they provide is very powerful, but also carries great risk and can have adverse effects on the system, at the level of the operating system.
-The information in this chapter is primarily sourced and heavily reused from Mingjie Lai's blog post at https://blogs.apache.org/hbase/entry/coprocessor_introduction.
+HBase Coprocessors are modeled after the coprocessors which are part of Google's BigTable
+(http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf, pages 41-42).
+
+A Coprocessor is a framework that provides an easy way to run your custom code directly on
+a Region Server.
+The information in this chapter is primarily sourced and heavily reused from:
+
+. Mingjie Lai's blog post
+link:https://blogs.apache.org/hbase/entry/coprocessor_introduction[Coprocessor Introduction].
+. Gaurav Bhardwaj's blog post
+link:http://www.3pillarglobal.com/insights/hbase-coprocessors[The How To Of HBase Coprocessors].
 
-Coprocessors are not designed to be used by end users of HBase, but by HBase developers who need to add specialized functionality to HBase.
-One example of the use of coprocessors is pluggable compaction and scan policies, which are provided as coprocessors in link:https://issues.apache.org/jira/browse/HBASE-6427[HBASE-6427].
 
 == Coprocessor Framework
 
-The implementation of HBase coprocessors diverges from the BigTable implementation.
-The HBase framework provides a library and runtime environment for executing user code within the HBase region server and master processes.
+When working with any data store (like an RDBMS or HBase) you fetch the data (in the case of an
+RDBMS you might use a SQL query; in the case of HBase you use either a Get or a Scan). To fetch
+only the relevant data you filter it (for an RDBMS you put conditions in the 'WHERE' predicate;
+in HBase you use a
+link:http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/Filter.html[Filter]).
+After fetching the desired data, you perform your business computation on it.
+This scenario is close to ideal for "small data", where a few thousand rows and a bunch of
+columns are returned from the data store. Now imagine a scenario where there are billions of
+rows and millions of columns, and you want to perform some computation which requires all the
+data, like calculating an average or a sum. Even if you are interested in just a few columns,
+you still have to fetch all the rows.
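+
+As a concrete illustration, the naive approach sketched below scans an entire table and
+aggregates on the client. The table name and the assumption that every visited cell holds an
+8-byte long are ours, chosen only to make the drawbacks below tangible:
+
+[source,java]
+----
+// Naive client-side aggregation: every row crosses the network.
+Configuration conf = HBaseConfiguration.create();
+Connection connection = ConnectionFactory.createConnection(conf);
+Table table = connection.getTable(TableName.valueOf("users"));
+
+long sum = 0L;
+ResultScanner scanner = table.getScanner(new Scan());
+try {
+    for (Result r : scanner) {
+        for (Cell cell : r.rawCells()) {
+            sum += Bytes.toLong(CellUtil.cloneValue(cell)); // assumes long-encoded values
+        }
+    }
+} finally {
+    scanner.close();
+    table.close();
+    connection.close();
+}
+System.out.println("Client-side sum = " + sum);
+----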
+There are a few drawbacks to this approach, as described below:
+
+. In this approach the data transfer (from the data store to the client side) becomes the
+bottleneck, and the time required to complete the operation is limited by the rate at which the
+data transfer takes place.
+. It's not always possible to hold that much data in memory and perform the computation.
+. Bandwidth is one of the most precious resources in any data center. Operations like this can
+saturate your data center's bandwidth and will severely impact the performance of your cluster.
+. Your client code grows thick, as you have to maintain the code for calculating the average or
+summation on the client side. This is not a major drawback compared to severe issues like
+performance and bandwidth, but it is still worth consideration.
 
-The framework API is provided in the link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/coprocessor/package-summary.html[coprocessor] package.
-
-Two different types of coprocessors are provided by the framework, based on their scope.
 
+In a scenario like this it's better to move the computation (i.e. the user's custom code) to
+the data itself (the Region Server). Coprocessors help you achieve this, but you can do more
+than that. There is another advantage: your code runs in parallel (i.e. on all Regions).
+To give an idea of a Coprocessor's capabilities, different people use different analogies.
+The three most common analogies for Coprocessors are:
 
-.Types of Coprocessors
 
+[[cp_analogies]]
+Triggers and Stored Procedures:: This is the most common analogy for Coprocessors. An Observer
+Coprocessor is compared to a trigger because, like a trigger, it executes your custom code when
+a certain event occurs (like a Get or a Put). Similarly, an Endpoint Coprocessor is compared to
+a stored procedure because, as with a stored procedure, it lets you perform custom computations
+on the data directly inside the region server.
 
-System Coprocessors::
-  System coprocessors are loaded globally on all tables and regions hosted by a region server.
 
+MapReduce:: As in MapReduce, the computation is moved to the data. A Coprocessor executes your
+custom computation directly on the Region Servers, i.e. where the data resides. That's why some
+people compare a Coprocessor to a small MapReduce job.
 
-Table Coprocessors::
-  You can specify which coprocessors should be loaded on all regions for a table on a per-table basis.
 
+AOP:: Some people compare Coprocessors to _Aspect Oriented Programming_ (AOP). As in AOP, you
+apply advice (on the occurrence of a specific event) by intercepting the request, running some
+custom code (probably for cross-cutting concerns), and then forwarding the request on its path
+as if nothing happened (or even returning it). Similarly, with a Coprocessor you have the
+facility to intercept a request, run custom code, and then forward the request on its path (or
+return it).
 
-The framework provides two different aspects of extensions as well: _observers_ and _endpoints_.
 
+Although Coprocessors derive their roots from Google's Bigtable, they deviate from it largely
+in their design. Currently there are efforts going on to bridge this gap. For more information
+see link:https://issues.apache.org/jira/browse/HBASE-4047[HBASE-4047].
 
-Observers::
-  Observers are analogous to triggers in conventional databases.
-  They allow you to insert user code by overriding upcall methods provided by the coprocessor framework.
-  Callback functions are executed from core HBase code when events occur.
-  Callbacks are handled by the framework, and the coprocessor itself only needs to insert the extended or alternate functionality.
+
+In HBase, to implement a Coprocessor certain steps must be followed, as described below:
+
+. Either your class should extend one of the Coprocessor classes (like
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html[BaseRegionObserver]
+) or it should implement the Coprocessor interfaces (like
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/Coprocessor.html[Coprocessor],
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/CoprocessorService.html[CoprocessorService]).
+A minimal skeleton is sketched after this list.
 
-Endpoints (HBase 0.96.x and later)::
-  The implementation for endpoints changed significantly in HBase 0.96.x due to the introduction of protocol buffers (protobufs) (link:https://issues.apache.org/jira/browse/HBASE-5448[HBASE-5488]). If you created endpoints before 0.96.x, you will need to rewrite them.
-  Endpoints are now defined and callable as protobuf services, rather than endpoint invocations passed through as Writable blobs
 
+. Load the Coprocessor: Currently there are two ways to load a Coprocessor.
++
+Static:: Loading from the configuration.
+Dynamic:: Loading via the 'hbase shell' or via Java code using the HTableDescriptor class.
++
+For more details see <<cp_loading,Loading Coprocessors>>.
 
-Endpoints (HBase 0.94.x and earlier)::
-  Dynamic RPC endpoints resemble stored procedures.
-  An endpoint can be invoked at any time from the client.
-  When it is invoked, it is executed remotely at the target region or regions, and results of the executions are returned to the client.
 
+. Finally, write your client-side code to call the Coprocessor. This is the easiest step, as
+HBase handles the Coprocessor transparently and you don't have to do much to call it.
 
-== Examples
 
-An example of an observer is included in _hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestZooKeeperScanPolicyObserver.java_.
-Several endpoint examples are included in the same directory.
 
-== Building A Coprocessor
 
-Before you can build a processor, it must be developed, compiled, and packaged in a JAR file.
-The next step is to configure the coprocessor framework to use your coprocessor.
-You can load the coprocessor from your HBase configuration, so that the coprocessor starts with HBase, or you can configure the coprocessor from the HBase shell, as a table attribute, so that it is loaded dynamically when the table is opened or reopened.
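+
+The smallest possible Coprocessor, then, is a class that extends one of the base classes and
+overrides a single hook; every hook you don't override keeps its default (no-op) behavior. The
+class name and log message below are our own illustration, not part of the framework:
+
+[source,java]
+----
+// A do-almost-nothing Observer: extends a base class (step 1) and overrides one hook.
+public class MinimalObserver extends BaseRegionObserver {
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
+            final List<Cell> results) throws IOException {
+        // Custom code would go here; the inherited implementation does nothing.
+        System.out.println("preGetOp called for row " + Bytes.toString(get.getRow()));
+    }
+}
+----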
+
+The framework API is provided in the
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/coprocessor/package-summary.html[coprocessor]
+package.
+
+Coprocessors are not designed to be used by end users but by developers. Coprocessors are
+executed directly on the region server; therefore faulty or malicious code can bring your
+region server down. Currently there is no mechanism to prevent this, but there are efforts
+going on. For more, see link:https://issues.apache.org/jira/browse/HBASE-4047[HBASE-4047].
+
+Two different types of Coprocessors are provided by the framework, based on their functionality.
 
-=== Load from Configuration
-To configure a coprocessor to be loaded when HBase starts, modify the RegionServer's _hbase-site.xml_ and configure one of the following properties, based on the type of observer you are configuring:
-
-* `hbase.coprocessor.region.classes`for RegionObservers and Endpoints
-* `hbase.coprocessor.wal.classes`for WALObservers
-* `hbase.coprocessor.master.classes`for MasterObservers
-
-.Example RegionObserver Configuration
-====
-In this example, one RegionObserver is configured for all the HBase tables.
 
+== Types of Coprocessors
+
+Coprocessors can be broadly divided into two categories: Observer and Endpoint.
+
+=== Observer
+Observer Coprocessors are easy to understand. People coming from an RDBMS background can
+compare them to the triggers available in relational databases. Folks coming from a programming
+background can visualize them as advice ('before' and 'after' only) available in AOP (Aspect
+Oriented Programming). See <<cp_analogies, coprocessor analogies>>.
+
+Coprocessors allow you to hook your custom code in at two places during the life cycle of an
+event.
+
+The first is just _before_ the occurrence of the event (just like 'before' advice in AOP, or
+triggers like 'before update'). All methods providing this kind of feature start with the
+prefix `pre`.
+
+For example, if you want your custom code to be executed just before a `Put` operation, you can
+override the
+// Below URL is more than 100 characters long.
+link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html#prePut%28org.apache.hadoop.hbase.coprocessor.ObserverContext,%20org.apache.hadoop.hbase.client.Put,%20org.apache.hadoop.hbase.regionserver.wal.WALEdit,%20org.apache.hadoop.hbase.client.Durability%29[`prePut`]
+method of
+// Below URL is more than 100 characters long.
+link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver].
+This method has the following signature:
+[source,java]
+----
+public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
+    final WALEdit edit, final Durability durability) throws IOException;
+----
+
+Second, the Observer Coprocessor also provides hooks for your code to be executed just _after_
+the occurrence of the event (similar to 'after' advice in AOP terminology, or 'after update'
+triggers). The methods providing this functionality start with the prefix `post`. For example,
+if you want your code to be executed after a 'Put' operation, you should consider overriding
+the
+// Below URL is more than 100 characters long.
+link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html#postPut%28org.apache.hadoop.hbase.coprocessor.ObserverContext,%20org.apache.hadoop.hbase.client.Put,%20org.apache.hadoop.hbase.regionserver.wal.WALEdit,%20org.apache.hadoop.hbase.client.Durability%29[`postPut`]
+method of
+// Below URL is more than 100 characters long.
+link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver]:
+[source,java]
+----
+public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
+    final WALEdit edit, final Durability durability) throws IOException;
+----
+
+In short, the following conventions are generally followed:
+
+Override the _preXXX()_ method if you want your code to be executed just before the occurrence
+of the event.
+
+Override the _postXXX()_ method if you want your code to be executed just after the occurrence
+of the event.
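+
+As an illustration of these conventions, here is a sketch of an Observer that uses both hooks.
+The validation rule, the family and the qualifier names are hypothetical, chosen only to show
+the mechanics; the signatures are the ones quoted above:
+
+[source,java]
+----
+public class AuditObserver extends BaseRegionObserver {
+
+    private static final byte[] PD = Bytes.toBytes("personalDet");
+    private static final byte[] NAME = Bytes.toBytes("name");
+
+    // Runs just before every Put: reject writes that do not carry a 'name' column.
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
+            final WALEdit edit, final Durability durability) throws IOException {
+        if (!put.has(PD, NAME)) {
+            throw new DoNotRetryIOException("Every row must have a personalDet:name column");
+        }
+    }
+
+    // Runs just after every successful Put: here we only log the row key.
+    @Override
+    public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
+            final WALEdit edit, final Durability durability) throws IOException {
+        System.out.println("Row written: " + Bytes.toString(put.getRow()));
+    }
+}
+----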
+
+
+.Use Cases for Observer Coprocessors
+A few use cases of Observer Coprocessors are:
+
+. *Security*: Before performing any operation (like 'Get' or 'Put') you can check for
+permission in the 'preXXX' methods.
+
+. *Referential Integrity*: Unlike a traditional RDBMS, HBase doesn't have the concept of
+referential integrity (foreign keys). Suppose, for example, you have a requirement that
+whenever you insert a record into the 'users' table, a corresponding entry should also be
+created in the 'user_daily_attendance' table. One way you could solve this is by using two
+'Put' operations, one for each table, but this throws the responsibility (for the referential
+integrity) onto the user. A better way is to use a Coprocessor and override the 'postPut'
+method, in which you write the code to insert the record into the 'user_daily_attendance'
+table. This way the client code is leaner and cleaner.
+
+. *Secondary Index*: Coprocessors can be used to maintain secondary indexes. For more
+information see link:http://wiki.apache.org/hadoop/Hbase/SecondaryIndexing[SecondaryIndexing].
+
+
+==== Types of Observer Coprocessor
+
+Observer Coprocessors come in the following flavors:
+
+. *RegionObserver*: This Coprocessor provides the facility to hook your code in when events on
+a region are triggered. The most common examples include 'preGet' and 'postGet' for the 'Get'
+operation and 'prePut' and 'postPut' for the 'Put' operation. For an exhaustive list of
+supported methods (events) see
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver].
+
+. *RegionServerObserver*: Provides hooks for the events related to the RegionServer, such as
+stopping the RegionServer and performing operations before or after merges, commits, or
+rollbacks. For more details please refer to
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.html[RegionServerObserver].
+
+. *MasterObserver*: This observer provides hooks for DDL-like operations, such as create,
+delete, and modify table. For the entire list of available methods see
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/MasterObserver.html[MasterObserver].
+A short sketch of a MasterObserver follows this list.
+
+. *WALObserver*: Provides hooks for WAL (Write-Ahead-Log) related operations. It has only two
+methods, 'preWALWrite()' and 'postWALWrite()'. For more details see
+// Below URL is more than 100 characters long.
+link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/WALObserver.html[WALObserver].
+
+For an example see <<cp_example,Examples>>.
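+
+As promised above, here is a minimal MasterObserver sketch. The class name and the logged
+message are our own illustration; 'postCreateTable' is invoked on the active Master after a
+table has been created:
+
+[source,java]
+----
+public class TableAuditObserver extends BaseMasterObserver {
+
+    // Called on the Master after a table has been created.
+    @Override
+    public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
+            HTableDescriptor desc, HRegionInfo[] regions) throws IOException {
+        System.out.println("Table created: " + desc.getTableName().getNameAsString());
+    }
+}
+----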
+
+
+=== Endpoint Coprocessor
+
+Endpoint Coprocessors can be compared to the stored procedures found in RDBMSs.
+See <<cp_analogies, coprocessor analogies>>. They help in performing computations which are not
+possible either through Observer Coprocessors or otherwise, for example, calculating an average
+or a summation over an entire table that spans multiple regions. They do so by providing a hook
+for your custom code which is then run across all regions.
+
+With an Endpoint Coprocessor you can create your own dynamic RPC protocol and thus provide
+communication between the client and the region server, enabling you to run your custom code on
+the region server (on each region of a table).
+
+Unlike Observer Coprocessors (where your custom code is executed transparently when an event
+like a 'Get' operation occurs), with an Endpoint Coprocessor you have to explicitly invoke the
+Coprocessor by using the
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Table.html#coprocessorService%28java.lang.Class,%20byte%5B%5D,%20byte%5B%5D,%20org.apache.hadoop.hbase.client.coprocessor.Batch.Call%29[coprocessorService()]
+method available in
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Table.html[Table]
+(or
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/HTableInterface.html[HTableInterface]
+or
+link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/HTable.html[HTable]).
+
+From version 0.96 onwards, implementing an Endpoint Coprocessor is not straightforward: it is
+done with the help of Google's Protocol Buffers. For more details on Protocol Buffers, please
+see the
+link:https://developers.google.com/protocol-buffers/docs/proto[Protocol Buffer Guide].
+Endpoint Coprocessors written for version 0.94 are not compatible with version 0.96 or later
+(for more details, see
+link:https://issues.apache.org/jira/browse/HBASE-5448[HBASE-5448]),
+so if you are upgrading your HBase cluster from version 0.94 (or earlier) to 0.96 (or later)
+you have to rewrite your Endpoint Coprocessor.
+
+For an example see <<cp_example,Examples>>.
+
+
+[[cp_loading]]
+== Loading Coprocessors
+
+Loading a Coprocessor refers to the process of making your custom Coprocessor implementation
+available to HBase, so that when a request comes in or an event takes place, the desired
+functionality implemented in your custom code gets executed.
+
+Coprocessors can be loaded broadly in two ways. One is static (loading through configuration
+files) and the other is dynamic (loading via the hbase shell or Java code).
+
+=== Static Loading
+Static loading means that your Coprocessor will take effect only when you restart HBase, and
+there is a reason for that: you make changes in 'hbase-site.xml' and therefore have to restart
+HBase for your changes to take effect.
+
+Following are the steps for loading a Coprocessor statically (the jar-copying step is sketched
+after this list):
+
+. Define the Coprocessor in hbase-site.xml: Define a <property> element, which consists of two
+sub-elements, <name> and <value>, respectively.
++
+.. <name> can have one of the following values:
++
+... 'hbase.coprocessor.region.classes' for RegionObservers and Endpoints.
+... 'hbase.coprocessor.wal.classes' for WALObservers.
+... 'hbase.coprocessor.master.classes' for MasterObservers.
+.. <value> must contain the fully qualified class name of your class implementing the
+Coprocessor.
++
+For example, to load a Coprocessor (implemented in class SumEndPoint.java) you have to create
+the following entry in the RegionServer's 'hbase-site.xml' file (generally located under the
+'conf' directory):
++
+[source,xml]
+----
-<property>
-    <name>hbase.coprocessor.region.classes</name>
-    <value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
-</property>
+<property>
+    <name>hbase.coprocessor.region.classes</name>
+    <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value>
+</property>
+----
-====
 
 If multiple classes are specified for loading, the class names must be comma-separated.
 The framework attempts to load all the configured classes using the default class loader.
 Therefore, the jar file must reside on the server-side HBase classpath.
 
++
+Coprocessors which are loaded in this way will be active on all regions of all tables.
-These are the system coprocessor introduced earlier.
-The first listed coprocessors will be assigned the priority `Coprocessor.Priority.SYSTEM`.
-Each subsequent coprocessor in the list will have its priority value incremented by one (which reduces its priority, because priorities have the natural sort order of Integers).
-
-When calling out to registered observers, the framework executes their callbacks methods in the sorted order of their priority.
+These are also called system Coprocessors.
+The first listed Coprocessor will be assigned the priority `Coprocessor.Priority.SYSTEM`.
+Each subsequent Coprocessor in the list will have its priority value incremented by one (which
+reduces its priority, because priorities have the natural sort order of Integers).
++
+When calling out to registered observers, the framework executes their callback methods in the
+sorted order of their priority.
+Ties are broken arbitrarily.
 
-=== Load from the HBase Shell
 
+. Put your code on the classpath of HBase: There are various ways to do so, like adding jars to
+the classpath, etc. One easy way is to drop the jar (containing your code and all its
+dependencies) into the 'lib' folder of the HBase installation, as sketched below.
 
-You can load a coprocessor on a specific table via a table attribute.
-The following example will load the `FooRegionObserver` observer when table `t1` is read or re-read.
-
-.Load a Coprocessor On a Table Using HBase Shell
 
+. Restart HBase.
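+
+For instance, assuming a standalone installation rooted at `$HBASE_HOME` (the variable name and
+paths are illustrative; adjust them to your environment), steps 2 and 3 might look like:
+
+[source]
+----
+# copy the jar (with its dependencies) into HBase's lib folder on every server
+$ cp coprocessor.jar $HBASE_HOME/lib/
+
+# restart HBase so the servers pick up the new classpath and configuration
+$ $HBASE_HOME/bin/stop-hbase.sh
+$ $HBASE_HOME/bin/start-hbase.sh
+----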
+
+==== Unloading Static Coprocessors
+Unloading a statically loaded Coprocessor is easy. Following are the steps:
+
+. Delete the Coprocessor's entry from 'hbase-site.xml', i.e. remove the <property> tag.
+
+. Restart HBase.
+
+. Optionally remove the Coprocessor jar file from the classpath (or from the lib directory if
+you copied it over there). Removing the Coprocessor JARs from HBase's classpath is a good
+practice.
+
+=== Dynamic Loading
+Dynamic loading refers to the process of loading a Coprocessor without restarting HBase. This
+may sound better than static loading (and in some scenarios it is), but there is a caveat: a
+dynamically loaded Coprocessor applies only to the table for which it was loaded, while a
+statically loaded Coprocessor applies to all tables. Due to this difference, dynamically loaded
+Coprocessors are also called *Table Coprocessors* (as they apply only to a single table), while
+statically loaded Coprocessors are called *System Coprocessors* (as they apply to all tables).
+
+To dynamically load a Coprocessor you have to take the table offline, so during this time you
+won't be able to process any requests involving this table.
+
+There are three ways to dynamically load a Coprocessor, as shown below:
+
+[NOTE]
+.Assumptions
+====
-====
-----
-
-hbase(main):005:0> alter 't1', METHOD => 'table_att',
-  'coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'
-Updating all regions with the new schema...
-1/1 regions updated.
-Done.
-0 row(s) in 1.0730 seconds
-
-hbase(main):006:0> describe 't1'
-DESCRIPTION ENABLED
- {NAME => 't1', coprocessor$1 => 'hdfs:///foo.jar|com.foo.FooRegio false
- nObserver|1001|arg1=1,arg2=2', FAMILIES => [{NAME => 'c1', DATA_B
- LOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE
- => '0', VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSIONS =>
- '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZ
- E => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true', BLO
- CKCACHE => 'true'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE',
- BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3'
- , COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647'
- , KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY
- => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
-1 row(s) in 0.0190 seconds
-----
+The instructions below make the following assumptions:
+
+* A JAR called `coprocessor.jar` contains the Coprocessor implementation along with all of its
+dependencies, if any.
+* The JAR is available in HDFS in some location like
+`hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar`.
+====
+
+. *Using Shell*: You can load the Coprocessor using the HBase shell as follows:
+.. Disable the table: Take the table offline by disabling it. Supposing the table name is
+'users', to disable it enter the following command:
++
+[source]
+----
+hbase(main):001:0> disable 'users'
+----
+
+.. Load the Coprocessor: The Coprocessor jar should be on HDFS and should be accessible to
+HBase. To load the Coprocessor use the following command:
++
+[source]
+----
+hbase(main):002:0> alter 'users', METHOD => 'table_att', 'Coprocessor' =>
+'hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar|
+org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|arg1=1,arg2=2'
+----
++
+The Coprocessor framework will try to read the class information from the coprocessor table
+attribute value.
+The value contains four pieces of information which are separated by the pipe (`|`) character:
++
+* File path: The jar file containing the Coprocessor implementation must be in a location where
+all region servers can read it.
+You could copy the file onto the local disk on each region server, but it is recommended to
+store it in HDFS.
+* Class name: The full class name of the Coprocessor.
+* Priority: An integer. The framework will determine the execution sequence of all configured
+observers registered at the same hook using priorities. This field can be left blank. In that
+case the framework will assign a default priority value.
+* Arguments (Optional): This field is passed to the Coprocessor implementation.
+
+.. Enable the table: To enable the table, type the following command:
++
+[source]
+----
+hbase(main):003:0> enable 'users'
+----
+
+.. Verification: This step is optional, but it is generally good practice to check whether your
+Coprocessor loaded successfully. Enter the following command:
++
+[source]
+----
+hbase(main):004:0> describe 'users'
+----
++
+You should see output like this:
++
+[source]
+----
+DESCRIPTION                                                            ENABLED
+'users', {TABLE_ATTRIBUTES => {coprocessor$1 =>                        true
+'hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar|
+org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|'}, {NAME =>
+'personalDet'.....
+----
+
+. *Using the setValue() method of HTableDescriptor*: This is done entirely in Java as follows:
++
+[source,java]
+----
+String tableName = "users";
+String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
+Configuration conf = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(conf);
+admin.disableTable(tableName);
+HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
+HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
+columnFamily1.setMaxVersions(3);
+hTableDescriptor.addFamily(columnFamily1);
+HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
+columnFamily2.setMaxVersions(3);
+hTableDescriptor.addFamily(columnFamily2);
+hTableDescriptor.setValue("COPROCESSOR$1", path + "|"
++ RegionObserverExample.class.getCanonicalName() + "|"
++ Coprocessor.PRIORITY_USER);
+admin.modifyTable(tableName, hTableDescriptor);
+admin.enableTable(tableName);
+----
+
+. *Using the addCoprocessor() method of HTableDescriptor*: This method is available from
+version 0.96 onwards. Note that addCoprocessor() expects the jar location as an
+org.apache.hadoop.fs.Path, which is why the path string is wrapped in `new Path(...)` below:
++
+[source,java]
+----
+String tableName = "users";
+String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
+Configuration conf = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(conf);
+admin.disableTable(tableName);
+HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
+HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
+columnFamily1.setMaxVersions(3);
+hTableDescriptor.addFamily(columnFamily1);
+HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
+columnFamily2.setMaxVersions(3);
+hTableDescriptor.addFamily(columnFamily2);
+hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), new Path(path),
+Coprocessor.PRIORITY_USER, null);
+admin.modifyTable(tableName, hTableDescriptor);
+admin.enableTable(tableName);
+----
-
-.Unload a Coprocessor From a Table Using HBase Shell
-====
-----
-
-hbase(main):007:0> alter 't1', METHOD => 'table_att_unset',
-hbase(main):008:0* NAME => 'coprocessor$1'
-Updating all regions with the new schema...
-1/1 regions updated.
-Done.
-0 row(s) in 1.1130 seconds
-
-hbase(main):009:0> describe 't1'
-DESCRIPTION ENABLED
- {NAME => 't1', FAMILIES => [{NAME => 'c1', DATA_BLOCK_ENCODING => false
- 'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSION
- S => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '214
- 7483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN
- _MEMORY => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true
- '}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
- 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION =>
- 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_C
- ELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', ENCO
- DE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
-1 row(s) in 0.0180 seconds
-----
-====
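++
+On HBase 1.x and later the HBaseAdmin constructor used above is deprecated; the equivalent flow
+goes through Connection and Admin instead. The sketch below is our own adaptation of the same
+example (same hypothetical path and table), not a distinct loading mechanism:
++
+[source,java]
+----
+// Alternative for HBase 1.x: obtain Admin from a Connection instead of new HBaseAdmin(conf).
+Configuration conf = HBaseConfiguration.create();
+Connection connection = ConnectionFactory.createConnection(conf);
+Admin admin = connection.getAdmin();
+
+TableName tableName = TableName.valueOf("users");
+String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
+
+admin.disableTable(tableName);
+HTableDescriptor hTableDescriptor = admin.getTableDescriptor(tableName);
+hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), new Path(path),
+    Coprocessor.PRIORITY_USER, null);
+admin.modifyTable(tableName, hTableDescriptor);
+admin.enableTable(tableName);
+
+admin.close();
+connection.close();
+----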
+====
+WARNING: There is no guarantee that the framework will load a given Coprocessor successfully.
+For example, the shell command neither guarantees that a jar file exists at a particular
+location, nor verifies whether the given class is actually contained in the jar file.
+====
-
-WARNING: There is no guarantee that the framework will load a given coprocessor successfully.
-For example, the shell command neither guarantees a jar file exists at a particular location nor verifies whether the given class is actually contained in the jar file.
-
-== Check the Status of a Coprocessor
-
-To check the status of a coprocessor after it has been configured, use the `status` HBase Shell command.
 
+==== Unloading Dynamic Coprocessor
+. Using shell: Run the following command from the HBase shell to remove a Coprocessor from a
+table:
++
+[source]
+----
+hbase(main):003:0> alter 'users', METHOD => 'table_att_unset',
+hbase(main):004:0*   NAME => 'coprocessor$1'
+----
-
-----
-hbase(main):020:0> status 'detailed'
-version 0.92-tm-6
-0 regionsInTransition
-master coprocessors: []
-1 live servers
-    localhost:52761 1328082515520
-        requestsPerSecond=3, numberOfOnlineRegions=3, usedHeapMB=32, maxHeapMB=995
-        -ROOT-,,0
-            numberOfStores=1, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0,
-storefileIndexSizeMB=0, readRequestsCount=54, writeRequestsCount=1, rootIndexSizeKB=0, totalStaticIndexSizeKB=0,
-totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
-        .META.,,1
-            numberOfStores=1, numberOfStorefiles=0, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0,
-storefileIndexSizeMB=0, readRequestsCount=97, writeRequestsCount=4, rootIndexSizeKB=0, totalStaticIndexSizeKB=0,
-totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
-        t1,,1328082575190.c0491168a27620ffe653ec6c04c9b4d1.
-            numberOfStores=2, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0,
-storefileIndexSizeMB=0, readRequestsCount=0, writeRequestsCount=0, rootIndexSizeKB=0, totalStaticIndexSizeKB=0,
-totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN,
-coprocessors=[AggregateImplementation]
-0 dead servers
-----
 
+. Using HTableDescriptor: Simply reload the table definition _without_ setting the Coprocessor
+value, either via setValue() or addCoprocessor(). This will remove any Coprocessor attached to
+the table. For example:
++
+[source,java]
+----
+String tableName = "users";
+Configuration conf = HBaseConfiguration.create();
+HBaseAdmin admin = new HBaseAdmin(conf);
+admin.disableTable(tableName);
+HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
+HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
+columnFamily1.setMaxVersions(3);
+hTableDescriptor.addFamily(columnFamily1);
+HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
+columnFamily2.setMaxVersions(3);
+hTableDescriptor.addFamily(columnFamily2);
+admin.modifyTable(tableName, hTableDescriptor);
+admin.enableTable(tableName);
+----
++
+Optionally you can also use the removeCoprocessor() method of the HTableDescriptor class.
+
+
+[[cp_example]]
+== Examples
+HBase ships Coprocessor examples: for an Observer Coprocessor see
+// Below URL is more than 100 characters long.
+link:http://hbase.apache.org/xref/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.html[ZooKeeperScanPolicyObserver]
+and for an Endpoint Coprocessor see
+// Below URL is more than 100 characters long.
+link:http://hbase.apache.org/xref/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.html[RowCountEndpoint].
+
+A more detailed example is given below.
+
+For the sake of example, let's take a hypothetical case. Suppose there is an HBase table called
+'users'. The table has two column families, 'personalDet' and 'salaryDet', containing personal
+details and salary details respectively. Below is a graphical representation of the 'users'
+table.
+
+.Users Table
+[width="100%",cols="7",options="header,footer"]
+|====================
+| 3+|personalDet 3+|salaryDet
+|*rowkey* |*name* |*lastname* |*dob* |*gross* |*net* |*allowances*
+|admin |Admin |Admin | 3+|
+|cdickens |Charles |Dickens |02/07/1812 |10000 |8000 |2000
+|jverne |Jules |Verne |02/08/1828 |12000 |9000 |3000
+|====================
+
+
+
+=== Observer Example
+
+For the purpose of demonstrating a Coprocessor we are assuming that 'admin' is a special person
+whose details shouldn't be visible or returned to any client querying the 'users' table.
+
+To implement this functionality we will take the help of an Observer Coprocessor.
+Following are the implementation steps:
+
+. Write a class that extends the
+// Below URL is more than 100 characters long.
+link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html[BaseRegionObserver]
+class.
+
+. Override the 'preGetOp()' method (note that the 'preGet()' method is now deprecated). The
+reason for overriding this method is to check whether the client has queried for the rowkey
+'admin' or not. If the client has queried for the rowkey 'admin', return the call without
+allowing the system to perform the get operation (thus saving on performance); otherwise
+process the request as normal.
+
+. Put your code and dependencies in a jar file.
+
+. Place the jar in HDFS where HBase can locate it.
+
+. Load the Coprocessor.
+
+. Write a simple program to test it.
+
+Following is the implementation of the above steps:
+
+. For Step 1 and Step 2, below is the code.
++
+[source,java]
+----
+public class RegionObserverExample extends BaseRegionObserver {
+
+    private static final byte[] ADMIN = Bytes.toBytes("admin");
+    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
+    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
+    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
+            final List<Cell> results) throws IOException {
+
+        if (Bytes.equals(get.getRow(), ADMIN)) {
+            // Return a placeholder cell and skip the actual Get.
+            Cell c = CellUtil.createCell(get.getRow(), COLUMN_FAMILY, COLUMN,
+                System.currentTimeMillis(), (byte) 4, VALUE);
+            results.add(c);
+            e.bypass();
+        }
+
+        // Delegate to the deprecated preGet() so that older observers overriding it still run.
+        List<KeyValue> kvs = new ArrayList<KeyValue>(results.size());
+        for (Cell c : results) {
+            kvs.add(KeyValueUtil.ensureKeyValue(c));
+        }
+        preGet(e, get, kvs);
+        results.clear();
+        results.addAll(kvs);
+    }
+}
+----
+Overriding 'preGetOp()' will only work for the 'Get' operation. It won't help you with 'Scan'.
+To deal with that you have to override another method, 'preScannerOpen()', and add a Filter
+for 'admin' explicitly, as shown below:
++
+[source,java]
+----
+@Override
+public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Scan scan, final RegionScanner s) throws IOException {
+
+    Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
+    scan.setFilter(filter);
+    return s;
+}
+----
++
+This method works, but there is a _side effect_. If the client has used a Filter in its scan,
+then that Filter won't have any effect, because our filter has replaced it.
+
+Another option you can try is to deliberately remove 'admin' from the results. This approach is
+shown below:
++
+[source,java]
+----
+@Override
+public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final InternalScanner s, final List<Result> results, final int limit,
+        final boolean hasMore) throws IOException {
+
+    Iterator<Result> iterator = results.iterator();
+    while (iterator.hasNext()) {
+        Result result = iterator.next();
+        if (Bytes.equals(result.getRow(), ADMIN)) {
+            iterator.remove();
+            break;
+        }
+    }
+    return hasMore;
+}
+----
+
+. Step 3: It's convenient to export the above program as a jar file. Let's assume it was
+exported to a file called 'coprocessor.jar'.
+
+. Step 4: Copy the jar to HDFS. You may use a command like this:
++
+[source]
+----
+hadoop fs -copyFromLocal coprocessor.jar coprocessor.jar
+----
+
+. Step 5: Load the Coprocessor, see <<cp_loading,Loading Coprocessors>>.
+
+. Step 6: Run the following program to test it. The first part tests 'Get' and the second
+part tests 'Scan'.
++
+[source,java]
+----
+Configuration conf = HBaseConfiguration.create();
+// Use the below code for HBase version 1.x.x or above.
+Connection connection = ConnectionFactory.createConnection(conf);
+TableName tableName = TableName.valueOf("users");
+Table table = connection.getTable(tableName);
+
+// Use the below code for HBase version 0.98.xx or below.
+// HConnection connection = HConnectionManager.createConnection(conf);
+// HTableInterface table = connection.getTable("users");
+
+Get get = new Get(Bytes.toBytes("admin"));
+Result result = table.get(get);
+for (Cell c : result.rawCells()) {
+    System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+        + "==> " + Bytes.toString(CellUtil.cloneFamily(c))
+        + "{" + Bytes.toString(CellUtil.cloneQualifier(c))
+        + ":" + Bytes.toString(CellUtil.cloneValue(c)) + "}");
+}
+Scan scan = new Scan();
+ResultScanner scanner = table.getScanner(scan);
+for (Result res : scanner) {
+    for (Cell c : res.rawCells()) {
+        System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+            + " ==> " + Bytes.toString(CellUtil.cloneFamily(c))
+            + " {" + Bytes.toString(CellUtil.cloneQualifier(c))
+            + ":" + Bytes.toString(CellUtil.cloneValue(c))
+            + "}");
+    }
+}
+----
+
+=== Endpoint Example
+
+In our hypothetical example (see the Users Table), to demonstrate the Endpoint Coprocessor we
+take a trivial use case: calculating the total (sum) of the gross salary of all employees. One
+way of implementing an Endpoint Coprocessor (for version 0.96 and above) is as follows:
+
+. Create a '.proto' file defining your service.
+
+. Execute the 'protoc' command to generate the Java code from the above '.proto' file.
+
+. Write a class that should:
+.. Extend the generated service class.
+.. Implement the two interfaces Coprocessor and CoprocessorService.
+.. Override the service method.
+
+. Load the Coprocessor.
+
+. Write client code to call the Coprocessor.
+
+Implementation details of the above steps are as follows:
+
+. Step 1: Create a 'proto' file to define your service, request and response. Let's call this
+file "sum.proto". Below is the content of the 'sum.proto' file:
++
+[source]
+----
+option java_package = "org.myname.hbase.coprocessor.autogenerated";
+option java_outer_classname = "Sum";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message SumRequest {
+    required string family = 1;
+    required string column = 2;
+}
+
+message SumResponse {
+    required int64 sum = 1 [default = 0];
+}
+
+service SumService {
+    rpc getSum(SumRequest)
+        returns (SumResponse);
+}
+----
+
+. Step 2: Compile the proto file using the protoc compiler (for detailed instructions see the
+link:https://developers.google.com/protocol-buffers/docs/overview[official documentation]):
++
+[source]
+----
+$ protoc --java_out=src ./sum.proto
+----
++
+NOTE: You will have to create the `src` folder yourself. The command generates a class called
+"Sum.java" inside it.
+
+. Step 3: Write your Endpoint Coprocessor. Firstly your class should extend the service just
+defined above (i.e. Sum.SumService). Secondly it should implement the Coprocessor and
+CoprocessorService interfaces. Thirdly, override the 'getService()', 'start()', 'stop()' and
+'getSum()' methods.
+Below is the full code:
++
+[source,java]
+----
+public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {
+
+    private RegionCoprocessorEnvironment env;
+
+    @Override
+    public Service getService() {
+        return this;
+    }
+
+    @Override
+    public void start(CoprocessorEnvironment env) throws IOException {
+        if (env instanceof RegionCoprocessorEnvironment) {
+            this.env = (RegionCoprocessorEnvironment) env;
+        } else {
+            throw new CoprocessorException("Must be loaded on a table region!");
+        }
+    }
+
+    @Override
+    public void stop(CoprocessorEnvironment env) throws IOException {
+        // do nothing
+    }
+
+    @Override
+    public void getSum(RpcController controller, SumRequest request,
+            RpcCallback<SumResponse> done) {
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes(request.getFamily()));
+        scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
+        SumResponse response = null;
+        InternalScanner scanner = null;
+        try {
+            scanner = env.getRegion().getScanner(scan);
+            List<Cell> results = new ArrayList<Cell>();
+            boolean hasMore = false;
+            long sum = 0L;
+            do {
+                hasMore = scanner.next(results);
+                for (Cell cell : results) {
+                    sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
+                }
+                results.clear();
+            } while (hasMore);
+
+            response = SumResponse.newBuilder().setSum(sum).build();
+
+        } catch (IOException ioe) {
+            ResponseConverter.setControllerException(controller, ioe);
+        } finally {
+            if (scanner != null) {
+                try {
+                    scanner.close();
+                } catch (IOException ignored) {}
+            }
+        }
+        done.run(response);
+    }
+}
+----
+
+. Step 4: Load the Coprocessor. See <<cp_loading,Loading Coprocessors>>.
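++
+A quick aside before the client code: 'getSum()' above decodes each cell with Bytes.toLong(),
+so this example implicitly assumes that the 'gross' column was written as an 8-byte long (that
+assumption is ours; nothing in HBase enforces it). Writer code consistent with the endpoint
+would look like:
++
+[source,java]
+----
+// Store 'gross' as a long so the endpoint's Bytes.toLong() can decode it.
+// On HBase 0.98 or below, use put.add(...) instead of put.addColumn(...).
+Put put = new Put(Bytes.toBytes("jverne"));
+put.addColumn(Bytes.toBytes("salaryDet"), Bytes.toBytes("gross"), Bytes.toBytes(12000L));
+table.put(put);
+----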
+
+. Step 5: Now we have to write the client code to test it. To do so, in your main method write
+the following code:
++
+[source,java]
+----
+Configuration conf = HBaseConfiguration.create();
+// Use the below code for HBase version 1.x.x or above.
+Connection connection = ConnectionFactory.createConnection(conf);
+TableName tableName = TableName.valueOf("users");
+Table table = connection.getTable(tableName);
+
+// Use the below code for HBase version 0.98.xx or below.
+// HConnection connection = HConnectionManager.createConnection(conf);
+// HTableInterface table = connection.getTable("users");
+
+final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross")
+        .build();
+try {
+    Map<byte[], Long> results = table.coprocessorService(SumService.class, null, null,
+        new Batch.Call<SumService, Long>() {
+            @Override
+            public Long call(SumService aggregate) throws IOException {
+                BlockingRpcCallback<SumResponse> rpcCallback =
+                    new BlockingRpcCallback<SumResponse>();
+                aggregate.getSum(null, request, rpcCallback);
+                SumResponse response = rpcCallback.get();
+                return response.hasSum() ? response.getSum() : 0L;
+            }
+        });
+    for (Long sum : results.values()) {
+        System.out.println("Sum = " + sum);
+    }
+} catch (ServiceException e) {
+    e.printStackTrace();
+} catch (Throwable e) {
+    e.printStackTrace();
+}
+----
 
 == Monitor Time Spent in Coprocessors
 
-HBase 0.98.5 introduced the ability to monitor some statistics relating to the amount of time spent executing a given coprocessor.
-You can see these statistics via the HBase Metrics framework (see <<hbase_metrics>>) or the Web UI for a given Region Server, via the _Coprocessor Metrics_ tab.
-These statistics are valuable for debugging and benchmarking the performance impact of a given coprocessor on your cluster.
+HBase 0.98.5 introduced the ability to monitor some statistics relating to the amount of time
+spent executing a given Coprocessor.
+You can see these statistics via the HBase Metrics framework (see <<hbase_metrics>>) or the Web
+UI for a given Region Server, via the _Coprocessor Metrics_ tab.
+These statistics are valuable for debugging and benchmarking the performance impact of a given
+Coprocessor on your cluster.
 Tracked statistics include min, max, average, and 90th, 95th, and 99th percentile.
 All times are shown in milliseconds.
-The statistics are calculated over coprocessor execution samples recorded during the reporting interval, which is 10 seconds by default.
+The statistics are calculated over Coprocessor execution samples recorded during the reporting
+interval, which is 10 seconds by default.
 The metrics sampling rate is as described in <<hbase_metrics>>.
 
 .Coprocessor Metrics UI