From 7299a72715f0ef1b05e478bee2e60d8f26fc2c24 Mon Sep 17 00:00:00 2001 From: Andrew Kyle Purtell Date: Sat, 20 Nov 2010 01:23:39 +0000 Subject: [PATCH] HBASE-2001 Coprocessors: Colocate user code with regions git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1037102 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 16 +- .../coprocessor/BaseEndpointCoprocessor.java | 93 ++ .../BaseRegionObserverCoprocessor.java | 225 +++ .../hadoop/hbase/coprocessor/Coprocessor.java | 120 ++ .../coprocessor/CoprocessorEnvironment.java | 71 + .../coprocessor/CoprocessorException.java | 52 + .../hbase/coprocessor/RegionObserver.java | 336 +++++ .../hbase/coprocessor/package-info.java | 367 +++++ .../hbase/regionserver/CoprocessorHost.java | 1241 +++++++++++++++++ .../hadoop/hbase/regionserver/HRegion.java | 233 +++- .../hbase/regionserver/HRegionServer.java | 74 +- .../regionserver/RegionServerServices.java | 13 +- .../hbase/regionserver/SplitTransaction.java | 28 +- .../handler/OpenRegionHandler.java | 2 +- src/main/resources/hbase-default.xml | 12 + .../ColumnAggregationEndpoint.java | 61 + .../ColumnAggregationProtocol.java | 39 + .../coprocessor/SimpleRegionObserver.java | 274 ++++ .../coprocessor/TestCoprocessorEndpoint.java | 138 ++ .../coprocessor/TestCoprocessorInterface.java | 271 ++++ .../TestRegionObserverInterface.java | 195 +++ .../TestRegionObserverStacking.java | 134 ++ 22 files changed, 3926 insertions(+), 69 deletions(-) create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEndpointCoprocessor.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/Coprocessor.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorException.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java create mode 100644 src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java create mode 100644 src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationEndpoint.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/ColumnAggregationProtocol.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorEndpoint.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java create mode 100644 src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java diff --git a/CHANGES.txt b/CHANGES.txt index 868b0464729..a0bb505ec22 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,17 @@ HBase Change Log + +Release 0.91.0 - Unreleased + INCOMPATIBLE CHANGES + HBASE-2002 Coprocessors: Client side support; Support RPC interface + changes at runtime (Gary Helmling via Andrew Purtell) + + BUG FIXES + + IMPROVEMENTS + HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via + Andrew Purtell) + + Release 0.90.0 - Unreleased INCOMPATIBLE CHANGES HBASE-1822 Remove the deprecated APIs @@ -33,9 +46,6 @@ Release 0.90.0 - Unreleased HBASE-2641 HBASE-2641 Refactor HLog splitLog, hbase-2437 continued; 
break out split code as new classes (James Kennedy via Stack) - HBASE-2002 Coprocessors: Client side support; Support RPC interface - changes at runtime (Gary Helmling via Andrew Purtell) - BUG FIXES HBASE-1791 Timeout in IndexRecordWriter (Bradford Stephens via Andrew diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEndpointCoprocessor.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEndpointCoprocessor.java new file mode 100644 index 00000000000..b81a46589be --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseEndpointCoprocessor.java @@ -0,0 +1,93 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; + +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; +import org.apache.hadoop.hbase.regionserver.HRegion; + +/** + * This abstract class provides default implementation of an Endpoint. + * It also maintains a CoprocessorEnvironment object which can be + * used to access region resource. + * + * It's recommended to use this abstract class to implement your Endpoint. + * However you still can just implement the interface CoprocessorProtocol + * and Coprocessor to develop an Endpoint. But you won't be able to access + * the region related resource, i.e., CoprocessorEnvironment. + */ +public abstract class BaseEndpointCoprocessor implements Coprocessor, + CoprocessorProtocol { + private CoprocessorEnvironment env; + + /** + * @param e Coprocessor environment. + */ + private void setEnvironment(CoprocessorEnvironment e) { + env = e; + } + + /** + * @return env Coprocessor environment. + */ + public CoprocessorEnvironment getEnvironment() { + return env; + } + + @Override + public long getProtocolVersion(String arg0, long arg1) throws IOException { + return HBaseRPCProtocolVersion.versionID; + } + + @Override + public void preOpen(CoprocessorEnvironment e) { } + + /** + * It initializes the coprocessor resources. If you need to override this + * method, please remember to call super(e). 
+ */ + @Override + public void postOpen(CoprocessorEnvironment e) { + setEnvironment(e); + } + + @Override + public void preClose(CoprocessorEnvironment e, boolean abortRequested) { } + + @Override + public void postClose(CoprocessorEnvironment e, boolean abortRequested) { } + + @Override + public void preFlush(CoprocessorEnvironment e) { } + + @Override + public void postFlush(CoprocessorEnvironment e) { } + + @Override + public void preCompact(CoprocessorEnvironment e, boolean willSplit) { } + + @Override + public void postCompact(CoprocessorEnvironment e, boolean willSplit) { } + + @Override + public void preSplit(CoprocessorEnvironment e) { } + + @Override + public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) { } +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java new file mode 100644 index 00000000000..134ed2f6ffe --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java @@ -0,0 +1,225 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion; +import java.io.IOException; + +/** + * An abstract class that implements Coprocessor and RegionObserver. + * By extending it, you can create you own region observer without + * overriding all abstract methods of Coprocessor and RegionObserver. 
+ */ +public abstract class BaseRegionObserverCoprocessor implements Coprocessor, + RegionObserver { + + @Override + public void preOpen(CoprocessorEnvironment e) { } + + @Override + public void postOpen(CoprocessorEnvironment e) { } + + @Override + public void preClose(CoprocessorEnvironment e, boolean abortRequested) + { } + + @Override + public void postClose(CoprocessorEnvironment e, boolean abortRequested) + { } + + @Override + public void preFlush(CoprocessorEnvironment e) { } + + @Override + public void postFlush(CoprocessorEnvironment e) { } + + @Override + public void preSplit(CoprocessorEnvironment e) { } + + @Override + public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) { } + + @Override + public void preCompact(CoprocessorEnvironment e, boolean willSplit) { } + + @Override + public void postCompact(CoprocessorEnvironment e, boolean willSplit) { } + + @Override + public void preGetClosestRowBefore(final CoprocessorEnvironment e, + final byte [] row, final byte [] family) + throws IOException { + } + + @Override + public Result postGetClosestRowBefore(final CoprocessorEnvironment e, + byte[] row, byte[] family, final Result result) + throws IOException { + return result; + } + + @Override + public Get preGet(final CoprocessorEnvironment e, final Get get) + throws IOException { + return get; + } + + @Override + public List postGet(final CoprocessorEnvironment e, final Get get, + List results) throws IOException { + return results; + } + + @Override + public Get preExists(final CoprocessorEnvironment e, final Get get) + throws IOException { + return get; + } + + @Override + public boolean postExists(final CoprocessorEnvironment e, + final Get get, boolean exists) + throws IOException { + return exists; + } + + @Override + public Map> prePut(final CoprocessorEnvironment e, + final Map> familyMap) throws IOException { + return familyMap; + } + + @Override + public void postPut(final CoprocessorEnvironment e, + final Map> familyMap) + throws IOException { + } + + @Override + public Map> preDelete(final CoprocessorEnvironment e, + final Map> familyMap) throws IOException { + return familyMap; + } + + @Override + public void postDelete(CoprocessorEnvironment e, + Map> familyMap) throws IOException { + } + + @Override + public Put preCheckAndPut(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Put put) throws IOException { + return put; + } + + @Override + public boolean postCheckAndPut(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Put put, final boolean result) + throws IOException { + return result; + } + + @Override + public Delete preCheckAndDelete(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Delete delete) + throws IOException { + return delete; + } + + @Override + public boolean postCheckAndDelete(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Delete delete, final boolean result) + throws IOException { + return result; + } + + @Override + public long preIncrementColumnValue(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final long amount, final boolean writeToWAL) throws IOException { + return amount; + } + + @Override + public long postIncrementColumnValue(final CoprocessorEnvironment 
e, + final byte [] row, final byte [] family, final byte [] qualifier, + final long amount, final boolean writeToWAL, long result) + throws IOException { + return result; + } + + @Override + public Increment preIncrement(final CoprocessorEnvironment e, + final Increment increment) + throws IOException { + return increment; + } + + @Override + public Result postIncrement(final CoprocessorEnvironment e, + final Increment increment, + final Result result) throws IOException { + return result; + } + + @Override + public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan) + throws IOException { + return scan; + } + + @Override + public void postScannerOpen(final CoprocessorEnvironment e, + final Scan scan, + final long scannerId) throws IOException { } + + @Override + public void preScannerNext(final CoprocessorEnvironment e, + final long scannerId) throws IOException { + } + + @Override + public List postScannerNext(final CoprocessorEnvironment e, + final long scannerId, final List results) + throws IOException { + return results; + } + + @Override + public void preScannerClose(final CoprocessorEnvironment e, + final long scannerId) + throws IOException { } + + @Override + public void postScannerClose(final CoprocessorEnvironment e, + final long scannerId) + throws IOException { } +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/Coprocessor.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/Coprocessor.java new file mode 100644 index 00000000000..7ea1c5eb868 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/Coprocessor.java @@ -0,0 +1,120 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.regionserver.HRegion; + +/** + * Coprocess interface. + */ +public interface Coprocessor { + public static final int VERSION = 1; + /** + * Installation priority. Coprocessors will be executed in sequence + * by the order of coprocessor priority. + */ + public enum Priority { + HIGHEST(0), + SYSTEM(Integer.MAX_VALUE/4), + USER(Integer.MAX_VALUE/2), + LOWEST(Integer.MAX_VALUE); + + private int prio; + + Priority(int prio) { + this.prio = prio; + } + + public int intValue() { + return prio; + } + } + + // Interface + /** + * Called before the region is reported as open to the master. + * @param e the environment provided by the region server + */ + public void preOpen(final CoprocessorEnvironment e); + + /** + * Called after the region is reported as open to the master. + * @param e the environment provided by the region server + */ + public void postOpen(final CoprocessorEnvironment e); + + /** + * Called before the memstore is flushed to disk. + * @param e the environment provided by the region server + */ + public void preFlush(final CoprocessorEnvironment e); + + /** + * Called after the memstore is flushed to disk. 
+ * @param e the environment provided by the region server + */ + public void postFlush(final CoprocessorEnvironment e); + + /** + * Called before compaction. + * @param e the environment provided by the region server + * @param willSplit true if compaction will result in a split, false + * otherwise + */ + public void preCompact(final CoprocessorEnvironment e, + final boolean willSplit); + + /** + * Called after compaction. + * @param e the environment provided by the region server + * @param willSplit true if compaction will result in a split, false + * otherwise + */ + public void postCompact(final CoprocessorEnvironment e, + final boolean willSplit); + + /** + * Called before the region is split. + * @param e the environment provided by the region server + * (e.getRegion() returns the parent region) + */ + public void preSplit(final CoprocessorEnvironment e); + + /** + * Called after the region is split. + * @param e the environment provided by the region server + * (e.getRegion() returns the parent region) + * @param l the left daughter region + * @param r the right daughter region + */ + public void postSplit(final CoprocessorEnvironment e, final HRegion l, + final HRegion r); + + /** + * Called before the region is reported as closed to the master. + * @param e the environment provided by the region server + * @param abortRequested true if the region server is aborting + */ + public void preClose(final CoprocessorEnvironment e, boolean abortRequested); + + /** + * Called after the region is reported as closed to the master. + * @param e the environment provided by the region server + * @param abortRequested true if the region server is aborting + */ + public void postClose(final CoprocessorEnvironment e, boolean abortRequested); +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java new file mode 100644 index 00000000000..654b1791085 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorEnvironment.java @@ -0,0 +1,71 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; + +/** + * Coprocessor environment state. + */ +public interface CoprocessorEnvironment { + + /** @return the Coprocessor interface version */ + public int getVersion(); + + /** @return the HBase version as a string (e.g. 
"0.21.0") */ + public String getHBaseVersion(); + + /** @return the region associated with this coprocessor */ + public HRegion getRegion(); + + /** @return reference to the region server services */ + public RegionServerServices getRegionServerServices(); + + /** + * @return an interface for accessing the given table + * @throws IOException + */ + public HTableInterface getTable(byte[] tableName) throws IOException; + + // environment variables + + /** + * Get an environment variable + * @param key the key + * @return the object corresponding to the environment variable, if set + */ + public Object get(Object key); + + /** + * Set an environment variable + * @param key the key + * @param value the value + */ + public void put(Object key, Object value); + + /** + * Remove an environment variable + * @param key the key + * @return the object corresponding to the environment variable, if set + */ + public Object remove(Object key); + +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorException.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorException.java new file mode 100644 index 00000000000..d4344d0bc6f --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorException.java @@ -0,0 +1,52 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.hbase.DoNotRetryIOException; + +/** + * Thrown if a coprocessor encounters any exception. + */ +public class CoprocessorException extends DoNotRetryIOException { + private static final long serialVersionUID = 4357922136679804887L; + + /** Default Constructor */ + public CoprocessorException() { + super(); + } + + /** + * Constructor with a Class object and exception message. + * @param clazz + * @param s + */ + public CoprocessorException(Class clazz, String s) { + super( "Coprocessor [" + clazz.getName() + "]: " + s); + } + + /** + * Constructs the exception and supplies a string as the message + * @param s - message + */ + public CoprocessorException(String s) { + super(s); + } + +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java new file mode 100644 index 00000000000..10dfff4a142 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -0,0 +1,336 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.coprocessor; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment; +import java.io.IOException; + +/** + * Coprocessors implement this interface to observe and mediate client actions + * on the region. + */ +public interface RegionObserver { + + /** + * Called before a client makes a GetClosestRowBefore request. + * @param e the environment provided by the region server + * @param row the row + * @param family the family + * @throws IOException if an error occurred on the coprocessor + */ + public void preGetClosestRowBefore(final CoprocessorEnvironment e, + final byte [] row, final byte [] family) + throws IOException; + + /** + * Called after a client makes a GetClosestRowBefore request. + * @param e the environment provided by the region server + * @param row the row + * @param family the desired family + * @param result the result set + * @return the possible tranformed result set to return to the client + * @throws IOException if an error occurred on the coprocessor + */ + public Result postGetClosestRowBefore(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final Result result) + throws IOException; + + /** + * Called before the client perform a get() + * @param e the environment provided by the region server + * @param get the Get request + * @return the possibly transformed Get object by coprocessor + * @throws IOException if an error occurred on the coprocessor + */ + public Get preGet(final CoprocessorEnvironment e, final Get get) + throws IOException; + + /** + * Called after the client perform a get() + * @param e the environment provided by the region server + * @param get the Get request + * @param results the result list + * @return the possibly transformed result list to return to client + * @throws IOException if an error occurred on the coprocessor + */ + public List postGet(final CoprocessorEnvironment e, final Get get, + final List results) + throws IOException; + + /** + * Called before the client tests for existence using a Get. + * @param e the environment provided by the region server + * @param get the Get request + * @return the possibly transformed Get object by coprocessor + * @throws IOException if an error occurred on the coprocessor + */ + public Get preExists(final CoprocessorEnvironment e, final Get get) + throws IOException; + + /** + * Called after the client tests for existence using a Get. 
+ * @param e the environment provided by the region server + * @param get the Get request + * @param exists the result returned by the region server + * @return the result to return to the client + * @throws IOException if an error occurred on the coprocessor + */ + public boolean postExists(final CoprocessorEnvironment e, final Get get, + final boolean exists) + throws IOException; + + /** + * Called before the client stores a value. + * @param e the environment provided by the region server + * @param familyMap map of family to edits for the given family. + * @return the possibly transformed map to actually use + * @throws IOException if an error occurred on the coprocessor + */ + public Map> prePut(final CoprocessorEnvironment e, + final Map> familyMap) + throws IOException; + + /** + * Called after the client stores a value. + * @param e the environment provided by the region server + * @param familyMap map of family to edits for the given family. + * @throws IOException if an error occurred on the coprocessor + */ + public void postPut(final CoprocessorEnvironment e, final Map> familyMap) + throws IOException; + + /** + * Called before the client deletes a value. + * @param e the environment provided by the region server + * @param familyMap map of family to edits for the given family. + * @return the possibly transformed map to actually use + * @throws IOException if an error occurred on the coprocessor + */ + public Map> preDelete(final CoprocessorEnvironment e, + final Map> familyMap) + throws IOException; + + /** + * Called after the client deletes a value. + * @param e the environment provided by the region server + * @param familyMap map of family to edits for the given family. + * @throws IOException if an error occurred on the coprocessor + */ + public void postDelete(final CoprocessorEnvironment e, + final Map> familyMap) + throws IOException; + + /** + * Called before checkAndPut + * @param e the environment provided by the region server + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param value the expected value + * @param put data to put if check succeeds + * @return the possibly transformed map to actually use + * @throws IOException if an error occurred on the coprocessor + */ + public Put preCheckAndPut(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Put put) + throws IOException; + + /** + * Called after checkAndPut + * @param e the environment provided by the region server + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param value the expected value + * @param put data to put if check succeeds + * @param result from the checkAndPut + * @return the possibly transformed value to return to client + * @throws IOException if an error occurred on the coprocessor + */ + public boolean postCheckAndPut(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Put put, final boolean result) + throws IOException; + + /** + * Called before checkAndPut + * @param e the environment provided by the region server + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param value the expected value + * @param delete delete to commit if check succeeds + * @return the possibly transformed map to actually use + * @throws IOException if an error occurred on the coprocessor + */ + 
public Delete preCheckAndDelete(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Delete delete) + throws IOException; + + /** + * Called after checkAndDelete + * @param e the environment provided by the region server + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param value the expected value + * @param delete delete to commit if check succeeds + * @param result from the CheckAndDelete + * @return the possibly transformed value to return to client + * @throws IOException if an error occurred on the coprocessor + */ + public boolean postCheckAndDelete(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final byte [] value, final Delete delete, final boolean result) + throws IOException; + + /** + * Called before incrementColumnValue + * @param e the environment provided by the region server + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param amount long amount to increment + * @param writeToWAL whether to write the increment to the WAL + * @return new amount to increment + * @throws IOException if an error occurred on the coprocessor + */ + public long preIncrementColumnValue(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final long amount, final boolean writeToWAL) + throws IOException; + + /** + * Called after incrementColumnValue + * @param e the environment provided by the region server + * @param row row to check + * @param family column family + * @param qualifier column qualifier + * @param amount long amount to increment + * @param writeToWAL whether to write the increment to the WAL + * @param result the result returned by incrementColumnValue + * @return the result to return to the client + * @throws IOException if an error occurred on the coprocessor + */ + public long postIncrementColumnValue(final CoprocessorEnvironment e, + final byte [] row, final byte [] family, final byte [] qualifier, + final long amount, final boolean writeToWAL, final long result) + throws IOException; + + /** + * Called before incrementColumnValue + * @param e the environment provided by the region server + * @param increment increment object + * @param writeToWAL whether to write the increment to the WAL + * @return new Increment instance + * @throws IOException if an error occurred on the coprocessor + */ + public Increment preIncrement(final CoprocessorEnvironment e, + final Increment increment) + throws IOException; + + /** + * Called after increment + * @param e the environment provided by the region server + * @param increment increment object + * @param writeToWAL whether to write the increment to the WAL + * @param result the result returned by increment + * @return the result to return to the client + * @throws IOException if an error occurred on the coprocessor + */ + public Result postIncrement(final CoprocessorEnvironment e, + final Increment increment, final Result result) + throws IOException; + + /** + * Called before the client opens a new scanner. 
+ * @param e the environment provided by the region server + * @param scan the Scan specification + * @return the possibly transformed Scan to actually use + * @throws IOException if an error occurred on the coprocessor + */ + public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan) + throws IOException; + + /** + * Called after the client opens a new scanner. + * @param e the environment provided by the region server + * @param scan the Scan specification + * @param scannerId the scanner id allocated by the region server + * @throws IOException if an error occurred on the coprocessor + */ + public void postScannerOpen(final CoprocessorEnvironment e, final Scan scan, + final long scannerId) + throws IOException; + + /** + * Called before the client asks for the next row on a scanner. + * @param e the environment provided by the region server + * @param scannerId the scanner id + * @param results the result set returned by the region server + * @return the possibly transformed result set to actually return + * @throws IOException if an error occurred on the coprocessor + */ + public void preScannerNext(final CoprocessorEnvironment e, + final long scannerId) + throws IOException; + + /** + * Called after the client asks for the next row on a scanner. + * @param e the environment provided by the region server + * @param scannerId the scanner id + * @param results the result set returned by the region server + * @return the possibly transformed result set to actually return + * @throws IOException if an error occurred on the coprocessor + */ + public List postScannerNext(final CoprocessorEnvironment e, + final long scannerId, final List results) + throws IOException; + + /** + * Called before the client closes a scanner. + * @param e the environment provided by the region server + * @param scannerId the scanner id + * @throws IOException if an error occurred on the coprocessor + */ + public void preScannerClose(final CoprocessorEnvironment e, + final long scannerId) + throws IOException; + + /** + * Called after the client closes a scanner. + * @param e the environment provided by the region server + * @param scannerId the scanner id + * @throws IOException if an error occurred on the coprocessor + */ + public void postScannerClose(final CoprocessorEnvironment e, + final long scannerId) + throws IOException; +} diff --git a/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java b/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java new file mode 100644 index 00000000000..1b7918cb826 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/coprocessor/package-info.java @@ -0,0 +1,367 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+/**
+
+<h2>Table of Contents</h2>
+<ul>
+<li><a href="#overview">Overview</a></li>
+<li><a href="#coprocessor">Coprocessor</a></li>
+<li><a href="#regionobserver">RegionObserver</a></li>
+<li><a href="#endpoint">Endpoint</a></li>
+<li><a href="#load">Coprocessor loading</a></li>
+<li><a href="#chain">Chain of RegionObservers</a></li>
+</ul>
+
+<h2><a name="overview">Overview</a></h2>
+Coprocessors are code that runs in-process on each region server. Regions
+contain references to the coprocessor implementation classes associated
+with them. Coprocessor classes can be loaded either from local
+jars on the region server's classpath or via the HDFS classloader.
+<p>
+Multiple types of coprocessors are provided to cover a sufficient range of
+potential use cases. Right now there are:
+<p>
+<ul>
+<li>Coprocessor: provides region lifecycle management hooks, e.g., region
+open/close/split/flush/compact operations.</li>
+<li>RegionObserver: provides hooks for monitoring table operations from
+the client side, such as table get/put/scan/delete, etc.</li>
+<li>Endpoint: provides on-demand triggers for any arbitrary function
+executed at a region. One use case is column aggregation at the region
+server.</li>
+</ul>
+<p>
+<h2><a name="coprocessor">Coprocessor</a></h2>
+A coprocessor is required to implement the Coprocessor interface so
+that the coprocessor framework can manage it internally.
+<p>
+Another design goal of this interface is to provide simple features for
+making coprocessors useful, while exposing no more internal state or
+control actions of the region server than necessary, and not exposing
+them directly.
+<p>
+Over the lifecycle of a region, the methods of this interface are invoked
+when the corresponding events happen. The master transitions regions
+through the following states:
+<p>
+&nbsp;&nbsp;&nbsp;&nbsp;unassigned -> pendingOpen -> open -> pendingClose -> closed.
+<p>
+Coprocessors have the opportunity to intercept and handle events in the
+pendingOpen, open, and pendingClose states.
+<p>
+<h3>PendingOpen</h3>
+The region server is opening a region to bring it online. Coprocessors
+can piggyback or fail this process.
+<p>
+<ul>
+<li>preOpen, postOpen: Called before and after the region is reported as
+online to the master.</li>
+</ul>
+<p>
+<h3>Open</h3>
+The region is open on the region server and is processing both client
+requests (get, put, scan, etc.) and administrative actions (flush, compact,
+split, etc.). Coprocessors can piggyback administrative actions via:
+<p>
+<ul>
+<li>preFlush, postFlush: Called before and after the memstore is flushed
+into a new store file.</li>
+<li>preCompact, postCompact: Called before and after compaction.</li>
+<li>preSplit, postSplit: Called before and after the region is split.</li>
+</ul>
+<p>
+<h3>PendingClose</h3>
+The region server is closing the region. This can happen as part of normal
+operations, or it may happen when the region server is aborting due to fatal
+conditions such as OOME, health check failure, or fatal filesystem
+problems. Coprocessors can piggyback this event. If the server is aborting,
+an indication to this effect will be passed as an argument.
+<p>
+<ul>
+<li>preClose and postClose: Called before and after the region is
+reported as closed to the master.</li>
+</ul>
+<p>
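+A minimal sketch tying these lifecycle hooks together (illustrative only;
+the class name and log messages below are hypothetical, not part of this
+patch) might audit region lifecycle events like this:
+<p>
+<blockquote><pre>
+// Audits region lifecycle events. Extends BaseRegionObserverCoprocessor
+// so that only the hooks of interest need to be overridden.
+public class RegionLifecycleAuditor extends BaseRegionObserverCoprocessor {
+  private static final Log LOG =
+    LogFactory.getLog(RegionLifecycleAuditor.class);
+
+  @Override
+  public void postOpen(CoprocessorEnvironment e) {
+    LOG.info("Opened " + e.getRegion().getRegionNameAsString());
+  }
+
+  @Override
+  public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
+    LOG.info("Closing " + e.getRegion().getRegionNameAsString() +
+      (abortRequested ? " (abort requested)" : ""));
+  }
+
+  @Override
+  public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
+    LOG.info("Split into " + l.getRegionNameAsString() +
+      " and " + r.getRegionNameAsString());
+  }
+}
+</pre></blockquote>
+<p>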
+<h2><a name="regionobserver">RegionObserver</a></h2>
+If the coprocessor implements the RegionObserver interface it can
+observe and mediate client actions on the region:
+<p>
+<ul>
+<li>preGet, postGet: Called before and after a client makes a Get
+request.</li>
+<li>preExists, postExists: Called before and after the client tests
+for existence using a Get.</li>
+<li>prePut and postPut: Called before and after the client stores a
+value.</li>
+<li>preDelete and postDelete: Called before and after the client
+deletes a value.</li>
+<li>preScannerOpen, postScannerOpen: Called before and after the client
+opens a new scanner.</li>
+<li>preScannerNext, postScannerNext: Called before and after the client
+asks for the next row on a scanner.</li>
+<li>preScannerClose, postScannerClose: Called before and after the client
+closes a scanner.</li>
+<li>preCheckAndPut, postCheckAndPut: Called before and after the client
+calls checkAndPut().</li>
+<li>preCheckAndDelete, postCheckAndDelete: Called before and after the client
+calls checkAndDelete().</li>
+</ul>
+<p>
+You can also extend the abstract class BaseRegionObserverCoprocessor,
+which implements both Coprocessor and RegionObserver and supplies default
+(pass-through) behaviors for all methods, so you don't have to override
+every one of them yourself.
+<p>
+Here's an example of what a simple RegionObserver might look like. This
+example shows how to implement access control for HBase. The
+coprocessor checks user information for a given client request, e.g.,
+Get/Put/Delete/Scan, by injecting code at certain RegionObserver
+preXXX hooks. If the user is not allowed to access the resource, a
+CoprocessorException is thrown, and the client request is denied
+when it receives this exception.
+<p>
+<blockquote><pre>
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.util.List;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+
+// Sample access-control coprocessor. It uses RegionObserver
+// and intercepts preXXX() methods to check user privileges for the given
+// table and column family.
+public class AccessControlCoprocessor extends BaseRegionObserverCoprocessor {
+  @Override
+  public Get preGet(CoprocessorEnvironment e, Get get)
+      throws CoprocessorException {
+
+    // Check permissions: accessNotAllowed is a placeholder for your own
+    // authorization logic.
+    if (accessNotAllowed) {
+      throw new AccessDeniedException("User is not allowed to access.");
+    }
+    return get;
+  }
+
+  // override prePut(), preDelete(), etc.
+}
+</pre></blockquote>
+<p>
+<h2><a name="endpoint">Endpoint</a></h2>
+Coprocessor and RegionObserver provide certain hooks
+for injecting user code running at each region. That user code is triggered
+by existing HTable and HBaseAdmin operations at
+certain hook points.
+<p>
+Through Endpoint and the dynamic RPC protocol, you can define your own
+interface for communication between client and region server;
+i.e., you can create a new method, specifying its parameters and return
+type. The new Endpoint methods are triggered by
+calling the client side dynamic RPC functions -- HTable.coprocessorExec(...).
+<p>
+To implement an Endpoint, you need to:
+<ul>
+<li>Extend CoprocessorProtocol: this interface defines the
+communication protocol for the new Endpoint, and serves
+as the RPC protocol between client and region server.</li>
+<li>Extend both the BaseEndpointCoprocessor abstract class
+and the CoprocessorProtocol interface extended above: this is
+the class actually implemented and run at the region server.</li>
+</ul>
+<p>
+Here's an example of performing column aggregation at the region server:
+<p>
+<blockquote><pre>
+// A sample protocol for performing aggregation at regions.
+public static interface ColumnAggregationProtocol
+extends CoprocessorProtocol {
+  // Perform aggregation for a given column at the region. The aggregation
+  // will include all the rows inside the region. It can be extended to
+  // allow passing start and end rows for a fine-grained aggregation.
+  public int sum(byte[] family, byte[] qualifier) throws IOException;
+}
+
+// Aggregation implementation at a region.
+public static class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
+implements ColumnAggregationProtocol {
+  // Scan the region for the given family and qualifier. Return the
+  // aggregation result.
+  @Override
+  public int sum(byte[] family, byte[] qualifier)
+  throws IOException {
+    // aggregate at each region
+    Scan scan = new Scan();
+    scan.addColumn(family, qualifier);
+    int sumResult = 0;
+    // use an internal scanner to perform scanning.
+    InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
+    try {
+      List<KeyValue> curVals = new ArrayList<KeyValue>();
+      boolean hasMore = false;
+      do {
+        curVals.clear();
+        hasMore = scanner.next(curVals);
+        if (!curVals.isEmpty()) {
+          sumResult += Bytes.toInt(curVals.get(0).getValue());
+        }
+      } while (hasMore);
+    } finally {
+      scanner.close();
+    }
+    return sumResult;
+  }
+}
+</pre></blockquote>
+<p>
+Client invocations are performed through HTable,
+which has the following methods added by the dynamic RPC protocol
+(matching the HTableInterface additions in this patch):
+<p>
+<blockquote><pre>
+public <T extends CoprocessorProtocol> T coprocessorProxy(
+    Class<T> protocol, byte[] row)
+
+public <T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(
+    Class<T> protocol, byte[] startKey, byte[] endKey,
+    Batch.Call<T, R> callable) throws IOException, Throwable
+
+public <T extends CoprocessorProtocol, R> void coprocessorExec(
+    Class<T> protocol, byte[] startKey, byte[] endKey,
+    Batch.Call<T, R> callable, Batch.Callback<R> callback)
+    throws IOException, Throwable
+</pre></blockquote>
+<p>
+Here is a client-side example of invoking
+ColumnAggregationEndpoint:
+<p>
+<blockquote><pre>
+HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
+Map<byte[], Integer> results;
+
+// null start and end keys: execute the call over all regions of the table
+results = table.coprocessorExec(ColumnAggregationProtocol.class, null, null,
+    new Batch.Call<ColumnAggregationProtocol, Integer>() {
+      public Integer call(ColumnAggregationProtocol instance) throws IOException {
+        return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
+      }
+    });
+
+int sumResult = 0;
+for (Map.Entry<byte[], Integer> e : results.entrySet()) {
+  sumResult += e.getValue();
+}
+</pre></blockquote>
+<p>
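+For a single-region invocation there is also coprocessorProxy. A brief
+sketch (assuming the ColumnAggregationProtocol above, and a row key ROW
+falling in the region of interest):
+<p>
+<blockquote><pre>
+// Proxy bound to the single region containing ROW; the sum covers only
+// that region's rows.
+ColumnAggregationProtocol proxy = table.coprocessorProxy(
+    ColumnAggregationProtocol.class, ROW);
+int regionSum = proxy.sum(TEST_FAMILY, TEST_QUALIFIER);
+</pre></blockquote>
+<p>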
+<h2><a name="load">Coprocessor loading</a></h2>
+A customized coprocessor can be loaded in two different ways: from the
+configuration, or from the HTableDescriptor of a newly created table.
+<p>
+(Currently there is no on-demand coprocessor loading mechanism for
+already-opened regions.)
+<p>
+<h3>Load from configuration</h3>
+Whenever a region is opened, it reads the coprocessor class names from
+hbase.coprocessor.default.classes in the Configuration. The
+coprocessor framework automatically loads the configured classes as
+default coprocessors. The classes must already be included in the classpath.
+<p>
+<blockquote><pre>
+  <property>
+    <name>hbase.coprocessor.default.classes</name>
+    <value>org.apache.hadoop.hbase.coprocessor.AccessControlCoprocessor, org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint</value>
+    <description>A comma-separated list of Coprocessors that are loaded by
+    default. For any overridden coprocessor method from RegionObserver or
+    Coprocessor, these classes' implementations will be called
+    in order. After implementing your own
+    Coprocessor, just put it in HBase's classpath and add the fully
+    qualified class name here.
+    </description>
+  </property>
+</pre></blockquote>
+<p>
+The first defined coprocessor will be assigned
+Coprocessor.Priority.SYSTEM as its priority, and each following
+coprocessor's priority will be incremented by one. Coprocessors are executed
+in order according to the natural ordering of this int.
+<p>
+<h3>Load from table attribute</h3>
+Coprocessor classes can also be configured as table attributes. The
+attribute key must start with "Coprocessor" and the value must be of the
+form <path>:<class>:<priority>, so that the framework can
+recognize and load it.
+<p>
+<blockquote><pre>
+'COPROCESSOR$1' => 'hdfs://localhost:8020/hbase/coprocessors/test.jar:Test:1000'
+'COPROCESSOR$2' => '/hbase/coprocessors/test2.jar:AnotherTest:1001'
+</pre></blockquote>
+<p>
+<path> must point to a jar file, which can be on any filesystem supported
+by the Hadoop FileSystem object.
+<p>
+<class> is the coprocessor implementation class. A jar can contain
+more than one coprocessor implementation, but only one can be specified
+at a time in each table attribute.
+<p>
+<priority> is an integer. Coprocessors are executed in order according
+to the natural ordering of the int. Coprocessors can optionally abort
+actions, so typically one would want to put authoritative CPs (security
+policy implementations, perhaps) ahead of observers.
+<p>
+<blockquote><pre>
+  Path path = new Path(fs.getUri() + Path.SEPARATOR +
+    "TestClassloading.jar");
+
+  // create a table that references the jar
+  HTableDescriptor htd = new HTableDescriptor(getClass().getName());
+  htd.addFamily(new HColumnDescriptor("test"));
+  htd.setValue("Coprocessor$1",
+    path.toString() +
+    ":" + classFullName +
+    ":" + Coprocessor.Priority.USER);
+  HBaseAdmin admin = new HBaseAdmin(this.conf);
+  admin.createTable(htd);
+</pre></blockquote>
+<p>
+<h2><a name="chain">Chain of RegionObservers</a></h2>
+As described above, multiple coprocessors can be loaded on one region at the
+same time. In the case of RegionObserver, more than one
+RegionObserver can be registered at the same hook point, e.g., preGet().
+When a region reaches the hook point, the framework invokes each registered
+RegionObserver in order of assigned priority, as the sketch below
+illustrates.
+<p>
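+For example (an illustrative sketch only; both observer classes and the
+helper method are hypothetical), two stacked observers can transform the
+same postGet() result in priority order:
+<p>
+<blockquote><pre>
+// Registered with the lower priority value, so it runs first in the chain.
+public class AuditObserver extends BaseRegionObserverCoprocessor {
+  @Override
+  public List postGet(CoprocessorEnvironment e, Get get, List results)
+      throws IOException {
+    // observe only; pass the results through unchanged
+    return results;
+  }
+}
+
+// Registered with a higher priority value, so it runs after AuditObserver
+// and receives whatever the earlier observer returned.
+public class FilteringObserver extends BaseRegionObserverCoprocessor {
+  @Override
+  public List postGet(CoprocessorEnvironment e, Get get, List results)
+      throws IOException {
+    results.removeAll(valuesClientMayNotSee(results)); // hypothetical helper
+    return results;
+  }
+}
+</pre></blockquote>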
+ +*/ +package org.apache.hadoop.hbase.coprocessor; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java new file mode 100644 index 00000000000..c57ca0cde88 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CoprocessorHost.java @@ -0,0 +1,1241 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.*; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Call; +import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback; +import org.apache.hadoop.hbase.coprocessor.Coprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.ipc.CoprocessorProtocol; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.apache.hadoop.util.StringUtils; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Implements the coprocessor environment and runtime support. + */ +public class CoprocessorHost { + + /** + * Environment priority comparator. + * Coprocessors are chained in sorted order. + */ + static class EnvironmentPriorityComparator implements Comparator { + public int compare(Environment env1, Environment env2) { + if (env1.priority.intValue() < env2.priority.intValue()) { + return -1; + } else if (env1.priority.intValue() > env2.priority.intValue()) { + return 1; + } + return 0; + } + } + + /** + * Encapsulation of the environment of each coprocessor + */ + class Environment implements CoprocessorEnvironment { + + /** + * A wrapper for HTable. Can be used to restrict privilege. + * + * Currently it just helps to track tables opened by a Coprocessor and + * facilitate close of them if it is aborted. + * + * We also disallow row locking. 
+ * + * There is nothing now that will stop a coprocessor from using HTable + * objects directly instead of this API, but in the future we intend to + * analyze coprocessor implementations as they are loaded and reject those + * which attempt to use objects and methods outside the Environment + * sandbox. + */ + class HTableWrapper implements HTableInterface { + + private byte[] tableName; + private HTable table; + + public HTableWrapper(byte[] tableName) throws IOException { + this.tableName = tableName; + this.table = new HTable(tableName); + openTables.add(this); + } + + void internalClose() throws IOException { + table.close(); + } + + public Configuration getConfiguration() { + return table.getConfiguration(); + } + + public void close() throws IOException { + try { + internalClose(); + } finally { + openTables.remove(this); + } + } + + public Result getRowOrBefore(byte[] row, byte[] family) + throws IOException { + return table.getRowOrBefore(row, family); + } + + public Result get(Get get) throws IOException { + return table.get(get); + } + + public boolean exists(Get get) throws IOException { + return table.exists(get); + } + + public void put(Put put) throws IOException { + table.put(put); + } + + public void put(List puts) throws IOException { + table.put(puts); + } + + public void delete(Delete delete) throws IOException { + table.delete(delete); + } + + public void delete(List deletes) throws IOException { + table.delete(deletes); + } + + public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Put put) throws IOException { + return table.checkAndPut(row, family, qualifier, value, put); + } + + public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + byte[] value, Delete delete) throws IOException { + return table.checkAndDelete(row, family, qualifier, value, delete); + } + + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount) throws IOException { + return table.incrementColumnValue(row, family, qualifier, amount); + } + + public long incrementColumnValue(byte[] row, byte[] family, + byte[] qualifier, long amount, boolean writeToWAL) + throws IOException { + return table.incrementColumnValue(row, family, qualifier, amount, + writeToWAL); + } + + @Override + public Result increment(Increment increment) throws IOException { + return table.increment(increment); + } + + public void flushCommits() throws IOException { + table.flushCommits(); + } + + public boolean isAutoFlush() { + return table.isAutoFlush(); + } + + public ResultScanner getScanner(Scan scan) throws IOException { + return table.getScanner(scan); + } + + public ResultScanner getScanner(byte[] family) throws IOException { + return table.getScanner(family); + } + + public ResultScanner getScanner(byte[] family, byte[] qualifier) + throws IOException { + return table.getScanner(family, qualifier); + } + + public HTableDescriptor getTableDescriptor() throws IOException { + return table.getTableDescriptor(); + } + + public byte[] getTableName() { + return tableName; + } + + public RowLock lockRow(byte[] row) throws IOException { + throw new RuntimeException( + "row locking is not allowed within the coprocessor environment"); + } + + public void unlockRow(RowLock rl) throws IOException { + throw new RuntimeException( + "row locking is not allowed within the coprocessor environment"); + } + + @Override + public void batch(List actions, Object[] results) + throws IOException, InterruptedException { + table.batch(actions, results); + } 
+ + @Override + public Object[] batch(List actions) + throws IOException, InterruptedException { + return table.batch(actions); + } + + @Override + public Result[] get(List gets) throws IOException { + return table.get(gets); + } + + @Override + public void coprocessorExec(Class protocol, + byte[] startKey, byte[] endKey, Call callable, + Callback callback) throws IOException, Throwable { + table.coprocessorExec(protocol, startKey, endKey, callable, callback); + } + + @Override + public Map coprocessorExec( + Class protocol, byte[] startKey, byte[] endKey, Call callable) + throws IOException, Throwable { + return table.coprocessorExec(protocol, startKey, endKey, callable); + } + + @Override + public T coprocessorProxy(Class protocol, + byte[] row) { + return table.coprocessorProxy(protocol, row); + } + } + + /** The coprocessor */ + Coprocessor impl; + /** Environment variables */ + Map vars = new ConcurrentHashMap(); + /** Chaining priority */ + Coprocessor.Priority priority = Coprocessor.Priority.USER; + /** Accounting for tables opened by the coprocessor */ + List openTables = + Collections.synchronizedList(new ArrayList()); + + /** + * Constructor + * @param impl the coprocessor instance + * @param priority chaining priority + */ + public Environment(final Coprocessor impl, Coprocessor.Priority priority) { + this.impl = impl; + this.priority = priority; + } + + /** Clean up the environment */ + void shutdown() { + // clean up any table references + for (HTableInterface table: openTables) { + try { + ((HTableWrapper)table).internalClose(); + } catch (IOException e) { + // nothing can be done here + LOG.warn("Failed to close " + + Bytes.toStringBinary(table.getTableName()), e); + } + } + } + + /** @return the coprocessor environment version */ + @Override + public int getVersion() { + return Coprocessor.VERSION; + } + + /** @return the HBase release */ + @Override + public String getHBaseVersion() { + return VersionInfo.getVersion(); + } + + /** @return the region */ + @Override + public HRegion getRegion() { + return region; + } + + /** @return reference to the region server services */ + @Override + public RegionServerServices getRegionServerServices() { + return rsServices; + } + + /** + * Open a table from within the Coprocessor environment + * @param tableName the table name + * @return an interface for manipulating the table + * @exception IOException Exception + */ + @Override + public HTableInterface getTable(byte[] tableName) throws IOException { + return new HTableWrapper(tableName); + } + + /** + * @param key the key + * @return the value, or null if it does not exist + */ + @Override + public Object get(Object key) { + return vars.get(key); + } + + /** + * @param key the key + * @param value the value + */ + @Override + public void put(Object key, Object value) { + vars.put(key, value); + } + + /** + * @param key the key + */ + @Override + public Object remove(Object key) { + return vars.remove(key); + } + } + + static final Log LOG = LogFactory.getLog(CoprocessorHost.class); + static final Pattern attrSpecMatch = Pattern.compile("(.+):(.+):(.+)"); + + /** The region server services */ + RegionServerServices rsServices; + /** The region */ + HRegion region; + /** Ordered set of loaded coprocessors with lock */ + final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock(); + Set coprocessors = + new TreeSet(new EnvironmentPriorityComparator()); + + /** + * Constructor + * @param server the regionServer + * @param region the region + * @param conf the 
+
+static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
+static final Pattern attrSpecMatch = Pattern.compile("(.+):(.+):(.+)");
+
+/** The region server services */
+RegionServerServices rsServices;
+/** The region */
+HRegion region;
+/** Ordered set of loaded coprocessors with lock */
+final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
+Set<Environment> coprocessors =
+  new TreeSet<Environment>(new EnvironmentPriorityComparator());
+
+/**
+ * Constructor
+ * @param region the region
+ * @param rsServices the region server services
+ * @param conf the configuration
+ */
+public CoprocessorHost(final HRegion region,
+    final RegionServerServices rsServices, final Configuration conf) {
+  this.rsServices = rsServices;
+  this.region = region;
+
+  // load system default coprocessors from the configuration
+  loadSystemCoprocessors(conf);
+
+  // load per-table coprocessors declared in the table descriptor
+  loadTableCoprocessors();
+}
+
+/**
+ * Load system coprocessors. Read the class names from configuration.
+ * Called by constructor.
+ */
+private void loadSystemCoprocessors(Configuration conf) {
+  Class<?> implClass = null;
+
+  // load default coprocessors from the configuration file
+  String defaultCPClasses = conf.get("hbase.coprocessor.default.classes");
+  if (defaultCPClasses == null || defaultCPClasses.length() == 0)
+    return;
+  StringTokenizer st = new StringTokenizer(defaultCPClasses, ",");
+  int priority = Coprocessor.Priority.SYSTEM.intValue();
+  while (st.hasMoreTokens()) {
+    String className = st.nextToken();
+    if (findCoprocessor(className) != null) {
+      continue;
+    }
+    ClassLoader cl = ClassLoader.getSystemClassLoader();
+    Thread.currentThread().setContextClassLoader(cl);
+    try {
+      implClass = cl.loadClass(className);
+      load(implClass, Coprocessor.Priority.SYSTEM);
+      LOG.info("System coprocessor " + className + " was loaded " +
+        "successfully with priority (" + priority++ + ").");
+    } catch (ClassNotFoundException e) {
+      LOG.warn("Class " + className + " cannot be found. " +
+        e.getMessage());
+    } catch (IOException e) {
+      LOG.warn("Loading coprocessor " + className + " failed. " +
+        e.getMessage());
+    }
+  }
+}
+
+/**
+ * Load a coprocessor implementation into the host
+ * @param path path to implementation jar
+ * @param className the main class name
+ * @param priority chaining priority
+ * @throws IOException Exception
+ */
+@SuppressWarnings("deprecation")
+public void load(Path path, String className, Coprocessor.Priority priority)
+    throws IOException {
+  Class<?> implClass = null;
+
+  // Have we already loaded the class, perhaps from an earlier region open
+  // for the same table?
+  try {
+    implClass = getClass().getClassLoader().loadClass(className);
+  } catch (ClassNotFoundException e) {
+    LOG.info("Class " + className + " needs to be loaded from a file - " +
+      path.toString() + ".");
+    // go ahead to load from file system.
+  }
+
+  // If not, load
+  if (implClass == null) {
+    // copy the jar to the local filesystem
+    if (!path.toString().endsWith(".jar")) {
+      throw new IOException(path.toString() + ": not a jar file?");
+    }
+    FileSystem fs = path.getFileSystem(HBaseConfiguration.create());
+    Path dst = new Path("/tmp/." +
+      region.getRegionNameAsString().replace(',', '_') +
+      "." + className + "."
+      + System.currentTimeMillis() + ".jar");
+    fs.copyToLocalFile(path, dst);
+    fs.deleteOnExit(dst);
+
+    // TODO: code weaving goes here
+
+    // TODO: wrap heap allocations and enforce maximum usage limits
+
+    /* TODO: inject code into loop headers that monitors CPU use and
+       aborts runaway user code */
+
+    // load the jar and get the implementation main class
+    String cp = System.getProperty("java.class.path");
+    // NOTE: Path.toURL is deprecated (toURI instead) but the URLClassLoader
+    // unsurprisingly wants URLs, not URIs; so we will use the deprecated
+    // method which returns URLs for as long as it is available
+    List<URL> paths = new ArrayList<URL>();
+    paths.add(new File(dst.toString()).getCanonicalFile().toURL());
+    StringTokenizer st = new StringTokenizer(cp, File.pathSeparator);
+    while (st.hasMoreTokens()) {
+      paths.add((new File(st.nextToken())).getCanonicalFile().toURL());
+    }
+    ClassLoader cl = new URLClassLoader(paths.toArray(new URL[]{}),
+      ClassLoader.getSystemClassLoader());
+    Thread.currentThread().setContextClassLoader(cl);
+    try {
+      implClass = cl.loadClass(className);
+    } catch (ClassNotFoundException e) {
+      throw new IOException(e);
+    }
+  }
+
+  load(implClass, priority);
+}
+
+/**
+ * @param implClass Implementation class
+ * @param priority priority
+ * @throws IOException Exception
+ */
+public void load(Class<?> implClass, Coprocessor.Priority priority)
+    throws IOException {
+  // create the instance
+  Coprocessor impl;
+  Object o = null;
+  try {
+    o = implClass.newInstance();
+    impl = (Coprocessor)o;
+  } catch (InstantiationException e) {
+    throw new IOException(e);
+  } catch (IllegalAccessException e) {
+    throw new IOException(e);
+  }
+  // create the environment
+  Environment env = new Environment(impl, priority);
+
+  // Check if it's an Endpoint.
+  // Due to the current dynamic protocol design, an Endpoint is registered
+  // and executed differently from an observer: a visitor pattern is used
+  // to invoke the registered Endpoint method.
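+  /* An illustrative sketch (hypothetical names, modeled loosely on the
+   * ColumnAggregation test endpoint shipped with this patch) of what the
+   * registration loop below enables. An Endpoint is a CoprocessorProtocol
+   * implementation; once registered with the region it becomes callable
+   * from clients via HTable.coprocessorExec()/coprocessorProxy():
+   *
+   *   public interface SumProtocol extends CoprocessorProtocol {
+   *     long sum(byte[] family, byte[] qualifier) throws IOException;
+   *   }
+   *
+   *   public class SumEndpoint extends BaseEndpointCoprocessor
+   *       implements SumProtocol {
+   *     public long sum(byte[] family, byte[] qualifier) throws IOException {
+   *       long total = 0;
+   *       // scan this region (available from the coprocessor environment)
+   *       // and accumulate the value of each matching cell into total
+   *       return total;
+   *     }
+   *   }
+   */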
+  for (Class<?> c : implClass.getInterfaces()) {
+    if (CoprocessorProtocol.class.isAssignableFrom(c)) {
+      region.registerProtocol(c, (CoprocessorProtocol)o);
+      break;
+    }
+  }
+  try {
+    coprocessorLock.writeLock().lock();
+    coprocessors.add(env);
+  } finally {
+    coprocessorLock.writeLock().unlock();
+  }
+}
+
+/**
+ * Find a coprocessor implementation by class name
+ * @param className the class name
+ * @return the coprocessor, or null if not found
+ */
+public Coprocessor findCoprocessor(String className) {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl.getClass().getName().equals(className)) {
+        return env.impl;
+      }
+    }
+    for (Environment env: coprocessors) {
+      if (env.impl.getClass().getName().endsWith(className)) {
+        return env.impl;
+      }
+    }
+    return null;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+void loadTableCoprocessors() {
+  // scan the table attributes for coprocessor load specifications
+  // and initialize the coprocessors found
+  for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e:
+      region.getTableDesc().getValues().entrySet()) {
+    String key = Bytes.toString(e.getKey().get());
+    if (key.startsWith("COPROCESSOR")) {
+      // found one
+      try {
+        String spec = Bytes.toString(e.getValue().get());
+        Matcher matcher = attrSpecMatch.matcher(spec);
+        if (matcher.matches()) {
+          Path path = new Path(matcher.group(1));
+          String className = matcher.group(2);
+          Coprocessor.Priority priority =
+            Coprocessor.Priority.valueOf(matcher.group(3));
+          load(path, className, priority);
+          LOG.info("Loaded coprocessor " + className + " from HTD of " +
+            Bytes.toString(region.getTableDesc().getName()) +
+            " successfully.");
+        } else {
+          LOG.warn("attribute '" + key + "' has invalid coprocessor spec");
+        }
+      } catch (IOException ex) {
+        LOG.warn(StringUtils.stringifyException(ex));
+      }
+    }
+  }
+}
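+
+/* Illustration (jar path and class name hypothetical): a per-table load
+ * is declared as a table attribute whose key starts with "COPROCESSOR"
+ * and whose value matches attrSpecMatch above, i.e. "path:class:priority":
+ *
+ *   HTableDescriptor htd = new HTableDescriptor("demo_table");
+ *   htd.setValue("COPROCESSOR$1",
+ *     "hdfs:///hbase/cp/demo.jar:com.example.DemoObserver:USER");
+ *
+ * When a region of demo_table opens, loadTableCoprocessors() above copies
+ * demo.jar out of HDFS and loads com.example.DemoObserver at USER priority.
+ */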
+
+/**
+ * Invoked before a region open
+ */
+public void preOpen() {
+  loadTableCoprocessors();
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.preOpen(env);
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * Invoked after a region open
+ */
+public void postOpen() {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.postOpen(env);
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * Invoked before a region is closed
+ * @param abortRequested true if the server is aborting
+ */
+public void preClose(boolean abortRequested) {
+  try {
+    coprocessorLock.writeLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.preClose(env, abortRequested);
+      env.shutdown();
+    }
+  } finally {
+    coprocessorLock.writeLock().unlock();
+  }
+}
+
+/**
+ * Invoked after a region is closed
+ * @param abortRequested true if the server is aborting
+ */
+public void postClose(boolean abortRequested) {
+  try {
+    coprocessorLock.writeLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.postClose(env, abortRequested);
+      env.shutdown();
+    }
+  } finally {
+    coprocessorLock.writeLock().unlock();
+  }
+}
+
+/**
+ * Invoked before a region is compacted.
+ * @param willSplit true if the compaction is about to trigger a split
+ */
+public void preCompact(boolean willSplit) {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.preCompact(env, willSplit);
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * Invoked after a region is compacted.
+ * @param willSplit true if the compaction triggered a split
+ */
+public void postCompact(boolean willSplit) {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.postCompact(env, willSplit);
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * Invoked before a memstore flush
+ */
+public void preFlush() {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.preFlush(env);
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * Invoked after a memstore flush
+ */
+public void postFlush() {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.postFlush(env);
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * Invoked just before a split
+ */
+public void preSplit() {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.preSplit(env);
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * Invoked just after a split
+ * @param l the new left-hand daughter region
+ * @param r the new right-hand daughter region
+ */
+public void postSplit(HRegion l, HRegion r) {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      env.impl.postSplit(env, l, r);
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+// RegionObserver support
+
+/**
+ * @param row the row key
+ * @param family the family
+ * @exception IOException Exception
+ */
+public void preGetClosestRowBefore(final byte[] row, final byte[] family)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        ((RegionObserver)env.impl).preGetClosestRowBefore(env, row, family);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param row the row key
+ * @param family the family
+ * @param result the result set from the region
+ * @return the result set to return to the client
+ * @exception IOException Exception
+ */
+public Result postGetClosestRowBefore(final byte[] row, final byte[] family,
+    Result result) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        result = ((RegionObserver)env.impl)
+          .postGetClosestRowBefore(env, row, family, result);
+      }
+    }
+    return result;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param get the Get request
+ * @return the possibly transformed Get object by coprocessor
+ * @exception IOException Exception
+ */
+public Get preGet(Get get) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        get = ((RegionObserver)env.impl).preGet(env, get);
+      }
+    }
+    return get;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param get the Get request
+ * @param results the result set
+ * @return the possibly transformed result set to use
+ * @exception IOException Exception
+ */
+public List<KeyValue> postGet(final Get get, List<KeyValue> results)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        results = ((RegionObserver)env.impl).postGet(env, get, results);
+      }
+    }
+    return results;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
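+
+/* Note on chaining (a sketch; class behavior hypothetical): the pre/post
+ * hooks above thread their argument through every registered observer in
+ * priority order, so a later observer sees the output of an earlier one:
+ *
+ *   // observer A widens the Get to an extra (hypothetical) family ...
+ *   public Get preGet(CoprocessorEnvironment e, Get get)
+ *       throws IOException {
+ *     get.addFamily(Bytes.toBytes("meta"));
+ *     return get;
+ *   }
+ *
+ *   // ... and observer B, at lower priority, receives the widened Get
+ *   public Get preGet(CoprocessorEnvironment e, Get get)
+ *       throws IOException {
+ *     return get;
+ *   }
+ */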
+
+/**
+ * @param get the Get request
+ * @return the possibly transformed Get to use
+ * @exception IOException Exception
+ */
+public Get preExists(Get get) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        get = ((RegionObserver)env.impl).preExists(env, get);
+      }
+    }
+    return get;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param get the Get request
+ * @param exists the result returned by the region server
+ * @return the result to return to the client
+ * @exception IOException Exception
+ */
+public boolean postExists(final Get get, boolean exists)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        exists &= ((RegionObserver)env.impl).postExists(env, get, exists);
+      }
+    }
+    return exists;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param familyMap map of family to edits for the given family.
+ * @return the possibly transformed map to actually use
+ * @exception IOException Exception
+ */
+public Map<byte[], List<KeyValue>> prePut(
+    Map<byte[], List<KeyValue>> familyMap) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        familyMap = ((RegionObserver)env.impl).prePut(env, familyMap);
+      }
+    }
+    return familyMap;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param familyMap map of family to edits for the given family.
+ * @exception IOException Exception
+ */
+public void postPut(Map<byte[], List<KeyValue>> familyMap)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        ((RegionObserver)env.impl).postPut(env, familyMap);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param familyMap map of family to edits for the given family.
+ * @return the possibly transformed map to actually use
+ * @exception IOException Exception
+ */
+public Map<byte[], List<KeyValue>> preDelete(
+    Map<byte[], List<KeyValue>> familyMap) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        familyMap = ((RegionObserver)env.impl).preDelete(env, familyMap);
+      }
+    }
+    return familyMap;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
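+
+/* Sketch (hypothetical family and qualifier names) of a RegionObserver
+ * using the familyMap form of prePut above to inject an extra KeyValue;
+ * the map an observer returns is the one the region applies to the WAL
+ * and memstore:
+ *
+ *   public Map<byte[], List<KeyValue>> prePut(CoprocessorEnvironment e,
+ *       Map<byte[], List<KeyValue>> familyMap) throws IOException {
+ *     List<KeyValue> edits = familyMap.get(Bytes.toBytes("f"));
+ *     if (edits != null && !edits.isEmpty()) {
+ *       KeyValue first = edits.get(0);
+ *       edits.add(new KeyValue(first.getRow(), Bytes.toBytes("f"),
+ *         Bytes.toBytes("touched"), Bytes.toBytes("true")));
+ *     }
+ *     return familyMap;
+ *   }
+ */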
+
+/**
+ * @param familyMap map of family to edits for the given family.
+ * @exception IOException Exception
+ */
+public void postDelete(Map<byte[], List<KeyValue>> familyMap)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        ((RegionObserver)env.impl).postDelete(env, familyMap);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param value the expected value
+ * @param put data to put if check succeeds
+ * @return the possibly transformed Put to use
+ * @throws IOException e
+ */
+public Put preCheckAndPut(final byte [] row, final byte [] family,
+    final byte [] qualifier, final byte [] value, Put put)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        put = ((RegionObserver)env.impl).preCheckAndPut(env, row, family,
+          qualifier, value, put);
+      }
+    }
+    return put;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param value the expected value
+ * @param put data to put if check succeeds
+ * @param result the result returned by the region server
+ * @return the result to return to the client
+ * @throws IOException e
+ */
+public boolean postCheckAndPut(final byte [] row, final byte [] family,
+    final byte [] qualifier, final byte [] value, final Put put,
+    boolean result) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        result = ((RegionObserver)env.impl).postCheckAndPut(env, row,
+          family, qualifier, value, put, result);
+      }
+    }
+    return result;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param value the expected value
+ * @param delete delete to commit if check succeeds
+ * @return the possibly transformed Delete to use
+ * @throws IOException e
+ */
+public Delete preCheckAndDelete(final byte [] row, final byte [] family,
+    final byte [] qualifier, final byte [] value, Delete delete)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        delete = ((RegionObserver)env.impl).preCheckAndDelete(env, row,
+          family, qualifier, value, delete);
+      }
+    }
+    return delete;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param value the expected value
+ * @param delete delete to commit if check succeeds
+ * @param result the result returned by the region server
+ * @return the result to return to the client
+ * @throws IOException e
+ */
+public boolean postCheckAndDelete(final byte [] row, final byte [] family,
+    final byte [] qualifier, final byte [] value, final Delete delete,
+    boolean result) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        result = ((RegionObserver)env.impl).postCheckAndDelete(env, row,
+          family, qualifier, value, delete, result);
+      }
+    }
+    return result;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param amount long amount to increment
+ * @param writeToWAL whether to write the increment to the WAL
+ * @return new amount to increment
+ * @throws IOException if an error occurred on the coprocessor
+ */
+public long preIncrementColumnValue(final byte [] row,
+    final byte [] family, final byte [] qualifier, long amount,
+    final boolean writeToWAL) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        amount = ((RegionObserver)env.impl).preIncrementColumnValue(env,
+          row, family, qualifier, amount, writeToWAL);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+  return amount;
+}
+
+/**
+ * @param row row to check
+ * @param family column family
+ * @param qualifier column qualifier
+ * @param amount long amount to increment
+ * @param writeToWAL whether to write the increment to the WAL
+ * @param result the result returned by incrementColumnValue
+ * @return the result to return to the client
+ * @throws IOException if an error occurred on the coprocessor
+ */
+public long postIncrementColumnValue(final byte [] row,
+    final byte [] family, final byte [] qualifier, final long amount,
+    final boolean writeToWAL, long result) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        result = ((RegionObserver)env.impl).postIncrementColumnValue(env,
+          row, family, qualifier, amount, writeToWAL, result);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+  return result;
+}
+
+/**
+ * @param increment increment object
+ * @return the possibly transformed Increment to use
+ * @throws IOException if an error occurred on the coprocessor
+ */
+public Increment preIncrement(Increment increment) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        increment = ((RegionObserver)env.impl).preIncrement(env, increment);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+  return increment;
+}
+
+/**
+ * @param increment increment object
+ * @param result the result returned by increment
+ * @return the result to return to the client
+ * @throws IOException if an error occurred on the coprocessor
+ */
+public Result postIncrement(final Increment increment, Result result)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        result = ((RegionObserver)env.impl).postIncrement(env, increment,
+          result);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+  return result;
+}
+
+/**
+ * @param scan the Scan specification
+ * @return the possibly transformed Scan to use
+ * @exception IOException Exception
+ */
+public Scan preScannerOpen(Scan scan) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        scan = ((RegionObserver)env.impl).preScannerOpen(env, scan);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+  return scan;
+}
+
+/**
+ * @param scan the Scan specification
+ * @param scannerId the scanner id allocated by the region server
+ * @exception IOException Exception
+ */
+public void postScannerOpen(final Scan scan, long scannerId)
+    throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        ((RegionObserver)env.impl).postScannerOpen(env, scan, scannerId);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
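+
+/* Sketch (hypothetical family name) of an observer using the scanner
+ * hooks that follow; scanners are identified by the id the region server
+ * allocated for them:
+ *
+ *   public List<KeyValue> postScannerNext(CoprocessorEnvironment e,
+ *       long scannerId, List<KeyValue> results) throws IOException {
+ *     Iterator<KeyValue> it = results.iterator();
+ *     while (it.hasNext()) {
+ *       // censor a (hypothetical) restricted family from scan results
+ *       if (Bytes.equals(it.next().getFamily(), Bytes.toBytes("secret"))) {
+ *         it.remove();
+ *       }
+ *     }
+ *     return results;
+ *   }
+ */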
+
+/**
+ * @param scannerId the scanner id
+ * @exception IOException Exception
+ */
+public void preScannerNext(final long scannerId) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        ((RegionObserver)env.impl).preScannerNext(env, scannerId);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param scannerId the scanner id
+ * @param results the result set returned by the region server
+ * @return the possibly transformed result set to actually return
+ * @exception IOException Exception
+ */
+public List<KeyValue> postScannerNext(final long scannerId,
+    List<KeyValue> results) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        results = ((RegionObserver)env.impl).postScannerNext(env, scannerId,
+          results);
+      }
+    }
+    return results;
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param scannerId the scanner id
+ * @exception IOException Exception
+ */
+public void preScannerClose(final long scannerId) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        ((RegionObserver)env.impl).preScannerClose(env, scannerId);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+
+/**
+ * @param scannerId the scanner id
+ * @exception IOException Exception
+ */
+public void postScannerClose(final long scannerId) throws IOException {
+  try {
+    coprocessorLock.readLock().lock();
+    for (Environment env: coprocessors) {
+      if (env.impl instanceof RegionObserver) {
+        ((RegionObserver)env.impl).postScannerClose(env, scannerId);
+      }
+    }
+  } finally {
+    coprocessorLock.readLock().unlock();
+  }
+}
+}
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d669de1b8bb..4fe67ccfd1f 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowLock;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -231,8 +232,8 @@ public class HRegion implements HeapSize { // , Writable{
   final long memstoreFlushSize;
   private volatile long lastFlushTime;
+  final RegionServerServices rsServices;
   private List<Pair<Long,Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
-  final FlushRequester flushRequester;
   private final long blockingMemStoreSize;
   final long threadWakeFrequency;
   // Used to guard closes
@@ -247,6 +248,9 @@ public class HRegion implements HeapSize { // , Writable{
   private final ReadWriteConsistencyControl rwcc =
       new ReadWriteConsistencyControl();
 
+  // Coprocessor host
+  private CoprocessorHost coprocessorHost;
+
   /**
    * Name of the region info file that resides just under the region directory.
*/ @@ -259,13 +263,14 @@ public class HRegion implements HeapSize { // , Writable{ this.tableDir = null; this.blockingMemStoreSize = 0L; this.conf = null; - this.flushRequester = null; + this.rsServices = null; this.fs = null; this.memstoreFlushSize = 0L; this.log = null; this.regiondir = null; this.regionInfo = null; this.threadWakeFrequency = 0L; + this.coprocessorHost = null; } /** @@ -287,19 +292,19 @@ public class HRegion implements HeapSize { // , Writable{ * @param conf is global configuration settings. * @param regionInfo - HRegionInfo that describes the region * is new), then read them from the supplied path. - * @param flushRequester an object that implements {@link FlushRequester} or null + * @param rsServices reference to {@link RegionServerServices} or null * * @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester) */ public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf, - HRegionInfo regionInfo, FlushRequester flushRequester) { + HRegionInfo regionInfo, RegionServerServices rsServices) { this.tableDir = tableDir; this.comparator = regionInfo.getComparator(); this.log = log; this.fs = fs; this.conf = conf; this.regionInfo = regionInfo; - this.flushRequester = flushRequester; + this.rsServices = rsServices; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); String encodedNameStr = this.regionInfo.getEncodedName(); @@ -312,6 +317,11 @@ public class HRegion implements HeapSize { // , Writable{ this.memstoreFlushSize = flushSize; this.blockingMemStoreSize = this.memstoreFlushSize * conf.getLong("hbase.hregion.memstore.block.multiplier", 2); + // don't initialize coprocessors if not running within a regionserver + // TODO: revisit if coprocessors should load in other cases + if (rsServices != null) { + this.coprocessorHost = new CoprocessorHost(this, rsServices, conf); + } if (LOG.isDebugEnabled()) { // Write out region name as string and its encoded name. LOG.debug("Instantiated " + this); @@ -336,6 +346,9 @@ public class HRegion implements HeapSize { // , Writable{ */ public long initialize(final Progressable reporter) throws IOException { + if (coprocessorHost != null) { + coprocessorHost.preOpen(); + } // A region can be reopened if failed a split; reset flags this.closing.set(false); this.closed.set(false); @@ -376,6 +389,10 @@ public class HRegion implements HeapSize { // , Writable{ // (particularly if no recovered edits, seqid will be -1). 
long nextSeqid = maxSeqId + 1; LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid); + + if (coprocessorHost != null) { + coprocessorHost.postOpen(); + } return nextSeqid; } @@ -498,6 +515,11 @@ public class HRegion implements HeapSize { // , Writable{ LOG.warn("Region " + this + " already closed"); return null; } + + if (coprocessorHost != null) { + this.coprocessorHost.preClose(abort); + } + boolean wasFlushing = false; synchronized (writestate) { // Disable compacting and flushing by background threads for this @@ -543,6 +565,10 @@ public class HRegion implements HeapSize { // , Writable{ result.addAll(store.close()); } this.closed.set(true); + + if (coprocessorHost != null) { + this.coprocessorHost.postClose(abort); + } LOG.info("Closed " + this); return result; } finally { @@ -734,15 +760,18 @@ public class HRegion implements HeapSize { // , Writable{ } lock.readLock().lock(); this.lastCompactInfo = null; + byte [] splitRow = null; try { if (this.closed.get()) { LOG.debug("Skipping compaction on " + this + " because closed"); return null; } - byte [] splitRow = null; if (this.closed.get()) { return splitRow; } + if (coprocessorHost != null) { + coprocessorHost.preCompact(false); + } try { synchronized (writestate) { if (!writestate.compacting && writestate.writesEnabled) { @@ -789,10 +818,13 @@ public class HRegion implements HeapSize { // , Writable{ writestate.notifyAll(); } } - return splitRow; + if (coprocessorHost != null) { + coprocessorHost.postCompact(splitRow != null); + } } finally { lock.readLock().unlock(); } + return splitRow; } /** @@ -827,6 +859,9 @@ public class HRegion implements HeapSize { // , Writable{ LOG.debug("Skipping flush on " + this + " because closed"); return false; } + if (coprocessorHost != null) { + coprocessorHost.preFlush(); + } try { synchronized (writestate) { if (!writestate.flushing && writestate.writesEnabled) { @@ -841,7 +876,11 @@ public class HRegion implements HeapSize { // , Writable{ return false; } } - return internalFlushcache(); + boolean result = internalFlushcache(); + if (coprocessorHost != null) { + coprocessorHost.postFlush(); + } + return result; } finally { synchronized (writestate) { writestate.flushing = false; @@ -1063,22 +1102,29 @@ public class HRegion implements HeapSize { // , Writable{ */ public Result getClosestRowBefore(final byte [] row, final byte [] family) throws IOException { + Result result = null; // look across all the HStores for this region and determine what the // closest key is across all column families, since the data may be sparse KeyValue key = null; checkRow(row); startRegionOperation(); + if (coprocessorHost != null) { + coprocessorHost.preGetClosestRowBefore(row, family); + } try { Store store = getStore(family); KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP); // get the closest key. 
(HStore.getRowKeyAtOrBefore can return null)
       key = store.getRowKeyAtOrBefore(kv);
-      if (key == null) {
-        return null;
+      if (key != null) {
+        Get get = new Get(key.getRow());
+        get.addFamily(family);
+        result = get(get, null);
       }
-      Get get = new Get(key.getRow());
-      get.addFamily(family);
-      return get(get, null);
+      if (coprocessorHost != null) {
+        result = coprocessorHost.postGetClosestRowBefore(row, family, result);
+      }
+      return result;
     } finally {
       closeRegionOperation();
     }
@@ -1120,7 +1166,14 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   protected InternalScanner instantiateInternalScanner(Scan scan,
       List<KeyValueScanner> additionalScanners) throws IOException {
-    return new RegionScanner(scan, additionalScanners);
+    if (coprocessorHost != null) {
+      coprocessorHost.preScannerOpen(scan);
+    }
+    InternalScanner s = new RegionScanner(scan, additionalScanners);
+    if (coprocessorHost != null) {
+      coprocessorHost.postScannerOpen(scan, s.hashCode());
+    }
+    return s;
   }
 
   /*
@@ -1186,8 +1239,10 @@ public class HRegion implements HeapSize { // , Writable{
     boolean flush = false;
 
     updatesLock.readLock().lock();
-
     try {
+      if (coprocessorHost != null) {
+        familyMap = coprocessorHost.preDelete(familyMap);
+      }
       for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
@@ -1214,7 +1269,7 @@ public class HRegion implements HeapSize { // , Writable{
           get.setMaxVersions(count);
           get.addColumn(family, qual);
 
-          List<KeyValue> result = get(get);
+          List<KeyValue> result = get(get, false);
 
           if (result.size() < count) {
             // Nothing to delete
@@ -1251,10 +1306,13 @@ public class HRegion implements HeapSize { // , Writable{
       // Now make changes to the memstore.
       long addedSize = applyFamilyMapToMemstore(familyMap);
       flush = isFlushSize(memstoreSize.addAndGet(addedSize));
+
+      if (coprocessorHost != null) {
+        coprocessorHost.postDelete(familyMap);
+      }
     } finally {
       this.updatesLock.readLock().unlock();
     }
-
     if (flush) {
       // Request a cache flush.  Do it outside update lock.
       requestFlush();
@@ -1315,6 +1373,7 @@ public class HRegion implements HeapSize { // , Writable{
     try {
       // All edits for the given row (across all column families) must happen atomically.
+      // Coprocessor interception happens in put(Map,boolean)
       put(put.getFamilyMap(), writeToWAL);
     } finally {
       if(lockid == null) releaseRowLock(lid);
@@ -1393,6 +1452,9 @@ public class HRegion implements HeapSize { // , Writable{
     /** Keep track of the locks we hold so we can release them in finally clause */
     List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
+    // reference family maps directly so coprocessors can mutate them if desired
+    Map<byte[], List<KeyValue>>[] familyMaps =
+      new Map[batchOp.operations.length];
     // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
     int firstIndex = batchOp.nextIndexToProcess;
     int lastIndexExclusive = firstIndex;
@@ -1408,9 +1470,19 @@ public class HRegion implements HeapSize { // , Writable{
       Put put = nextPair.getFirst();
       Integer providedLockId = nextPair.getSecond();
 
+      Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
+      // Check any loaded coprocessors
+      /* TODO: we should catch any thrown coprocessor exceptions here to allow
+         the rest of the batch to continue. This means fixing HBASE-2898 */
+      if (coprocessorHost != null) {
+        familyMap = coprocessorHost.prePut(familyMap);
+      }
+      // store the family map reference to allow for mutations
+      familyMaps[lastIndexExclusive] = familyMap;
+
       // Check the families in the put. If bad, skip this one.
      try {
-        checkFamilies(put.getFamilyMap().keySet());
+        checkFamilies(familyMap.keySet());
       } catch (NoSuchColumnFamilyException nscf) {
         LOG.warn("No such column family in batch put", nscf);
         batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.BAD_FAMILY;
@@ -1442,8 +1514,11 @@ public class HRegion implements HeapSize { // , Writable{
       // STEP 2. Update any LATEST_TIMESTAMP timestamps
       // ----------------------------------
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
+        // skip invalid
+        if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
+
         updateKVTimestamps(
-            batchOp.operations[i].getFirst().getFamilyMap().values(),
+            familyMaps[i].values(),
             byteNow);
       }
 
@@ -1457,7 +1532,7 @@ public class HRegion implements HeapSize { // , Writable{
         Put p = batchOp.operations[i].getFirst();
         if (!p.getWriteToWAL()) continue;
-        addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
+        addFamilyMapToWALEdit(familyMaps[i], walEdit);
       }
 
       // Append the edit to WAL
@@ -1471,9 +1546,13 @@ public class HRegion implements HeapSize { // , Writable{
       for (int i = firstIndex; i < lastIndexExclusive; i++) {
         if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
 
-        Put p = batchOp.operations[i].getFirst();
-        addedSize += applyFamilyMapToMemstore(p.getFamilyMap());
+        addedSize += applyFamilyMapToMemstore(familyMaps[i]);
         batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
+
+        // execute any coprocessor post-hooks
+        if (coprocessorHost != null) {
+          coprocessorHost.postPut(familyMaps[i]);
+        }
       }
       success = true;
       return addedSize;
@@ -1529,7 +1608,7 @@ public class HRegion implements HeapSize { // , Writable{
     Integer lid = getLock(lockId, get.getRow(), true);
     List<KeyValue> result = new ArrayList<KeyValue>();
     try {
-      result = get(get);
+      result = get(get, false);
 
       boolean matches = false;
       if (result.size() == 0 &&
@@ -1633,9 +1712,11 @@ public class HRegion implements HeapSize { // , Writable{
    * @param now
    * @throws IOException
    */
-  private void put(final byte [] family, final List<KeyValue> edits)
+  private void put(byte [] family, List<KeyValue> edits)
   throws IOException {
-    Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
+    Map<byte[], List<KeyValue>> familyMap;
+    familyMap = new HashMap<byte[], List<KeyValue>>();
+
     familyMap.put(family, edits);
     this.put(familyMap, true);
   }
@@ -1647,13 +1728,18 @@ public class HRegion implements HeapSize { // , Writable{
    * @param writeToWAL if true, then we should write to the log
    * @throws IOException
    */
-  private void put(final Map<byte[], List<KeyValue>> familyMap,
-      boolean writeToWAL) throws IOException {
+  private void put(Map<byte[], List<KeyValue>> familyMap,
+      boolean writeToWAL) throws IOException {
+
     long now = EnvironmentEdgeManager.currentTimeMillis();
     byte[] byteNow = Bytes.toBytes(now);
     boolean flush = false;
+
     this.updatesLock.readLock().lock();
     try {
+      if (coprocessorHost != null) {
+        familyMap = coprocessorHost.prePut(familyMap);
+      }
       checkFamilies(familyMap.keySet());
       updateKVTimestamps(familyMap.values(), byteNow);
       // write/sync to WAL should happen before we touch memstore.
@@ -1670,6 +1756,10 @@ public class HRegion implements HeapSize { // , Writable{
 
       long addedSize = applyFamilyMapToMemstore(familyMap);
       flush = isFlushSize(memstoreSize.addAndGet(addedSize));
+
+      if (coprocessorHost != null) {
+        coprocessorHost.postPut(familyMap);
+      }
     } finally {
       this.updatesLock.readLock().unlock();
     }
@@ -1737,7 +1827,7 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   private void requestFlush() {
-    if (this.flushRequester == null) {
+    if (this.rsServices == null) {
       return;
     }
     synchronized (writestate) {
@@ -1747,7 +1837,7 @@ public class HRegion implements HeapSize { // , Writable{
       writestate.flushRequested = true;
     }
     // Make request outside of synchronize block; HBASE-818.
-    this.flushRequester.requestFlush(this);
+    this.rsServices.getFlushRequester().requestFlush(this);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Flush requested on " + this);
     }
@@ -2103,7 +2193,7 @@ public class HRegion implements HeapSize { // , Writable{
    * Release the row lock!
    * @param lockid  The lock ID to release.
    */
-  void releaseRowLock(final Integer lockid) {
+  public void releaseRowLock(final Integer lockid) {
     synchronized (lockedRows) {
       byte[] row = lockIds.remove(lockid);
       lockedRows.remove(row);
@@ -2262,8 +2352,17 @@ public class HRegion implements HeapSize { // , Writable{
       ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
 
       results.clear();
+
+      if (coprocessorHost != null) {
+        coprocessorHost.preScannerNext(hashCode());
+      }
+
       boolean returnResult = nextInternal(limit);
 
+      if (coprocessorHost != null) {
+        results = coprocessorHost.postScannerNext(hashCode(), results);
+      }
+
       outResults.addAll(results);
       resetFilters();
       if (isFilterDone()) {
@@ -2369,12 +2468,18 @@ public class HRegion implements HeapSize { // , Writable{
           currentRow, 0, currentRow.length) <= isScan);
     }
 
-    public synchronized void close() {
+    public synchronized void close() throws IOException {
+      if (coprocessorHost != null) {
+        coprocessorHost.preScannerClose(hashCode());
+      }
       if (storeHeap != null) {
         storeHeap.close();
         storeHeap = null;
       }
       this.filterClosed = true;
+      if (coprocessorHost != null) {
+        coprocessorHost.postScannerClose(hashCode());
+      }
     }
   }
 
@@ -2395,13 +2500,12 @@ public class HRegion implements HeapSize { // , Writable{
    * @param conf is global configuration settings.
    * @param regionInfo - HRegionInfo that describes the region
    * is new), then read them from the supplied path.
-   * @param flushListener an object that implements CacheFlushListener or null
-   * making progress to master -- otherwise master might think region deploy
-   * failed.  Can be null.
+   * @param rsServices reference to {@link RegionServerServices} or null
   * @return the new instance
   */
-  public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
-      HRegionInfo regionInfo, FlushRequester flushListener) {
+  public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
+      Configuration conf, HRegionInfo regionInfo,
+      RegionServerServices rsServices) {
    try {
      @SuppressWarnings("unchecked")
      Class<? extends HRegion> regionClass =
          (Class<? extends HRegion>) conf.getClass(HConstants.REGION_IMPL, HRegion.class);

      Constructor<? extends HRegion> c =
          regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
-          Configuration.class, HRegionInfo.class, FlushRequester.class);
+          Configuration.class, HRegionInfo.class, RegionServerServices.class);
 
-      return c.newInstance(tableDir, log, fs, conf, regionInfo, flushListener);
+      return c.newInstance(tableDir, log, fs, conf, regionInfo, rsServices);
    } catch (Throwable e) {
      // todo: what should I throw here?
      throw new IllegalStateException("Could not instantiate a region instance.", e);
@@ -2480,7 +2584,7 @@ public class HRegion implements HeapSize { // , Writable{
    * @throws IOException
    */
   public static HRegion openHRegion(final HRegionInfo info, final HLog wal,
-    final Configuration conf, final FlushRequester flusher,
+    final Configuration conf, final RegionServerServices rsServices,
     final Progressable reporter)
   throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -2492,7 +2596,7 @@ public class HRegion implements HeapSize { // , Writable{
     Path dir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
         info.getTableDesc().getName());
     HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
-        flusher);
+        rsServices);
     return r.openHRegion(reporter);
   }
 
@@ -2854,9 +2958,9 @@ public class HRegion implements HeapSize { // , Writable{
         get.addFamily(family);
       }
     }
-    List<KeyValue> result = get(get);
-
-    return new Result(result);
+    List<KeyValue> results = new ArrayList<KeyValue>();
+    results = get(get, true);
+    return new Result(results);
   }
 
   /**
@@ -2943,24 +3047,46 @@ public class HRegion implements HeapSize { // , Writable{
 
   /*
    * Do a get based on the get parameter.
+   * @param withCoprocessor invoke coprocessor or not. We don't want to
+   * always invoke cp for this private method.
   */
-  private List<KeyValue> get(final Get get) throws IOException {
+  private List<KeyValue> get(Get get, boolean withCoprocessor)
+  throws IOException {
+
     Scan scan = new Scan(get);
 
-    List<KeyValue> results = new ArrayList<KeyValue>();
+    List<KeyValue> results = null;
+    List<KeyValue> getResults = new ArrayList<KeyValue>();
+
+    // pre-get CP hook
+    if (withCoprocessor && (coprocessorHost != null)) {
+      get = coprocessorHost.preGet(get);
+    }
 
     InternalScanner scanner = null;
     try {
       scanner = getScanner(scan);
-      scanner.next(results);
+      scanner.next(getResults);
     } finally {
       if (scanner != null)
         scanner.close();
     }
+
+    // append get results to pre-get results
+    if (results != null) {
+      results.addAll(getResults);
+    } else {
+      results = getResults;
+    }
+
+    // post-get CP hook
+    if (withCoprocessor && (coprocessorHost != null)) {
+      results = coprocessorHost.postGet(get, results);
+    }
+
    return results;
  }

  /**
+   *
+   * Perform one or more increment operations on a row.
    * <p>
   * Increments performed are done under row lock but reads do not take locks
@@ -3059,7 +3185,6 @@ public class HRegion implements HeapSize { // , Writable{
   }
 
   /**
-   *
    * @param row
   * @param family
   * @param qualifier
@@ -3085,6 +3210,8 @@ public class HRegion implements HeapSize { // , Writable{
     Get get = new Get(row);
     get.addColumn(family, qualifier);
 
+    // we don't want to invoke coprocessor in this case; ICV is wrapped
+    // in HRegionServer, so we leave getLastIncrement alone
     List<KeyValue> results = getLastIncrement(get);
 
     if (!results.isEmpty()) {
@@ -3146,7 +3273,7 @@ public class HRegion implements HeapSize { // , Writable{
 
   public static final long FIXED_OVERHEAD = ClassSize.align(
       (4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
-      (21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
+      (22 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
 
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
@@ -3375,6 +3502,16 @@ public class HRegion implements HeapSize { // , Writable{
     return false;
   }
 
+  /** @return the coprocessor host */
+  public CoprocessorHost getCoprocessorHost() {
+    return coprocessorHost;
+  }
+
+  /** @param coprocessorHost the new coprocessor host */
+  public void setCoprocessorHost(final CoprocessorHost coprocessorHost) {
+    this.coprocessorHost = coprocessorHost;
+  }
+
   /**
    * This method needs to be called before any public call that reads or
    * modifies data. It has to be called just before a try.
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 2ef64321dc1..ca5382d35fc 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1550,8 +1550,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
+      if (region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().preExists(get);
+      }
       Result r = region.get(get, getLockFromId(get.getLockId()));
-      return r != null && !r.isEmpty();
+      boolean result = r != null && !r.isEmpty();
+      if (region.getCoprocessorHost() != null) {
+        result = region.getCoprocessorHost().postExists(get, result);
+      }
+      return result;
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
@@ -1640,8 +1647,23 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   public boolean checkAndPut(final byte[] regionName, final byte[] row,
       final byte[] family, final byte[] qualifier, final byte[] value,
       final Put put) throws IOException {
-    return checkAndMutate(regionName, row, family, qualifier, value, put,
-      getLockFromId(put.getLockId()));
+    checkOpen();
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to checkAndPut " +
+        "regionName is null");
+    }
+    HRegion region = getRegion(regionName);
+    if (region.getCoprocessorHost() != null) {
+      region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
+        value, put);
+    }
+    boolean result = checkAndMutate(regionName, row, family, qualifier,
+      value, put, getLockFromId(put.getLockId()));
+    if (region.getCoprocessorHost() != null) {
+      result = region.getCoprocessorHost().postCheckAndPut(row, family,
+        qualifier, value, put, result);
+    }
+    return result;
   }
 
   /**
@@ -1659,8 +1681,24 @@ public class HRegionServer implements HRegionInterface,
HBaseRPCErrorHandler,
   public boolean checkAndDelete(final byte[] regionName, final byte[] row,
       final byte[] family, final byte[] qualifier, final byte[] value,
       final Delete delete) throws IOException {
-    return checkAndMutate(regionName, row, family, qualifier, value, delete,
-      getLockFromId(delete.getLockId()));
+    checkOpen();
+
+    if (regionName == null) {
+      throw new IOException("Invalid arguments to checkAndDelete " +
+        "regionName is null");
+    }
+    HRegion region = getRegion(regionName);
+    if (region.getCoprocessorHost() != null) {
+      region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier,
+        value, delete);
+    }
+    boolean result = checkAndMutate(regionName, row, family, qualifier, value,
+      delete, getLockFromId(delete.getLockId()));
+    if (region.getCoprocessorHost() != null) {
+      result = region.getCoprocessorHost().postCheckAndDelete(row, family,
+        qualifier, value, delete, result);
+    }
+    return result;
   }
 
   //
@@ -2341,8 +2379,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
-      return region.increment(increment, getLockFromId(increment.getLockId()),
+      Increment incVal = increment;
+      Result resVal;
+      if (region.getCoprocessorHost() != null) {
+        incVal = region.getCoprocessorHost().preIncrement(incVal);
+      }
+      resVal = region.increment(incVal, getLockFromId(increment.getLockId()),
         increment.getWriteToWAL());
+      if (region.getCoprocessorHost() != null) {
+        resVal = region.getCoprocessorHost().postIncrement(incVal, resVal);
+      }
+      return resVal;
     } catch (IOException e) {
       checkFileSystem();
       throw e;
@@ -2362,9 +2409,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
+      long amountVal = amount;
+      if (region.getCoprocessorHost() != null) {
+        amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
+          family, qualifier, amountVal, writeToWAL);
+      }
-      long retval = region.incrementColumnValue(row, family, qualifier, amount,
-        writeToWAL);
-
+      long retval = region.incrementColumnValue(row, family, qualifier,
+        amountVal, writeToWAL);
+      if (region.getCoprocessorHost() != null) {
+        retval = region.getCoprocessorHost().postIncrementColumnValue(row,
+          family, qualifier, amountVal, writeToWAL, retval);
+      }
       return retval;
     } catch (IOException e) {
       checkFileSystem();
@@ -2560,6 +2615,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     return this.compactSplitThread;
   }
 
+  @Override
+  public ZooKeeperWatcher getZooKeeperWatcher() {
+    return this.zooKeeper;
+  }
+
   //
   // Main program and support routines
   //
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 1309f934678..26b919a69d2 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -38,11 +39,21 @@ public interface RegionServerServices extends OnlineRegions {
   /** @return the HLog */
   public HLog getWAL();
 
+  /**
+   * @return Implementation of {@link CatalogTracker} or null.
+   */
+  public CatalogTracker getCatalogTracker();
+
+  /**
+   * @return Implementation of {@link ZooKeeperWatcher} or null.
+   */
+  public ZooKeeperWatcher getZooKeeperWatcher();
+
   /**
    * @return Implementation of {@link CompactionRequestor} or null.
    */
   public CompactionRequestor getCompactionRequester();
-  
+
   /**
    * @return Implementation of {@link FlushRequester} or null.
    */
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
index 1bcde8cc8bf..06b10ea98a1 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
@@ -64,7 +64,7 @@ import org.apache.zookeeper.KeeperException;
  * <p>This class is not thread safe.  Caller needs ensure split is run by
  * one thread only.
  */
-class SplitTransaction {
+public class SplitTransaction {
   private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
   private static final String SPLITDIR = "splits";
@@ -120,7 +120,7 @@ class SplitTransaction {
    * @param r Region to split
    * @param splitrow Row to split around
    */
-  SplitTransaction(final HRegion r, final byte [] splitrow) {
+  public SplitTransaction(final HRegion r, final byte [] splitrow) {
     this.parent = r;
     this.splitrow = splitrow;
     this.splitdir = getSplitDir(this.parent);
@@ -177,7 +177,7 @@ class SplitTransaction {
    * @return Regions created
    * @see #rollback(OnlineRegions)
    */
-  PairOfSameType<HRegion> execute(final Server server,
+  public PairOfSameType<HRegion> execute(final Server server,
       final RegionServerServices services)
   throws IOException {
     LOG.info("Starting split of region " + this.parent);
@@ -187,6 +187,11 @@ class SplitTransaction {
     }
     assert !this.parent.lock.writeLock().isHeldByCurrentThread() : "Unsafe to hold write lock while performing RPCs";
 
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preSplit();
+    }
+
     // If true, no cluster to write meta edits into.
     boolean testing = server == null? true:
       server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
@@ -201,7 +206,7 @@ class SplitTransaction {
       services.removeFromOnlineRegions(this.parent.getRegionInfo().getEncodedName());
     }
     this.journal.add(JournalEntry.OFFLINED_PARENT);
-    
+
     // TODO: If the below were multithreaded would we complete steps in less
     // elapsed time?  St.Ack 20100920
 
@@ -215,11 +220,11 @@ class SplitTransaction {
     // stuff in fs that needs cleanup -- a storefile or two.  Thats why we
     // add entry to journal BEFORE rather than AFTER the change.
     this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
-    HRegion a = createDaughterRegion(this.hri_a, this.parent.flushRequester);
+    HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
 
     // Ditto
     this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
-    HRegion b = createDaughterRegion(this.hri_b, this.parent.flushRequester);
+    HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
 
     // Edit parent in meta
     if (!testing) {
@@ -249,6 +254,11 @@ class SplitTransaction {
       }
     }
 
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().postSplit(a, b);
+    }
+
     // Leaving here, the splitdir with its dross will be in place but since the
     // split was successful, just leave it; it'll be cleaned when parent is
     // deleted and cleaned up.
@@ -393,7 +403,7 @@ class SplitTransaction {
    * @see #cleanupDaughterRegion(FileSystem, Path, HRegionInfo)
    */
   HRegion createDaughterRegion(final HRegionInfo hri,
-      final FlushRequester flusher)
+      final RegionServerServices rsServices)
   throws IOException {
     // Package private so unit tests have access.
     FileSystem fs = this.parent.getFilesystem();
     Path regionDir = getSplitDirForDaughter(fs,
       this.splitdir, hri);
     HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
       this.parent.getLog(), fs, this.parent.getConf(),
-      hri, flusher);
+      hri, rsServices);
     HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
     return r;
   }
@@ -494,7 +504,7 @@ class SplitTransaction {
   * Call this method on initial region deploy.  Cleans up any mess
   * left by previous deploys of passed r region.
   * @param r
-   * @throws IOException 
+   * @throws IOException
   */
  static void cleanupAnySplitDetritus(final HRegion r) throws IOException {
    Path splitdir = getSplitDir(r);
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
index ace799761b0..c5c95625dcd 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
@@ -94,7 +94,7 @@ public class OpenRegionHandler extends EventHandler {
     try {
       // Instantiate the region.  This also periodically updates OPENING.
       region = HRegion.openHRegion(regionInfo, this.rsServices.getWAL(),
-        server.getConfiguration(), this.rsServices.getFlushRequester(),
+        server.getConfiguration(), this.rsServices,
         new Progressable() {
           public void progress() {
             try {
diff --git a/src/main/resources/hbase-default.xml b/src/main/resources/hbase-default.xml
index 630c0410849..05d8ca0779c 100644
--- a/src/main/resources/hbase-default.xml
+++ b/src/main/resources/hbase-default.xml
@@ -484,6 +484,18 @@
   </property>
+  <property>
+    <name>hbase.coprocessor.default.classes</name>
+    <value></value>
+    <description>A comma-separated list of Coprocessors that are loaded by
+    default. For any overridden coprocessor method, these classes will be
+    called in order. After implementing your own Coprocessor, just put it
+    in HBase's classpath and add the fully qualified class name here.
+    A coprocessor can also be loaded on demand per table by setting an
+    attribute on the HTableDescriptor.
+    </description>
+  </property>
+
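+  <!--
+    Usage sketch (not part of this patch; class name hypothetical).
+    System-wide loading can also be done programmatically, for example in
+    a test harness, assuming the observer class is on the classpath:
+
+      Configuration conf = HBaseConfiguration.create();
+      conf.set("hbase.coprocessor.default.classes",
+          "com.example.DemoObserver");
+
+    while per-table, on-demand loading uses an HTableDescriptor attribute
+    of the form "path:class:priority", as parsed by
+    CoprocessorHost.loadTableCoprocessors().
+  -->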