HBASE-13867: Add endpoint coprocessor guide to HBase book.

g-bhardwaj 2015-10-25 17:29:35 +05:30 committed by Misty Stanley-Jones
parent 46c646d615
commit 8a2cef3315
1 changed file with 753 additions and 144 deletions


@@ -27,203 +27,812 @@
:icons: font
:experimental:

HBase Coprocessors are modeled after the Coprocessors which are part of Google's BigTable
(http://research.google.com/people/jeff/SOCC2010-keynote-slides.pdf pages 41-42.). +
Coprocessor is a framework that provides an easy way to run your custom code directly on a
Region Server.
The information in this chapter is primarily sourced and heavily reused from:
. Mingjie Lai's blog post
link:https://blogs.apache.org/hbase/entry/coprocessor_introduction[Coprocessor Introduction].
. Gaurav Bhardwaj's blog post
link:http://www.3pillarglobal.com/insights/hbase-coprocessors[The How To Of HBase Coprocessors].
== Coprocessor Framework

When working with any data store (like RDBMS or HBase) you fetch the data (in case of RDBMS you
might use a SQL query and in case of HBase you use either Get or Scan). To fetch only relevant data
you filter it (for RDBMS you put conditions in 'WHERE' predicate and in HBase you use
link:http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/filter/Filter.html[Filter]).
After fetching the desired data, you perform your business computation on the data.
This scenario is close to ideal for "small data", where a few thousand rows and a bunch of columns
are returned from the data store. Now imagine a scenario where there are billions of rows and
millions of columns and you want to perform some computation which requires all the data, like
calculating an average or a sum. Even if you are interested in just a few columns, you still have to
fetch all the rows. There are a few drawbacks to this approach as described below:
. In this approach the data transfer (from data store to client side) will become the bottleneck,
and the time required to complete the operation is limited by the rate at which data transfer
takes place.
. It's not always possible to hold so much data in memory and perform the computation.
. Bandwidth is one of the most precious resources in any data center. Operations like this may
saturate your data center's bandwidth and will severely impact the performance of your cluster.
. Your client code grows thick, as you now maintain the code for calculating the average or
sum on the client side. This is not as severe as the performance and bandwidth issues above, but
is still worth consideration.

In a scenario like this it's better to move the computation (i.e. the user's custom code) to the
data itself (the Region Server). Coprocessors help you achieve this, but you can do more than that.
There is another advantage: your code runs in parallel (i.e. on all Regions).
To give an idea of the Coprocessor's capabilities, different people give different analogies.
The three most famous analogies for the Coprocessor are:
[[cp_analogies]]
Triggers and Stored Procedure:: This is the most common analogy for the Coprocessor. The Observer
Coprocessor is compared to triggers because, like triggers, it executes your custom code when a
certain event occurs (like Get or Put). Similarly, the Endpoint Coprocessor is compared to
stored procedures because it lets you perform custom computation on data directly inside the region server.
MapReduce:: As in MapReduce, you move the computation to the data. The Coprocessor
executes your custom computation directly on Region Servers, i.e. where the data resides. That's why
some people compare Coprocessors to small MapReduce jobs.
AOP:: Some people compare it to _Aspect Oriented Programming_ (AOP). As in AOP, you apply advice
(on occurrence of a specific event) by intercepting the request, running some custom code
(probably a cross-cutting concern), and then forwarding the request on its path as if nothing
happened (or even returning it). Similarly, in a Coprocessor you have this facility of intercepting
the request, running custom code, and then forwarding it on its path (or returning it).
Although Coprocessor derives its roots from Google's BigTable, it deviates from it largely in
its design. Currently there are efforts going on to bridge this gap. For more information see
link:https://issues.apache.org/jira/browse/HBASE-4047[HBASE-4047].
In HBase, to implement a Coprocessor, certain steps must be followed as described below:
. Your class should either extend one of the Coprocessor classes (like
// Below URL is more than 100 characters long.
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html[BaseRegionObserver]
) or implement the Coprocessor interfaces (like
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/Coprocessor.html[Coprocessor],
// Below URL is more than 100 characters long.
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/CoprocessorService.html[CoprocessorService]).
A minimal skeleton is sketched after this list.
. Load the Coprocessor: Currently there are two ways to load the Coprocessor. +
Static:: Loading from configuration
Dynamic:: Loading via 'hbase shell' or via Java code using the HTableDescriptor class. +
For more details see <<cp_loading,Loading Coprocessors>>.
. Finally, write your client-side code to call the Coprocessor. This is the easiest step, as HBase
handles the Coprocessor transparently and you don't have to do much to call the Coprocessor.
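
A minimal skeleton for the first step might look like the following. This is an illustrative
sketch only (the package and class names are placeholders, not part of HBase): extending
`BaseRegionObserver` is enough to satisfy the framework, and behaviour is added later by
overriding `pre`/`post` hooks.

[source,java]
----
package org.myname.hbase.coprocessor;

import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;

// A do-nothing Observer skeleton: it loads and runs, but changes no behaviour
// until hooks such as prePut() or preGetOp() are overridden.
public class MyRegionObserver extends BaseRegionObserver {
}
----
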
The framework API is provided in the
// Below URL is more than 100 characters long.
link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/coprocessor/package-summary.html[coprocessor]
package. +
Coprocessors are not designed to be used by the end users but by developers. Coprocessors are
executed directly on the region server; therefore faulty or malicious code can bring your region
server down. Currently there is no mechanism to prevent this, but there are efforts going on to
address it. For more, see link:https://issues.apache.org/jira/browse/HBASE-4047[HBASE-4047]. +
Two different types of Coprocessors are provided by the framework, based on their functionality.
== Types of Coprocessors

Coprocessors can be broadly divided into two categories: Observer and Endpoint.
=== Observer

Observer Coprocessors are easy to understand. People coming from an RDBMS background can compare
them to the triggers available in relational databases. Folks coming from a programming background
can visualize them like advice (before and after only) available in AOP (Aspect Oriented Programming).
See <<cp_analogies, Coprocessor Analogy>>. +
Coprocessors allow you to hook in your custom code at two places during the life cycle of an event. +
The first is just _before_ the occurrence of the event (just like 'before' advice in AOP or triggers
like 'before update'). All methods providing this kind of hook start with the prefix `pre`. +
For example, if you want your custom code to get executed just before the `Put` operation, you can
override the
// Below URL is more than 100 characters long.
link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html#prePut%28org.apache.hadoop.hbase.coprocessor.ObserverContext,%20org.apache.hadoop.hbase.client.Put,%20org.apache.hadoop.hbase.regionserver.wal.WALEdit,%20org.apache.hadoop.hbase.client.Durability%29[`prePut`]
method of
// Below URL is more than 100 characters long.
link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver].
This method has the following signature:
[source,java]
----
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
    final WALEdit edit, final Durability durability) throws IOException;
----
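
For instance, a hedged illustration of such a hook is shown below. It is not from the HBase code
base; the 'salaryDet' family and 'gross' column are the hypothetical ones used in the
<<cp_example,Examples>> section, and the values are assumed to be stored as 8-byte longs. It
rejects a `Put` whose gross salary is negative before the write reaches the region:

[source,java]
----
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
    final WALEdit edit, final Durability durability) throws IOException {
  // Inspect the cells this Put would write to salaryDet:gross.
  List<Cell> cells = put.get(Bytes.toBytes("salaryDet"), Bytes.toBytes("gross"));
  for (Cell cell : cells) {
    if (Bytes.toLong(CellUtil.cloneValue(cell)) < 0) {
      // Throwing an IOException aborts the operation; the error is returned to the client.
      throw new IOException("gross salary must not be negative");
    }
  }
}
----
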
Secondly, the Observer Coprocessor also provides hooks for your code to get executed just _after_
the occurrence of the event (similar to 'after' advice in AOP terminology or 'after update'
triggers). The methods giving this functionality start with the prefix `post`. For example, if you
want your code to be executed after the 'Put' operation, you should consider overriding the
// Below URL is more than 100 characters long.
link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html#postPut%28org.apache.hadoop.hbase.coprocessor.ObserverContext,%20org.apache.hadoop.hbase.client.Put,%20org.apache.hadoop.hbase.regionserver.wal.WALEdit,%20org.apache.hadoop.hbase.client.Durability%29[`postPut`]
method of
// Below URL is more than 100 characters long.
link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver]:
[source,java]
----
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
    final WALEdit edit, final Durability durability) throws IOException;
----
In short, the following conventions are generally followed: +
Override _preXXX()_ method if you want your code to be executed just before the occurrence of the
event. +
Override _postXXX()_ method if you want your code to be executed just after the occurrence of the
event. +
.Use Cases for Observer Coprocessors:
A few use cases of the Observer Coprocessor are:
. *Security*: Before performing any operation (like 'Get', 'Put') you can check for permission in
the 'preXXX' methods.
. *Referential Integrity*: Unlike a traditional RDBMS, HBase doesn't have the concept of referential
integrity (foreign keys). Suppose, for example, you have a requirement that whenever you insert a
record in the 'users' table, a corresponding entry should also be created in the
'user_daily_attendance' table. One way you could solve this is by using two 'Put' operations, one
for each table, but that pushes the responsibility for referential integrity onto the client. A
better way is to use a Coprocessor and override the 'postPut' method so that it inserts the record
into 'user_daily_attendance' (a sketch follows this list). This way the client code stays lean and clean.
. *Secondary Index*: Coprocessor can be used to maintain secondary indexes. For more information
see link:http://wiki.apache.org/hadoop/Hbase/SecondaryIndexing[SecondaryIndexing].
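
The referential integrity idea above could be sketched as follows. This is illustrative only: the
'user_daily_attendance' table and its 'attendance' family are hypothetical, and the HBase 1.x
client API is assumed.

[source,java]
----
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
    final WALEdit edit, final Durability durability) throws IOException {
  // After a row lands in 'users', mirror a minimal entry into the (hypothetical)
  // 'user_daily_attendance' table via the coprocessor environment's table accessor.
  Table attendance = e.getEnvironment().getTable(TableName.valueOf("user_daily_attendance"));
  try {
    Put entry = new Put(put.getRow());
    entry.addColumn(Bytes.toBytes("attendance"), Bytes.toBytes("last_seen"),
        Bytes.toBytes(System.currentTimeMillis()));
    attendance.put(entry);
  } finally {
    attendance.close();
  }
}
----
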
==== Types of Observer Coprocessor
Observer Coprocessor comes in following flavors:
. *RegionObserver*: This Coprocessor provides the facility to hook in your code when events on a
region are triggered. The most common examples include 'preGet' and 'postGet' for the 'Get' operation
and 'prePut' and 'postPut' for the 'Put' operation. For an exhaustive list of supported methods (events) see
// Below URL is more than 100 characters long.
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionObserver.html[RegionObserver].
. *Region Server Observer*: Provides hooks for the events related to the RegionServer, such as
stopping the RegionServer and performing operations before or after merges, commits, or rollbacks.
For more details please refer to
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/RegionServerObserver.html[RegionServerObserver].
. *Master Observer*: This observer provides hooks for DDL-like operations, such as create, delete,
or modify table. For the entire list of available methods see
// Below URL is more than 100 characters long.
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/MasterObserver.html[MasterObserver].
. *WAL Observer*: Provides hooks for WAL (Write-Ahead Log) related operations. It has only two
methods, 'preWALWrite()' and 'postWALWrite()'. For more details see
// Below URL is more than 100 characters long.
link:http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/coprocessor/WALObserver.html[WALObserver].
For working examples, see <<cp_example,Examples>>.
=== Endpoint Coprocessor
Endpoint Coprocessors can be compared to the stored procedures found in an RDBMS.
See <<cp_analogies, Coprocessor Analogy>>. They help in performing computations which are not
possible through the Observer Coprocessor or otherwise, for example calculating an average or a
sum over an entire table that spans multiple regions. They do so by providing a hook
for your custom code and then running it across all regions. +
With the Endpoint Coprocessor you can create your own dynamic RPC protocol and thus provide
communication between the client and the region server, enabling you to run your custom code on
the region server (on each region of a table). +
Unlike the Observer Coprocessor (where your custom code is
executed transparently when an event like the 'Get' operation occurs), with an Endpoint Coprocessor
you have to explicitly invoke the Coprocessor by using the
// Below URL is more than 100 characters long.
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Table.html#coprocessorService%28java.lang.Class,%20byte%5B%5D,%20byte%5B%5D,%20org.apache.hadoop.hbase.client.coprocessor.Batch.Call%29[coprocessorService()]
method available in
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/Table.html[Table]
(or
// Below URL is more than 100 characters long.
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/HTableInterface.html[HTableInterface]
or
link:https://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/client/HTable.html[HTable]).
From version 0.96, implementing an Endpoint Coprocessor is not straightforward: it is now done with
the help of Google's Protocol Buffers. For more details on Protocol Buffers, please see the
link:https://developers.google.com/protocol-buffers/docs/proto[Protocol Buffer Guide].
Endpoint Coprocessors written for version 0.94 are not compatible with version 0.96 or later
(for more details, see
link:https://issues.apache.org/jira/browse/HBASE-5448[HBASE-5448]),
so if you are upgrading your HBase cluster from version 0.94 (or before) to 0.96 (or later) you
have to rewrite your Endpoint Coprocessor.
For working examples, see <<cp_example,Examples>>.
[[cp_loading]]
== Loading Coprocessors
Loading of a Coprocessor refers to the process of making your custom Coprocessor implementation
available to HBase, so that when a request comes in or an event takes place the desired
functionality implemented in your custom code gets executed. +
Coprocessors can be loaded broadly in two ways. One is static (loading through configuration files)
and the other is dynamic loading (using hbase shell or Java code).
=== Static Loading
Static loading means that your Coprocessor will take effect only when you restart HBase, and there
is a reason for it: you make changes to 'hbase-site.xml' and therefore have to restart
HBase for your changes to take effect. +
Following are the steps for loading a Coprocessor statically.
. Define the Coprocessor in hbase-site.xml: Define a <property> element which consists of two
sub-elements, <name> and <value> respectively.
+
.. <name> can have one of the following values:
+
... 'hbase.coprocessor.region.classes' for RegionObservers and Endpoints.
... 'hbase.coprocessor.wal.classes' for WALObservers.
... 'hbase.coprocessor.master.classes' for MasterObservers.
.. <value> must contain the fully qualified class name of your class implementing the Coprocessor.
+
For example, to load a Coprocessor (implemented in class SumEndPoint.java) you have to create the
following entry in the RegionServer's 'hbase-site.xml' file (generally located under the 'conf' directory):
+
[source,xml]
----
<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>org.myname.hbase.coprocessor.endpoint.SumEndPoint</value>
</property>
----
If multiple classes are specified for loading, the class names must be comma-separated.
The framework attempts to load all the configured classes using the default class loader.
Therefore, the jar file must reside on the server-side HBase classpath.
+
Coprocessors which are loaded in this way will be active on all regions of all tables.
These are also called system Coprocessors.
The first listed Coprocessors will be assigned the priority `Coprocessor.Priority.SYSTEM`.
Each subsequent coprocessor in the list will have its priority value incremented by one (which
reduces its priority, because priorities have the natural sort order of Integers). +
When calling out to registered observers, the framework executes their callback methods in the
sorted order of their priority. +
Ties are broken arbitrarily.
. Put your code on the classpath of HBase: There are various ways to do so, like adding jars on the
classpath, etc. One easy way is to drop the jar (containing your code and all the
dependencies) into the 'lib' folder of the HBase installation.
. Restart HBase.
==== Unloading Static Coprocessor

Unloading a static Coprocessor is easy. Following are the steps:

. Delete the Coprocessor's entry from 'hbase-site.xml', i.e. remove the <property> tag.
. Restart HBase.
. Optionally, remove the Coprocessor jar file from the classpath (or from the lib directory if you
copied it there). Removing the Coprocessor JARs from HBase's classpath is good practice.
=== Dynamic Loading

Dynamic loading refers to the process of loading a Coprocessor without restarting HBase. This may
sound better than static loading (and in some scenarios it is), but there is a caveat: a dynamically
loaded Coprocessor applies only to the table for which it was loaded, while a statically loaded
Coprocessor applies to all tables. Due to this difference, dynamically loaded Coprocessors are also
called *Table Coprocessors* (as they apply only to a single table) while statically loaded
Coprocessors are called *System Coprocessors* (as they apply to all tables). +
To dynamically load a Coprocessor you have to take the table offline, so during this time you
won't be able to process any request involving this table. +
There are three ways to dynamically load a Coprocessor, as shown below:
[NOTE]
.Assumptions
====
The instructions below make the following assumptions:
* A JAR called `coprocessor.jar` contains the Coprocessor implementation along with all of its
dependencies, if any.
* The JAR is available in HDFS in some location like
`hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar`.
====
. *Using Shell*: You can load the Coprocessor using the HBase shell as follows:
.. Disable Table: Take the table offline by disabling it. Suppose the table name is 'users'; then
to disable it, enter the following command:
+
[source]
----
hbase(main):001:0> disable 'users'
----
.. Load the Coprocessor: The Coprocessor jar should be on HDFS and should be accessible to HBase.
To load the Coprocessor, use the following command:
+
[source]
----
hbase(main):002:0> alter 'users', METHOD => 'table_att', 'Coprocessor'=>'hdfs://<namenode>:<port>/
user/<hadoop-user>/coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|
arg1=1,arg2=2'
----
+
The Coprocessor framework will try to read the class information from the coprocessor table
attribute value.
The value contains four pieces of information which are separated by the pipe (`|`) character.
+
* File path: The jar file containing the Coprocessor implementation must be in a location where
all region servers can read it. +
You could copy the file onto the local disk on each region server, but it is recommended to store
it in HDFS.
* Class name: The full class name of the Coprocessor.
* Priority: An integer. The framework will determine the execution sequence of all configured
observers registered at the same hook using priorities. This field can be left blank. In that
case the framework will assign a default priority value.
* Arguments (Optional): This field is passed to the Coprocessor implementation. This is optional.
.. Enable the table: To enable the table, type the following command:
+
----
hbase(main):003:0> enable 'users'
----
.. Verification: This is optional, but it is generally good practice to check whether your
Coprocessor loaded successfully. Enter the following command:
+
----
hbase(main):04:0> describe 'users'
----
+
You should see some output like this:
+
----
DESCRIPTION ENABLED
'users', {TABLE_ATTRIBUTES => {coprocessor$1 => true 'hdfs://<namenode>:<port>/user/<hadoop-user>/
coprocessor.jar| org.myname.hbase.Coprocessor.RegionObserverExample|1073741823|'}, {NAME =>
'personalDet'.....
----
. *Using the setValue()* method of HTableDescriptor: This is done entirely in Java as follows:
+
[source,java]
----
String tableName = "users";
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.setValue("COPROCESSOR$1", path + "|"
+ RegionObserverExample.class.getCanonicalName() + "|"
+ Coprocessor.PRIORITY_USER);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
----
. *Using the addCoprocessor()* method of HTableDescriptor: This method is available from version
0.96 onwards.
+
[source,java]
----
String tableName = "users";
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
hTableDescriptor.addCoprocessor(RegionObserverExample.class.getCanonicalName(), path,
Coprocessor.PRIORITY_USER, null);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
----
====
WARNING: There is no guarantee that the framework will load a given Coprocessor successfully.
For example, the shell command neither guarantees that a jar file exists at a particular location
nor verifies whether the given class is actually contained in the jar file.
====
==== Unloading Dynamic Coprocessor
. Using shell: Run the following command from the HBase shell to remove a Coprocessor from a table.
+
[source]
----
hbase(main):003:0> alter 'users', METHOD => 'table_att_unset',
hbase(main):004:0* NAME => 'coprocessor$1'
----
. Using HTableDescriptor: Simply reload the table definition _without_ setting the value of the
Coprocessor either in the setValue() or addCoprocessor() methods. This will remove the Coprocessor
attached to this table, if any. For example:
+
[source,java]
----
String tableName = "users";
String path = "hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar";
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
HColumnDescriptor columnFamily1 = new HColumnDescriptor("personalDet");
columnFamily1.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily1);
HColumnDescriptor columnFamily2 = new HColumnDescriptor("salaryDet");
columnFamily2.setMaxVersions(3);
hTableDescriptor.addFamily(columnFamily2);
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
----
+
Optionally, you can also use the removeCoprocessor() method of the HTableDescriptor class.
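+
For illustration, a sketch of that variant (assuming the same 'users' table and
RegionObserverExample class used in the examples above) might look like this:
+
[source,java]
----
String tableName = "users";
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
// Fetch the current table definition and remove the Coprocessor entry from it.
HTableDescriptor hTableDescriptor = admin.getTableDescriptor(TableName.valueOf(tableName));
hTableDescriptor.removeCoprocessor(RegionObserverExample.class.getCanonicalName());
admin.modifyTable(tableName, hTableDescriptor);
admin.enableTable(tableName);
----
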
[[cp_example]]
== Examples
HBase ships Coprocessor examples. For an Observer Coprocessor see
// Below URL is more than 100 characters long.
link:http://hbase.apache.org/xref/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.html[ZooKeeperScanPolicyObserver]
and for an Endpoint Coprocessor see
// Below URL is more than 100 characters long.
link:http://hbase.apache.org/xref/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.html[RowCountEndpoint].
A more detailed example is given below.
For the sake of example let's take a hypothetical case. Suppose there is an HBase table called
'users'. The table has two column families, 'personalDet' and 'salaryDet', containing personal
details and salary details respectively. Below is a graphical representation of the 'users'
table.
.Users Table
[width="100%",cols="7",options="header,footer"]
|====================
| 3+|personalDet 3+|salaryDet
|*rowkey* |*name* |*lastname* |*dob* |*gross* |*net* |*allowances*
|admin |Admin |Admin | 3+|
|cdickens |Charles |Dickens |02/07/1812 |10000 |8000 |2000
|jverne |Jules |Verne |02/08/1828 |12000 |9000 |3000
|====================
=== Observer Example
For the purpose of demonstrating a Coprocessor, we assume that 'admin' is a special person
and his details shouldn't be visible or returned to any client querying the 'users' table. +
To implement this functionality we will take the help of an Observer Coprocessor.
Following are the implementation steps:
. Write a class that extends the
// Below URL is more than 100 characters long.
link:https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html[BaseRegionObserver]
class.
. Override the 'preGetOp()' method (note that the 'preGet()' method is now deprecated). The reason
for overriding this method is to check whether the client has queried the rowkey with value 'admin'
or not. If the client has queried the rowkey 'admin', then return the call without allowing the
system to perform the get operation (thus saving on performance); otherwise, process the request as
normal.
. Put your code and dependencies in the jar file.
. Place the jar in HDFS where HBase can locate it.
. Load the Coprocessor.
. Write a simple program to test it.
Following is the implementation of the above steps:
. For Step 1 and Step 2, below is the code.
+
[source,java]
----
public class RegionObserverExample extends BaseRegionObserver {

    private static final byte[] ADMIN = Bytes.toBytes("admin");
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("details");
    private static final byte[] COLUMN = Bytes.toBytes("Admin_det");
    private static final byte[] VALUE = Bytes.toBytes("You can't see Admin details");

    @Override
    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get,
            final List<Cell> results) throws IOException {

        if (Bytes.equals(get.getRow(), ADMIN)) {
            // Return a placeholder cell and skip the actual Get for the 'admin' row.
            Cell c = CellUtil.createCell(get.getRow(), COLUMN_FAMILY, COLUMN,
                System.currentTimeMillis(), (byte) 4, VALUE);
            results.add(c);
            e.bypass();
        }

        List<KeyValue> kvs = new ArrayList<KeyValue>(results.size());
        for (Cell c : results) {
            kvs.add(KeyValueUtil.ensureKeyValue(c));
        }
        preGet(e, get, kvs);
        results.clear();
        results.addAll(kvs);
    }
}
----
Overriding 'preGetOp()' will only work for the 'Get' operation. It won't help you with the 'Scan'
operation. To deal with that you have to override another method called 'preScannerOpen()', and
add a Filter explicitly for admin as shown below:
+
[source,java]
----
@Override
public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
        final Scan scan, final RegionScanner s) throws IOException {
Filter filter = new RowFilter(CompareOp.NOT_EQUAL, new BinaryComparator(ADMIN));
scan.setFilter(filter);
return s;
}
----
+
This method works, but there is a _side effect_. If the client has used any Filter in his scan,
then that Filter won't have any effect because our filter has replaced it. +
Another option you can try is to deliberately remove the admin row from the results. This approach
is shown below:
+
[source,java]
----
@Override
public boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
        final InternalScanner s, final List<Result> results, final int limit,
        final boolean hasMore) throws IOException {
    Result result = null;
    Iterator<Result> iterator = results.iterator();
    while (iterator.hasNext()) {
        result = iterator.next();
        if (Bytes.equals(result.getRow(), ADMIN)) {
            iterator.remove();
            break;
        }
    }
    return hasMore;
}
----
. Step 3: It's pretty convenient to export the above program into a jar file. Let's assume it was
exported in a file called 'coprocessor.jar'.
. Step 4: Copy the jar to HDFS. You may use a command like this:
+
[source]
----
hadoop fs -copyFromLocal coprocessor.jar coprocessor.jar
----
. Step 5: Load the Coprocessor, see <<cp_loading,Loading of Coprocessor>>.
. Step 6: Run the following program to test. The first part tests 'Get' and the second part tests 'Scan'.
+
[source,java]
----
Configuration conf = HBaseConfiguration.create();
// Use the below code for HBase version 1.x.x or above.
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);
// Use the below code for HBase version 0.98.xx or below.
//HConnection connection = HConnectionManager.createConnection(conf);
//HTableInterface table = connection.getTable("users");
Get get = new Get(Bytes.toBytes("admin"));
Result result = table.get(get);
for (Cell c : result.rawCells()) {
System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+ "==> " + Bytes.toString(CellUtil.cloneFamily(c))
+ "{" + Bytes.toString(CellUtil.cloneQualifier(c))
+ ":" + Bytes.toLong(CellUtil.cloneValue(c)) + "}");
}
Scan scan = new Scan();
ResultScanner scanner = table.getScanner(scan);
for (Result res : scanner) {
for (Cell c : res.rawCells()) {
System.out.println(Bytes.toString(CellUtil.cloneRow(c))
+ " ==> " + Bytes.toString(CellUtil.cloneFamily(c))
+ " {" + Bytes.toString(CellUtil.cloneQualifier(c))
+ ":" + Bytes.toLong(CellUtil.cloneValue(c))
+ "}");
}
}
----
=== Endpoint Example
In our hypothetical example (see the Users Table above), to demonstrate the Endpoint Coprocessor we
take a trivial use case in which we will try to calculate the total (sum) of the gross salary of all
employees. One way of implementing an Endpoint Coprocessor (for version 0.96 and above) is as follows:
. Create a '.proto' file defining your service.
. Execute the 'protoc' command to generate the Java code from the above '.proto' file.
. Write a class that should:
.. Extend the above generated service class.
.. Implement the Coprocessor and CoprocessorService interfaces.
.. Override the service method.
. Load the Coprocessor.
. Write a client code to call Coprocessor.
Implementation detail of the above steps is as follows:
. Step 1: Create a 'proto' file to define your service, request and response. Let's call this file
"sum.proto". Below is the content of the 'sum.proto' file.
+
[source]
----
option java_package = "org.myname.hbase.coprocessor.autogenerated";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequest {
required string family = 1;
required string column = 2;
}
message SumResponse {
required int64 sum = 1 [default = 0];
}
service SumService {
rpc getSum(SumRequest)
returns (SumResponse);
}
----
. Step 2: Compile the proto file using the protoc compiler (for detailed instructions see the
link:https://developers.google.com/protocol-buffers/docs/overview[official documentation]).
+
[source]
----
$ protoc --java_out=src ./sum.proto
----
+
[NOTE]
====
It is necessary for you to create the `src` folder beforehand.
This command will generate a class called `Sum.java`.
====
. Step 3: Write your Endpoint Coprocessor: First, your class should extend the service just
defined above (i.e. Sum.SumService). Second, it should implement the Coprocessor and
CoprocessorService interfaces. Third, override the 'getService()', 'start()', 'stop()' and 'getSum()' methods.
Below is the full code:
+
[source,java]
----
public class SumEndPoint extends SumService implements Coprocessor, CoprocessorService {
private RegionCoprocessorEnvironment env;
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if (env instanceof RegionCoprocessorEnvironment) {
this.env = (RegionCoprocessorEnvironment)env;
} else {
throw new CoprocessorException("Must be loaded on a table region!");
}
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// do nothing
}
@Override
public void getSum(RpcController controller, SumRequest request, RpcCallback<SumResponse> done) {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(request.getFamily()));
scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
SumResponse response = null;
InternalScanner scanner = null;
try {
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = false;
long sum = 0L;
do {
hasMore = scanner.next(results);
for (Cell cell : results) {
sum = sum + Bytes.toLong(CellUtil.cloneValue(cell));
}
results.clear();
} while (hasMore);
response = SumResponse.newBuilder().setSum(sum).build();
} catch (IOException ioe) {
ResponseConverter.setControllerException(controller, ioe);
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (IOException ignored) {}
}
}
done.run(response);
}
}
----
. Step 4: Load the Coprocessor. See <<cp_loading,loading of Coprocessor>>.
. Step 5: Now we have to write the client code to test it. To do so, in your main method, write the
code shown below:
+
[source,java]
----
Configuration conf = HBaseConfiguration.create();
// Use the below code for HBase version 1.x.x or above.
Connection connection = ConnectionFactory.createConnection(conf);
TableName tableName = TableName.valueOf("users");
Table table = connection.getTable(tableName);
// Use the below code for HBase version 0.98.xx or below.
//HConnection connection = HConnectionManager.createConnection(conf);
//HTableInterface table = connection.getTable("users");
final SumRequest request = SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross")
.build();
try {
Map<byte[], Long> results = table.coprocessorService(SumService.class, null, null,
new Batch.Call<SumService, Long>() {
@Override
public Long call(SumService aggregate) throws IOException {
BlockingRpcCallback<SumResponse> rpcCallback = new BlockingRpcCallback<SumResponse>();
aggregate.getSum(null, request, rpcCallback);
SumResponse response = rpcCallback.get();
return response.hasSum() ? response.getSum() : 0L;
}
});
for (Long sum : results.values()) {
System.out.println("Sum = " + sum);
}
} catch (ServiceException e) {
e.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
}
----
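+
Note that `coprocessorService()` returns one entry per region, keyed by the region name, so each
value printed above is a per-region partial sum. A small follow-up inside the same `try` block
would add them up to obtain the table-wide total:
+
[source,java]
----
long totalSum = 0L;
for (Long regionSum : results.values()) {
    totalSum += regionSum;   // add up the partial sum returned by each region
}
System.out.println("Total Sum = " + totalSum);
----
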
== Monitor Time Spent in Coprocessors

HBase 0.98.5 introduced the ability to monitor some statistics relating to the amount of time
spent executing a given Coprocessor.
You can see these statistics via the HBase Metrics framework (see <<hbase_metrics>>) or the Web UI
for a given Region Server, via the _Coprocessor Metrics_ tab.
These statistics are valuable for debugging and benchmarking the performance impact of a given
Coprocessor on your cluster.
Tracked statistics include min, max, average, and 90th, 95th, and 99th percentile.
All times are shown in milliseconds.
The statistics are calculated over Coprocessor execution samples recorded during the reporting
interval, which is 10 seconds by default.
The metrics sampling rate is as described in <<hbase_metrics>>.

.Coprocessor Metrics UI