HBASE-2001 Coprocessors: Colocate user code with regions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1037102 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Kyle Purtell 2010-11-20 01:23:39 +00:00
parent 5f8c1dd1fa
commit 7299a72715
22 changed files with 3926 additions and 69 deletions

View File

@ -1,4 +1,17 @@
HBase Change Log
Release 0.91.0 - Unreleased
INCOMPATIBLE CHANGES
HBASE-2002 Coprocessors: Client side support; Support RPC interface
changes at runtime (Gary Helmling via Andrew Purtell)
BUG FIXES
IMPROVEMENTS
HBASE-2001 Coprocessors: Colocate user code with regions (Mingjie Lai via
Andrew Purtell)
Release 0.90.0 - Unreleased
INCOMPATIBLE CHANGES
HBASE-1822 Remove the deprecated APIs
@ -33,9 +46,6 @@ Release 0.90.0 - Unreleased
HBASE-2641 HBASE-2641 Refactor HLog splitLog, hbase-2437 continued;
break out split code as new classes
(James Kennedy via Stack)
HBASE-2002 Coprocessors: Client side support; Support RPC interface
changes at runtime (Gary Helmling via Andrew Purtell)
BUG FIXES
HBASE-1791 Timeout in IndexRecordWriter (Bradford Stephens via Andrew

View File

@ -0,0 +1,93 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
* This abstract class provides default implementation of an Endpoint.
* It also maintains a CoprocessorEnvironment object which can be
* used to access region resource.
*
* It's recommended to use this abstract class to implement your Endpoint.
* However you still can just implement the interface CoprocessorProtocol
* and Coprocessor to develop an Endpoint. But you won't be able to access
* the region related resource, i.e., CoprocessorEnvironment.
*/
public abstract class BaseEndpointCoprocessor implements Coprocessor,
CoprocessorProtocol {
private CoprocessorEnvironment env;
/**
* @param e Coprocessor environment.
*/
private void setEnvironment(CoprocessorEnvironment e) {
env = e;
}
/**
* @return env Coprocessor environment.
*/
public CoprocessorEnvironment getEnvironment() {
return env;
}
@Override
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return HBaseRPCProtocolVersion.versionID;
}
@Override
public void preOpen(CoprocessorEnvironment e) { }
/**
* It initializes the coprocessor resources. If you need to override this
* method, please remember to call super(e).
*/
@Override
public void postOpen(CoprocessorEnvironment e) {
setEnvironment(e);
}
@Override
public void preClose(CoprocessorEnvironment e, boolean abortRequested) { }
@Override
public void postClose(CoprocessorEnvironment e, boolean abortRequested) { }
@Override
public void preFlush(CoprocessorEnvironment e) { }
@Override
public void postFlush(CoprocessorEnvironment e) { }
@Override
public void preCompact(CoprocessorEnvironment e, boolean willSplit) { }
@Override
public void postCompact(CoprocessorEnvironment e, boolean willSplit) { }
@Override
public void preSplit(CoprocessorEnvironment e) { }
@Override
public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) { }
}

View File

@ -0,0 +1,225 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import java.io.IOException;
/**
* An abstract class that implements Coprocessor and RegionObserver.
* By extending it, you can create you own region observer without
* overriding all abstract methods of Coprocessor and RegionObserver.
*/
public abstract class BaseRegionObserverCoprocessor implements Coprocessor,
RegionObserver {
@Override
public void preOpen(CoprocessorEnvironment e) { }
@Override
public void postOpen(CoprocessorEnvironment e) { }
@Override
public void preClose(CoprocessorEnvironment e, boolean abortRequested)
{ }
@Override
public void postClose(CoprocessorEnvironment e, boolean abortRequested)
{ }
@Override
public void preFlush(CoprocessorEnvironment e) { }
@Override
public void postFlush(CoprocessorEnvironment e) { }
@Override
public void preSplit(CoprocessorEnvironment e) { }
@Override
public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) { }
@Override
public void preCompact(CoprocessorEnvironment e, boolean willSplit) { }
@Override
public void postCompact(CoprocessorEnvironment e, boolean willSplit) { }
@Override
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family)
throws IOException {
}
@Override
public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
byte[] row, byte[] family, final Result result)
throws IOException {
return result;
}
@Override
public Get preGet(final CoprocessorEnvironment e, final Get get)
throws IOException {
return get;
}
@Override
public List<KeyValue> postGet(final CoprocessorEnvironment e, final Get get,
List<KeyValue> results) throws IOException {
return results;
}
@Override
public Get preExists(final CoprocessorEnvironment e, final Get get)
throws IOException {
return get;
}
@Override
public boolean postExists(final CoprocessorEnvironment e,
final Get get, boolean exists)
throws IOException {
return exists;
}
@Override
public Map<byte[], List<KeyValue>> prePut(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap) throws IOException {
return familyMap;
}
@Override
public void postPut(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap)
throws IOException {
}
@Override
public Map<byte[], List<KeyValue>> preDelete(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap) throws IOException {
return familyMap;
}
@Override
public void postDelete(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) throws IOException {
}
@Override
public Put preCheckAndPut(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
return put;
}
@Override
public boolean postCheckAndPut(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put, final boolean result)
throws IOException {
return result;
}
@Override
public Delete preCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete)
throws IOException {
return delete;
}
@Override
public boolean postCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete, final boolean result)
throws IOException {
return result;
}
@Override
public long preIncrementColumnValue(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final long amount, final boolean writeToWAL) throws IOException {
return amount;
}
@Override
public long postIncrementColumnValue(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final long amount, final boolean writeToWAL, long result)
throws IOException {
return result;
}
@Override
public Increment preIncrement(final CoprocessorEnvironment e,
final Increment increment)
throws IOException {
return increment;
}
@Override
public Result postIncrement(final CoprocessorEnvironment e,
final Increment increment,
final Result result) throws IOException {
return result;
}
@Override
public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan)
throws IOException {
return scan;
}
@Override
public void postScannerOpen(final CoprocessorEnvironment e,
final Scan scan,
final long scannerId) throws IOException { }
@Override
public void preScannerNext(final CoprocessorEnvironment e,
final long scannerId) throws IOException {
}
@Override
public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
final long scannerId, final List<KeyValue> results)
throws IOException {
return results;
}
@Override
public void preScannerClose(final CoprocessorEnvironment e,
final long scannerId)
throws IOException { }
@Override
public void postScannerClose(final CoprocessorEnvironment e,
final long scannerId)
throws IOException { }
}

View File

@ -0,0 +1,120 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
* Coprocess interface.
*/
public interface Coprocessor {
public static final int VERSION = 1;
/**
* Installation priority. Coprocessors will be executed in sequence
* by the order of coprocessor priority.
*/
public enum Priority {
HIGHEST(0),
SYSTEM(Integer.MAX_VALUE/4),
USER(Integer.MAX_VALUE/2),
LOWEST(Integer.MAX_VALUE);
private int prio;
Priority(int prio) {
this.prio = prio;
}
public int intValue() {
return prio;
}
}
// Interface
/**
* Called before the region is reported as open to the master.
* @param e the environment provided by the region server
*/
public void preOpen(final CoprocessorEnvironment e);
/**
* Called after the region is reported as open to the master.
* @param e the environment provided by the region server
*/
public void postOpen(final CoprocessorEnvironment e);
/**
* Called before the memstore is flushed to disk.
* @param e the environment provided by the region server
*/
public void preFlush(final CoprocessorEnvironment e);
/**
* Called after the memstore is flushed to disk.
* @param e the environment provided by the region server
*/
public void postFlush(final CoprocessorEnvironment e);
/**
* Called before compaction.
* @param e the environment provided by the region server
* @param willSplit true if compaction will result in a split, false
* otherwise
*/
public void preCompact(final CoprocessorEnvironment e,
final boolean willSplit);
/**
* Called after compaction.
* @param e the environment provided by the region server
* @param willSplit true if compaction will result in a split, false
* otherwise
*/
public void postCompact(final CoprocessorEnvironment e,
final boolean willSplit);
/**
* Called before the region is split.
* @param e the environment provided by the region server
* (e.getRegion() returns the parent region)
*/
public void preSplit(final CoprocessorEnvironment e);
/**
* Called after the region is split.
* @param e the environment provided by the region server
* (e.getRegion() returns the parent region)
* @param l the left daughter region
* @param r the right daughter region
*/
public void postSplit(final CoprocessorEnvironment e, final HRegion l,
final HRegion r);
/**
* Called before the region is reported as closed to the master.
* @param e the environment provided by the region server
* @param abortRequested true if the region server is aborting
*/
public void preClose(final CoprocessorEnvironment e, boolean abortRequested);
/**
* Called after the region is reported as closed to the master.
* @param e the environment provided by the region server
* @param abortRequested true if the region server is aborting
*/
public void postClose(final CoprocessorEnvironment e, boolean abortRequested);
}

View File

@ -0,0 +1,71 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
/**
* Coprocessor environment state.
*/
public interface CoprocessorEnvironment {
/** @return the Coprocessor interface version */
public int getVersion();
/** @return the HBase version as a string (e.g. "0.21.0") */
public String getHBaseVersion();
/** @return the region associated with this coprocessor */
public HRegion getRegion();
/** @return reference to the region server services */
public RegionServerServices getRegionServerServices();
/**
* @return an interface for accessing the given table
* @throws IOException
*/
public HTableInterface getTable(byte[] tableName) throws IOException;
// environment variables
/**
* Get an environment variable
* @param key the key
* @return the object corresponding to the environment variable, if set
*/
public Object get(Object key);
/**
* Set an environment variable
* @param key the key
* @param value the value
*/
public void put(Object key, Object value);
/**
* Remove an environment variable
* @param key the key
* @return the object corresponding to the environment variable, if set
*/
public Object remove(Object key);
}

View File

@ -0,0 +1,52 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.DoNotRetryIOException;
/**
* Thrown if a coprocessor encounters any exception.
*/
public class CoprocessorException extends DoNotRetryIOException {
private static final long serialVersionUID = 4357922136679804887L;
/** Default Constructor */
public CoprocessorException() {
super();
}
/**
* Constructor with a Class object and exception message.
* @param clazz
* @param s
*/
public CoprocessorException(Class<?> clazz, String s) {
super( "Coprocessor [" + clazz.getName() + "]: " + s);
}
/**
* Constructs the exception and supplies a string as the message
* @param s - message
*/
public CoprocessorException(String s) {
super(s);
}
}

View File

@ -0,0 +1,336 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
import java.io.IOException;
/**
* Coprocessors implement this interface to observe and mediate client actions
* on the region.
*/
public interface RegionObserver {
/**
* Called before a client makes a GetClosestRowBefore request.
* @param e the environment provided by the region server
* @param row the row
* @param family the family
* @throws IOException if an error occurred on the coprocessor
*/
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family)
throws IOException;
/**
* Called after a client makes a GetClosestRowBefore request.
* @param e the environment provided by the region server
* @param row the row
* @param family the desired family
* @param result the result set
* @return the possible tranformed result set to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final Result result)
throws IOException;
/**
* Called before the client perform a get()
* @param e the environment provided by the region server
* @param get the Get request
* @return the possibly transformed Get object by coprocessor
* @throws IOException if an error occurred on the coprocessor
*/
public Get preGet(final CoprocessorEnvironment e, final Get get)
throws IOException;
/**
* Called after the client perform a get()
* @param e the environment provided by the region server
* @param get the Get request
* @param results the result list
* @return the possibly transformed result list to return to client
* @throws IOException if an error occurred on the coprocessor
*/
public List<KeyValue> postGet(final CoprocessorEnvironment e, final Get get,
final List<KeyValue> results)
throws IOException;
/**
* Called before the client tests for existence using a Get.
* @param e the environment provided by the region server
* @param get the Get request
* @return the possibly transformed Get object by coprocessor
* @throws IOException if an error occurred on the coprocessor
*/
public Get preExists(final CoprocessorEnvironment e, final Get get)
throws IOException;
/**
* Called after the client tests for existence using a Get.
* @param e the environment provided by the region server
* @param get the Get request
* @param exists the result returned by the region server
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
public boolean postExists(final CoprocessorEnvironment e, final Get get,
final boolean exists)
throws IOException;
/**
* Called before the client stores a value.
* @param e the environment provided by the region server
* @param familyMap map of family to edits for the given family.
* @return the possibly transformed map to actually use
* @throws IOException if an error occurred on the coprocessor
*/
public Map<byte[], List<KeyValue>> prePut(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap)
throws IOException;
/**
* Called after the client stores a value.
* @param e the environment provided by the region server
* @param familyMap map of family to edits for the given family.
* @throws IOException if an error occurred on the coprocessor
*/
public void postPut(final CoprocessorEnvironment e, final Map<byte[],
List<KeyValue>> familyMap)
throws IOException;
/**
* Called before the client deletes a value.
* @param e the environment provided by the region server
* @param familyMap map of family to edits for the given family.
* @return the possibly transformed map to actually use
* @throws IOException if an error occurred on the coprocessor
*/
public Map<byte[], List<KeyValue>> preDelete(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap)
throws IOException;
/**
* Called after the client deletes a value.
* @param e the environment provided by the region server
* @param familyMap map of family to edits for the given family.
* @throws IOException if an error occurred on the coprocessor
*/
public void postDelete(final CoprocessorEnvironment e,
final Map<byte[], List<KeyValue>> familyMap)
throws IOException;
/**
* Called before checkAndPut
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param value the expected value
* @param put data to put if check succeeds
* @return the possibly transformed map to actually use
* @throws IOException if an error occurred on the coprocessor
*/
public Put preCheckAndPut(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put)
throws IOException;
/**
* Called after checkAndPut
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param value the expected value
* @param put data to put if check succeeds
* @param result from the checkAndPut
* @return the possibly transformed value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
public boolean postCheckAndPut(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put, final boolean result)
throws IOException;
/**
* Called before checkAndPut
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param value the expected value
* @param delete delete to commit if check succeeds
* @return the possibly transformed map to actually use
* @throws IOException if an error occurred on the coprocessor
*/
public Delete preCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete)
throws IOException;
/**
* Called after checkAndDelete
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param value the expected value
* @param delete delete to commit if check succeeds
* @param result from the CheckAndDelete
* @return the possibly transformed value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
public boolean postCheckAndDelete(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Delete delete, final boolean result)
throws IOException;
/**
* Called before incrementColumnValue
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
* @param writeToWAL whether to write the increment to the WAL
* @return new amount to increment
* @throws IOException if an error occurred on the coprocessor
*/
public long preIncrementColumnValue(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final long amount, final boolean writeToWAL)
throws IOException;
/**
* Called after incrementColumnValue
* @param e the environment provided by the region server
* @param row row to check
* @param family column family
* @param qualifier column qualifier
* @param amount long amount to increment
* @param writeToWAL whether to write the increment to the WAL
* @param result the result returned by incrementColumnValue
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
public long postIncrementColumnValue(final CoprocessorEnvironment e,
final byte [] row, final byte [] family, final byte [] qualifier,
final long amount, final boolean writeToWAL, final long result)
throws IOException;
/**
* Called before incrementColumnValue
* @param e the environment provided by the region server
* @param increment increment object
* @param writeToWAL whether to write the increment to the WAL
* @return new Increment instance
* @throws IOException if an error occurred on the coprocessor
*/
public Increment preIncrement(final CoprocessorEnvironment e,
final Increment increment)
throws IOException;
/**
* Called after increment
* @param e the environment provided by the region server
* @param increment increment object
* @param writeToWAL whether to write the increment to the WAL
* @param result the result returned by increment
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
public Result postIncrement(final CoprocessorEnvironment e,
final Increment increment, final Result result)
throws IOException;
/**
* Called before the client opens a new scanner.
* @param e the environment provided by the region server
* @param scan the Scan specification
* @return the possibly transformed Scan to actually use
* @throws IOException if an error occurred on the coprocessor
*/
public Scan preScannerOpen(final CoprocessorEnvironment e, final Scan scan)
throws IOException;
/**
* Called after the client opens a new scanner.
* @param e the environment provided by the region server
* @param scan the Scan specification
* @param scannerId the scanner id allocated by the region server
* @throws IOException if an error occurred on the coprocessor
*/
public void postScannerOpen(final CoprocessorEnvironment e, final Scan scan,
final long scannerId)
throws IOException;
/**
* Called before the client asks for the next row on a scanner.
* @param e the environment provided by the region server
* @param scannerId the scanner id
* @param results the result set returned by the region server
* @return the possibly transformed result set to actually return
* @throws IOException if an error occurred on the coprocessor
*/
public void preScannerNext(final CoprocessorEnvironment e,
final long scannerId)
throws IOException;
/**
* Called after the client asks for the next row on a scanner.
* @param e the environment provided by the region server
* @param scannerId the scanner id
* @param results the result set returned by the region server
* @return the possibly transformed result set to actually return
* @throws IOException if an error occurred on the coprocessor
*/
public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
final long scannerId, final List<KeyValue> results)
throws IOException;
/**
* Called before the client closes a scanner.
* @param e the environment provided by the region server
* @param scannerId the scanner id
* @throws IOException if an error occurred on the coprocessor
*/
public void preScannerClose(final CoprocessorEnvironment e,
final long scannerId)
throws IOException;
/**
* Called after the client closes a scanner.
* @param e the environment provided by the region server
* @param scannerId the scanner id
* @throws IOException if an error occurred on the coprocessor
*/
public void postScannerClose(final CoprocessorEnvironment e,
final long scannerId)
throws IOException;
}

View File

@ -0,0 +1,367 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
<h2>Table of Contents</h2>
<ul>
<li><a href="#overview">Overview</a></li>
<li><a href="#coprocessor">Coprocessor</a></li>
<li><a href="#regionobserver">RegionObserver</a></li>
<li><a href="#commandtarget">Endpoint</a></li>
<li><a href="#load">Coprocessor loading</a></li>
</ul>
<h2><a name="overview">Overview</a></h2>
Coprocessors are code that runs in-process on each region server. Regions
contain references to the coprocessor implementation classes associated
with them. Coprocessor classes can be loaded either from local
jars on the region server's classpath or via the HDFS classloader.
<p>
Multiple types of coprocessors are provided to provide sufficient flexibility
for potential use cases. Right now there are:
<p>
<ul>
<li>Coprocessor: provides region lifecycle management hooks, e.g., region
open/close/split/flush/compact operations.</li>
<li>RegionObserver: provides hook for monitor table operations from
client side, such as table get/put/scan/delete, etc.</li>
<li>Endpoint: provides on demand triggers for any arbitrary function
executed at a region. One use case is column aggregation at region
server. </li>
</ul>
<h2><a name="coprocessor">Coprocessor</a></h2>
A coprocessor is required to
implement <code>Coprocessor</code> interface so that coprocessor framework
can manage it internally.
<p>
Another design goal of this interface is to provide simple features for
making coprocessors useful, while exposing no more internal state or
control actions of the region server than necessary and not exposing them
directly.
<p>
Over the lifecycle of a region, the methods of this interface are invoked
when the corresponding events happen. The master transitions regions
through the following states:
<p>
&nbsp;&nbsp;&nbsp;
unassigned -> pendingOpen -> open -> pendingClose -> closed.
<p>
Coprocessors have opportunity to intercept and handle events in
pendingOpen, open, and pendingClose states.
<p>
<h3>PendingOpen</h3>
<p>
The region server is opening a region to bring it online. Coprocessors
can piggyback or fail this process.
<p>
<ul>
<li>preOpen, postOpen: Called before and after the region is reported as
online to the master.</li><p>
</ul>
<p>
<h3>Open</h3>
The region is open on the region server and is processing both client
requests (get, put, scan, etc.) and administrative actions (flush, compact,
split, etc.). Coprocessors can piggyback administrative actions via:
<p>
<ul>
<li>preFlush, postFlush: Called before and after the memstore is flushed
into a new store file.</li><p>
<li>preCompact, postCompact: Called before and after compaction.</li><p>
<li>preSplit, postSplit: Called after the region is split.</li><p>
</ul>
<p>
<h3>PendingClose</h3>
The region server is closing the region. This can happen as part of normal
operations or may happen when the region server is aborting due to fatal
conditions such as OOME, health check failure, or fatal filesystem
problems. Coprocessors can piggyback this event. If the server is aborting
an indication to this effect will be passed as an argument.
<p>
<ul>
<li>preClose and postClose: Called before and after the region is
reported as closed to the master.</li><p>
</ul>
<p>
<h2><a name="regionobserver">RegionObserver</a></h2>
If the coprocessor implements the <code>RegionObserver</code> interface it can
observe and mediate client actions on the region:
<p>
<ul>
<li>preGet, postGet: Called before and after a client makes a Get
request.</li><p>
<li>preExists, postExists: Called before and after the client tests
for existence using a Get.</li><p>
<li>prePut and postPut: Called before and after the client stores a value.
</li><p>
<li>preDelete and postDelete: Called before and after the client
deletes a value.</li><p>
<li>preScannerOpen postScannerOpen: Called before and after the client
opens a new scanner.</li><p>
<li>preScannerNext, postScannerNext: Called before and after the client
asks for the next row on a scanner.</li><p>
<li>preScannerClose, postScannerClose: Called before and after the client
closes a scanner.</li><p>
<li>preCheckAndPut, postCheckAndPut: Called before and after the client
calls checkAndPut().</li><p>
<li>preCheckAndDelete, postCheckAndDelete: Called before and after the client
calls checkAndDelete().</li><p>
</ul>
You can also extend abstract class <code>BaseRegionObserverCoprocessor</code>
which
implements both <code>Coprocessor</code> and <code>RegionObserver</code>.
In addition, it overrides all methods with default behaviors so you don't
have to override all of them.
<p>
Here's an example of what a simple RegionObserver might look like. This
example shows how to implement access control for HBase. This
coprocessor checks user information for a given client request, e.g.,
Get/Put/Delete/Scan by injecting code at certain
<code>RegionObserver</code>
preXXX hooks. If the user is not allowed to access the resource, a
CoprocessorException will be thrown. And the client request will be
denied by receiving this exception.
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
package org.apache.hadoop.hbase.coprocessor;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
// Sample access-control coprocessor. It utilizes RegionObserver
// and intercept preXXX() method to check user privilege for the given table
// and column family.
public class AccessControlCoprocessor extends BaseRegionObserverCoprocessor {
@Override
public Get preGet(CoprocessorEnvironment e, Get get)
throws CoprocessorException {
// check permissions..
if (access_not_allowed) {
throw new AccessDeniedException(&quot;User is not allowed to access.&quot;);
}
return get;
}
// override prePut(), preDelete(), etc.
}
</pre></blockquote>
</div>
<h2><a name="commandtarget">Endpoint</a></h2>
<code>Coprocessor</code> and <code>RegionObserver</code> provide certain hooks
for injecting user code running at each region. The user code will be triggerd
by existing <code>HTable</code> and <code>HBaseAdmin</code> operations at
the certain hook points.
<p>
Through Endpoint and dynamic RPC protocol, you can define your own
interface communicated between client and region server,
i.e., you can create a new method, specify passed parameters and return types
for this new method.
And the new Endpoint methods can be triggered by
calling client side dynamic RPC functions -- <code>HTable.coprocessorExec(...)
</code>.
<p>
To implement a Endpoint, you need to:
<ul>
<li>Extend <code>CoprocessorProtocol</code>: the interface defines
communication protocol for the new Endpoint, and will be
served as the RPC protocol between client and region server.</li>
<li>Extend both <code>BaseEndpointCoprocessor</code> abstract class,
and the above extended <code>CoprocessorProtocol</code> interface:
the actually implemented class running at region server.</li>
</ul>
<p>
Here's an example of performing column aggregation at region server:
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
// A sample protocol for performing aggregation at regions.
public static interface ColumnAggregationProtocol
extends CoprocessorProtocol {
// Perform aggregation for a given column at the region. The aggregation
// will include all the rows inside the region. It can be extended to
// allow passing start and end rows for a fine-grained aggregation.
public int sum(byte[] family, byte[] qualifier) throws IOException;
}
// Aggregation implementation at a region.
public static class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
implements ColumnAggregationProtocol {
@Override
// Scan the region by the given family and qualifier. Return the aggregation
// result.
public int sum(byte[] family, byte[] qualifier)
throws IOException {
// aggregate at each region
Scan scan = new Scan();
scan.addColumn(family, qualifier);
int sumResult = 0;
// use an internal scanner to perform scanning.
InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
try {
List&lt;KeyValue&gt; curVals = new ArrayList&lt;KeyValue&gt;();
boolean done = false;
do {
curVals.clear();
done = scanner.next(curVals);
KeyValue kv = curVals.get(0);
sumResult += Bytes.toInt(kv.getValue());
} while (done);
} finally {
scanner.close();
}
return sumResult;
}
}
</pre></blockquote>
</div>
<p>
Client invocations are performed through <code>HTable</code>,
which has the following methods added by dynamic RPC protocol:
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
public &lt;T extends CoprocessorProtocol&gt; T coprocessorProxy(Class&lt;T&gt; protocol, Row row)
public &lt;T extends CoprocessorProtocol, R&gt; void coprocessorExec(
Class&lt;T&gt; protocol, List&lt;? extends Row&gt; rows,
BatchCall&lt;T,R&gt; callable, BatchCallback&lt;R&gt; callback)
public &lt;T extends CoprocessorProtocol, R&gt; void coprocessorExec(
Class&lt;T&gt; protocol, RowRange range,
BatchCall&lt;T,R&gt; callable, BatchCallback&lt;R&gt; callback)
</pre></blockquote>
</div>
<p>
Here is a client side example of invoking
<code>ColumnAggregationEndpoint</code>:
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Scan scan;
Map&lt;byte[], Integer&gt; results;
// scan: for all regions
scan = new Scan();
results = table.coprocessorExec(ColumnAggregationProtocol.class, scan,
new BatchCall&lt;ColumnAggregationProtocol,Integer&gt;() {
public Integer call(ColumnAggregationProtocol instance) throws IOException{
return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
}
});
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry&lt;byte[], Integer&gt; e : results.entrySet()) {
sumResult += e.getValue();
}
</pre></blockquote>
</div>
<h2><a name="load">Coprocess loading</a></h2>
A customized coprocessor can be loaded by two different ways, by configuration,
or by <code>HTableDescriptor</code> for a newly created table.
<p>
(Currently we don't really have an on demand coprocessor loading machanism for
opened regions.)
<h3>Load from configuration</h3>
Whenever a region is opened, it will read coprocessor class names from
<code>hbase.coprocessor.default.classes</code> from <code>Configuration</code>.
Coprocessor framework will automatically load the configured classes as
default coprocessors. The classes must be included in the classpath already.
<p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
&lt;property&gt;
&lt;name&gt;hbase.coprocessor.default.classes&lt;/name&gt;
&lt;value&gt;org.apache.hadoop.hbase.coprocessor.AccessControllCoprocessor, org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol&lt;/value&gt;
&lt;description&gt;A comma-separated list of Coprocessors that are loaded by
default. For any override coprocessor method from RegionObservor or
Coprocessor, these classes' implementation will be called
in order. After implement your own
Coprocessor, just put it in HBase's classpath and add the fully
qualified class name here.
&lt;/description&gt;
&lt;/property&gt;
</pre></blockquote>
</div>
<p>
The first defined coprocessor will be assigned
<code>Coprocessor.Priority.SYSTEM</code> as priority. And each following
coprocessor's priority will be incremented by one. Coprocessors are executed
in order according to the natural ordering of the int.
<h3>Load from table attribute</h3>
Coprocessor classes can also be configured at table attribute. The
attribute key must start with "Coprocessor" and values of the form is
&lt;path&gt;:&lt;class&gt;:&lt;priority&gt;, so that the framework can
recognize and load it.
<p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
&#39;COPROCESSOR$1&#39; =&gt; &#39;hdfs://localhost:8020/hbase/coprocessors/test.jar:Test:1000&#39;
&#39;COPROCESSOR$2&#39; =&gt; &#39;/hbase/coprocessors/test2.jar:AnotherTest:1001&#39;
</pre></blockquote>
</div>
<p>
&lt;path&gt; must point to a jar, can be on any filesystem supported by the
Hadoop </code>FileSystem</code> object.
<p>
&lt;class&gt; is the coprocessor implementation class. A jar can contain
more than one coprocessor implementation, but only one can be specified
at a time in each table attribute.
<p>
&lt;priority&gt; is an integer. Coprocessors are executed in order according
to the natural ordering of the int. Coprocessors can optionally abort
actions. So typically one would want to put authoritative CPs (security
policy implementations, perhaps) ahead of observers.
<p>
<div style="background-color: #cccccc; padding: 2px">
<blockquote><pre>
Path path = new Path(fs.getUri() + Path.SEPARATOR +
"TestClassloading.jar");
// create a table that references the jar
HTableDescriptor htd = new HTableDescriptor(getClass().getName());
htd.addFamily(new HColumnDescriptor("test"));
htd.setValue("Coprocessor$1",
path.toString() +
":" + classFullName +
":" + Coprocessor.Priority.USER);
HBaseAdmin admin = new HBaseAdmin(this.conf);
admin.createTable(htd);
<h3>Chain of RegionObservers</h3>
As described above, multiple coprocessors can be loaded at one region at the
same time. In case of RegionObserver, you can have more than one
RegionObservers register to one same hook point, i.e, preGet(), etc.
When a region reach the
hook point, the framework will invoke each registered RegionObserver by the
order of assigned priority.
</pre></blockquote>
</div>
*/
package org.apache.hadoop.hbase.coprocessor;

File diff suppressed because it is too large Load Diff

View File

@ -74,6 +74,7 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.io.HeapSize;
@ -231,8 +232,8 @@ public class HRegion implements HeapSize { // , Writable{
final long memstoreFlushSize;
private volatile long lastFlushTime;
final RegionServerServices rsServices;
private List<Pair<Long,Long>> recentFlushes = new ArrayList<Pair<Long,Long>>();
final FlushRequester flushRequester;
private final long blockingMemStoreSize;
final long threadWakeFrequency;
// Used to guard closes
@ -247,6 +248,9 @@ public class HRegion implements HeapSize { // , Writable{
private final ReadWriteConsistencyControl rwcc =
new ReadWriteConsistencyControl();
// Coprocessor host
private CoprocessorHost coprocessorHost;
/**
* Name of the region info file that resides just under the region directory.
*/
@ -259,13 +263,14 @@ public class HRegion implements HeapSize { // , Writable{
this.tableDir = null;
this.blockingMemStoreSize = 0L;
this.conf = null;
this.flushRequester = null;
this.rsServices = null;
this.fs = null;
this.memstoreFlushSize = 0L;
this.log = null;
this.regiondir = null;
this.regionInfo = null;
this.threadWakeFrequency = 0L;
this.coprocessorHost = null;
}
/**
@ -287,19 +292,19 @@ public class HRegion implements HeapSize { // , Writable{
* @param conf is global configuration settings.
* @param regionInfo - HRegionInfo that describes the region
* is new), then read them from the supplied path.
* @param flushRequester an object that implements {@link FlushRequester} or null
* @param rsServices reference to {@link RegionServerServices} or null
*
* @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, org.apache.hadoop.hbase.HRegionInfo, FlushRequester)
*/
public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, FlushRequester flushRequester) {
HRegionInfo regionInfo, RegionServerServices rsServices) {
this.tableDir = tableDir;
this.comparator = regionInfo.getComparator();
this.log = log;
this.fs = fs;
this.conf = conf;
this.regionInfo = regionInfo;
this.flushRequester = flushRequester;
this.rsServices = rsServices;
this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY,
10 * 1000);
String encodedNameStr = this.regionInfo.getEncodedName();
@ -312,6 +317,11 @@ public class HRegion implements HeapSize { // , Writable{
this.memstoreFlushSize = flushSize;
this.blockingMemStoreSize = this.memstoreFlushSize *
conf.getLong("hbase.hregion.memstore.block.multiplier", 2);
// don't initialize coprocessors if not running within a regionserver
// TODO: revisit if coprocessors should load in other cases
if (rsServices != null) {
this.coprocessorHost = new CoprocessorHost(this, rsServices, conf);
}
if (LOG.isDebugEnabled()) {
// Write out region name as string and its encoded name.
LOG.debug("Instantiated " + this);
@ -336,6 +346,9 @@ public class HRegion implements HeapSize { // , Writable{
*/
public long initialize(final Progressable reporter)
throws IOException {
if (coprocessorHost != null) {
coprocessorHost.preOpen();
}
// A region can be reopened if failed a split; reset flags
this.closing.set(false);
this.closed.set(false);
@ -376,6 +389,10 @@ public class HRegion implements HeapSize { // , Writable{
// (particularly if no recovered edits, seqid will be -1).
long nextSeqid = maxSeqId + 1;
LOG.info("Onlined " + this.toString() + "; next sequenceid=" + nextSeqid);
if (coprocessorHost != null) {
coprocessorHost.postOpen();
}
return nextSeqid;
}
@ -498,6 +515,11 @@ public class HRegion implements HeapSize { // , Writable{
LOG.warn("Region " + this + " already closed");
return null;
}
if (coprocessorHost != null) {
this.coprocessorHost.preClose(abort);
}
boolean wasFlushing = false;
synchronized (writestate) {
// Disable compacting and flushing by background threads for this
@ -543,6 +565,10 @@ public class HRegion implements HeapSize { // , Writable{
result.addAll(store.close());
}
this.closed.set(true);
if (coprocessorHost != null) {
this.coprocessorHost.postClose(abort);
}
LOG.info("Closed " + this);
return result;
} finally {
@ -734,15 +760,18 @@ public class HRegion implements HeapSize { // , Writable{
}
lock.readLock().lock();
this.lastCompactInfo = null;
byte [] splitRow = null;
try {
if (this.closed.get()) {
LOG.debug("Skipping compaction on " + this + " because closed");
return null;
}
byte [] splitRow = null;
if (this.closed.get()) {
return splitRow;
}
if (coprocessorHost != null) {
coprocessorHost.preCompact(false);
}
try {
synchronized (writestate) {
if (!writestate.compacting && writestate.writesEnabled) {
@ -789,10 +818,13 @@ public class HRegion implements HeapSize { // , Writable{
writestate.notifyAll();
}
}
return splitRow;
if (coprocessorHost != null) {
coprocessorHost.postCompact(splitRow != null);
}
} finally {
lock.readLock().unlock();
}
return splitRow;
}
/**
@ -827,6 +859,9 @@ public class HRegion implements HeapSize { // , Writable{
LOG.debug("Skipping flush on " + this + " because closed");
return false;
}
if (coprocessorHost != null) {
coprocessorHost.preFlush();
}
try {
synchronized (writestate) {
if (!writestate.flushing && writestate.writesEnabled) {
@ -841,7 +876,11 @@ public class HRegion implements HeapSize { // , Writable{
return false;
}
}
return internalFlushcache();
boolean result = internalFlushcache();
if (coprocessorHost != null) {
coprocessorHost.postFlush();
}
return result;
} finally {
synchronized (writestate) {
writestate.flushing = false;
@ -1063,22 +1102,29 @@ public class HRegion implements HeapSize { // , Writable{
*/
public Result getClosestRowBefore(final byte [] row, final byte [] family)
throws IOException {
Result result = null;
// look across all the HStores for this region and determine what the
// closest key is across all column families, since the data may be sparse
KeyValue key = null;
checkRow(row);
startRegionOperation();
if (coprocessorHost != null) {
coprocessorHost.preGetClosestRowBefore(row, family);
}
try {
Store store = getStore(family);
KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
// get the closest key. (HStore.getRowKeyAtOrBefore can return null)
key = store.getRowKeyAtOrBefore(kv);
if (key == null) {
return null;
}
if (key != null) {
Get get = new Get(key.getRow());
get.addFamily(family);
return get(get, null);
result = get(get, null);
}
if (coprocessorHost != null) {
result = coprocessorHost.postGetClosestRowBefore(row, family, result);
}
return result;
} finally {
closeRegionOperation();
}
@ -1120,7 +1166,14 @@ public class HRegion implements HeapSize { // , Writable{
}
protected InternalScanner instantiateInternalScanner(Scan scan, List<KeyValueScanner> additionalScanners) throws IOException {
return new RegionScanner(scan, additionalScanners);
if (coprocessorHost != null) {
coprocessorHost.preScannerOpen(scan);
}
InternalScanner s = new RegionScanner(scan, additionalScanners);
if (coprocessorHost != null) {
coprocessorHost.postScannerOpen(scan, s.hashCode());
}
return s;
}
/*
@ -1186,8 +1239,10 @@ public class HRegion implements HeapSize { // , Writable{
boolean flush = false;
updatesLock.readLock().lock();
try {
if (coprocessorHost != null) {
familyMap = coprocessorHost.preDelete(familyMap);
}
for (Map.Entry<byte[], List<KeyValue>> e : familyMap.entrySet()) {
@ -1214,7 +1269,7 @@ public class HRegion implements HeapSize { // , Writable{
get.setMaxVersions(count);
get.addColumn(family, qual);
List<KeyValue> result = get(get);
List<KeyValue> result = get(get, false);
if (result.size() < count) {
// Nothing to delete
@ -1251,10 +1306,13 @@ public class HRegion implements HeapSize { // , Writable{
// Now make changes to the memstore.
long addedSize = applyFamilyMapToMemstore(familyMap);
flush = isFlushSize(memstoreSize.addAndGet(addedSize));
if (coprocessorHost != null) {
coprocessorHost.postDelete(familyMap);
}
} finally {
this.updatesLock.readLock().unlock();
}
if (flush) {
// Request a cache flush. Do it outside update lock.
requestFlush();
@ -1315,6 +1373,7 @@ public class HRegion implements HeapSize { // , Writable{
try {
// All edits for the given row (across all column families) must happen atomically.
// Coprocessor interception happens in put(Map,boolean)
put(put.getFamilyMap(), writeToWAL);
} finally {
if(lockid == null) releaseRowLock(lid);
@ -1393,6 +1452,9 @@ public class HRegion implements HeapSize { // , Writable{
/** Keep track of the locks we hold so we can release them in finally clause */
List<Integer> acquiredLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
// reference family maps directly so coprocessors can mutate them if desired
Map<byte[],List<KeyValue>>[] familyMaps =
new Map[batchOp.operations.length];
// We try to set up a batch in the range [firstIndex,lastIndexExclusive)
int firstIndex = batchOp.nextIndexToProcess;
int lastIndexExclusive = firstIndex;
@ -1408,9 +1470,19 @@ public class HRegion implements HeapSize { // , Writable{
Put put = nextPair.getFirst();
Integer providedLockId = nextPair.getSecond();
Map<byte[], List<KeyValue>> familyMap = put.getFamilyMap();
// Check any loaded coprocessors
/* TODO: we should catch any throws coprocessor exceptions here to allow the
rest of the batch to continue. This means fixing HBASE-2898 */
if (coprocessorHost != null) {
familyMap = coprocessorHost.prePut(familyMap);
}
// store the family map reference to allow for mutations
familyMaps[lastIndexExclusive] = familyMap;
// Check the families in the put. If bad, skip this one.
try {
checkFamilies(put.getFamilyMap().keySet());
checkFamilies(familyMap.keySet());
} catch (NoSuchColumnFamilyException nscf) {
LOG.warn("No such column family in batch put", nscf);
batchOp.retCodes[lastIndexExclusive] = OperationStatusCode.BAD_FAMILY;
@ -1442,8 +1514,11 @@ public class HRegion implements HeapSize { // , Writable{
// STEP 2. Update any LATEST_TIMESTAMP timestamps
// ----------------------------------
for (int i = firstIndex; i < lastIndexExclusive; i++) {
// skip invalid
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
updateKVTimestamps(
batchOp.operations[i].getFirst().getFamilyMap().values(),
familyMaps[i].values(),
byteNow);
}
@ -1457,7 +1532,7 @@ public class HRegion implements HeapSize { // , Writable{
Put p = batchOp.operations[i].getFirst();
if (!p.getWriteToWAL()) continue;
addFamilyMapToWALEdit(p.getFamilyMap(), walEdit);
addFamilyMapToWALEdit(familyMaps[i], walEdit);
}
// Append the edit to WAL
@ -1471,9 +1546,13 @@ public class HRegion implements HeapSize { // , Writable{
for (int i = firstIndex; i < lastIndexExclusive; i++) {
if (batchOp.retCodes[i] != OperationStatusCode.NOT_RUN) continue;
Put p = batchOp.operations[i].getFirst();
addedSize += applyFamilyMapToMemstore(p.getFamilyMap());
addedSize += applyFamilyMapToMemstore(familyMaps[i]);
batchOp.retCodes[i] = OperationStatusCode.SUCCESS;
// execute any coprocessor post-hooks
if (coprocessorHost != null) {
coprocessorHost.postDelete(familyMaps[i]);
}
}
success = true;
return addedSize;
@ -1529,7 +1608,7 @@ public class HRegion implements HeapSize { // , Writable{
Integer lid = getLock(lockId, get.getRow(), true);
List<KeyValue> result = new ArrayList<KeyValue>();
try {
result = get(get);
result = get(get, false);
boolean matches = false;
if (result.size() == 0 &&
@ -1633,9 +1712,11 @@ public class HRegion implements HeapSize { // , Writable{
* @praram now
* @throws IOException
*/
private void put(final byte [] family, final List<KeyValue> edits)
private void put(byte [] family, List<KeyValue> edits)
throws IOException {
Map<byte[], List<KeyValue>> familyMap = new HashMap<byte[], List<KeyValue>>();
Map<byte[], List<KeyValue>> familyMap;
familyMap = new HashMap<byte[], List<KeyValue>>();
familyMap.put(family, edits);
this.put(familyMap, true);
}
@ -1647,13 +1728,18 @@ public class HRegion implements HeapSize { // , Writable{
* @param writeToWAL if true, then we should write to the log
* @throws IOException
*/
private void put(final Map<byte [], List<KeyValue>> familyMap,
private void put(Map<byte [], List<KeyValue>> familyMap,
boolean writeToWAL) throws IOException {
long now = EnvironmentEdgeManager.currentTimeMillis();
byte[] byteNow = Bytes.toBytes(now);
boolean flush = false;
this.updatesLock.readLock().lock();
try {
if (coprocessorHost != null) {
familyMap = coprocessorHost.prePut(familyMap);
}
checkFamilies(familyMap.keySet());
updateKVTimestamps(familyMap.values(), byteNow);
// write/sync to WAL should happen before we touch memstore.
@ -1670,6 +1756,10 @@ public class HRegion implements HeapSize { // , Writable{
long addedSize = applyFamilyMapToMemstore(familyMap);
flush = isFlushSize(memstoreSize.addAndGet(addedSize));
if (coprocessorHost != null) {
coprocessorHost.postPut(familyMap);
}
} finally {
this.updatesLock.readLock().unlock();
}
@ -1737,7 +1827,7 @@ public class HRegion implements HeapSize { // , Writable{
}
private void requestFlush() {
if (this.flushRequester == null) {
if (this.rsServices == null) {
return;
}
synchronized (writestate) {
@ -1747,7 +1837,7 @@ public class HRegion implements HeapSize { // , Writable{
writestate.flushRequested = true;
}
// Make request outside of synchronize block; HBASE-818.
this.flushRequester.requestFlush(this);
this.rsServices.getFlushRequester().requestFlush(this);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this);
}
@ -2103,7 +2193,7 @@ public class HRegion implements HeapSize { // , Writable{
* Release the row lock!
* @param lockid The lock ID to release.
*/
void releaseRowLock(final Integer lockid) {
public void releaseRowLock(final Integer lockid) {
synchronized (lockedRows) {
byte[] row = lockIds.remove(lockid);
lockedRows.remove(row);
@ -2262,8 +2352,17 @@ public class HRegion implements HeapSize { // , Writable{
ReadWriteConsistencyControl.setThreadReadPoint(this.readPt);
results.clear();
if (coprocessorHost != null) {
coprocessorHost.preScannerNext(hashCode());
}
boolean returnResult = nextInternal(limit);
if (coprocessorHost != null) {
results = coprocessorHost.postScannerNext(hashCode(), results);
}
outResults.addAll(results);
resetFilters();
if (isFilterDone()) {
@ -2369,12 +2468,18 @@ public class HRegion implements HeapSize { // , Writable{
currentRow, 0, currentRow.length) <= isScan);
}
public synchronized void close() {
public synchronized void close() throws IOException {
if (coprocessorHost != null) {
coprocessorHost.preScannerClose(hashCode());
}
if (storeHeap != null) {
storeHeap.close();
storeHeap = null;
}
this.filterClosed = true;
if (coprocessorHost != null) {
coprocessorHost.postScannerClose(hashCode());
}
}
}
@ -2395,13 +2500,12 @@ public class HRegion implements HeapSize { // , Writable{
* @param conf is global configuration settings.
* @param regionInfo - HRegionInfo that describes the region
* is new), then read them from the supplied path.
* @param flushListener an object that implements CacheFlushListener or null
* making progress to master -- otherwise master might think region deploy
* failed. Can be null.
* @param rsServices
* @return the new instance
*/
public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
HRegionInfo regionInfo, FlushRequester flushListener) {
public static HRegion newHRegion(Path tableDir, HLog log, FileSystem fs,
Configuration conf, HRegionInfo regionInfo,
RegionServerServices rsServices) {
try {
@SuppressWarnings("unchecked")
Class<? extends HRegion> regionClass =
@ -2409,9 +2513,9 @@ public class HRegion implements HeapSize { // , Writable{
Constructor<? extends HRegion> c =
regionClass.getConstructor(Path.class, HLog.class, FileSystem.class,
Configuration.class, HRegionInfo.class, FlushRequester.class);
Configuration.class, HRegionInfo.class, RegionServerServices.class);
return c.newInstance(tableDir, log, fs, conf, regionInfo, flushListener);
return c.newInstance(tableDir, log, fs, conf, regionInfo, rsServices);
} catch (Throwable e) {
// todo: what should I throw here?
throw new IllegalStateException("Could not instantiate a region instance.", e);
@ -2480,7 +2584,7 @@ public class HRegion implements HeapSize { // , Writable{
* @throws IOException
*/
public static HRegion openHRegion(final HRegionInfo info, final HLog wal,
final Configuration conf, final FlushRequester flusher,
final Configuration conf, final RegionServerServices rsServices,
final Progressable reporter)
throws IOException {
if (LOG.isDebugEnabled()) {
@ -2492,7 +2596,7 @@ public class HRegion implements HeapSize { // , Writable{
Path dir = HTableDescriptor.getTableDir(FSUtils.getRootDir(conf),
info.getTableDesc().getName());
HRegion r = HRegion.newHRegion(dir, wal, FileSystem.get(conf), conf, info,
flusher);
rsServices);
return r.openHRegion(reporter);
}
@ -2854,9 +2958,9 @@ public class HRegion implements HeapSize { // , Writable{
get.addFamily(family);
}
}
List<KeyValue> result = get(get);
return new Result(result);
List<KeyValue> results = new ArrayList<KeyValue>();
results = get(get, true);
return new Result(results);
}
/**
@ -2943,24 +3047,46 @@ public class HRegion implements HeapSize { // , Writable{
/*
* Do a get based on the get parameter.
* @param withCoprocessor invoke coprocessor or not. We don't want to
* always invoke cp for this private method.
*/
private List<KeyValue> get(final Get get) throws IOException {
private List<KeyValue> get(Get get, boolean withCoprocessor)
throws IOException {
Scan scan = new Scan(get);
List<KeyValue> results = new ArrayList<KeyValue>();
List<KeyValue> results = null;
List<KeyValue> getResults = new ArrayList<KeyValue>();
// pre-get CP hook
if (withCoprocessor && (coprocessorHost != null)) {
get = coprocessorHost.preGet(get);
}
InternalScanner scanner = null;
try {
scanner = getScanner(scan);
scanner.next(results);
scanner.next(getResults);
} finally {
if (scanner != null)
scanner.close();
}
// append get results to pre-get results
if (results != null){
results.addAll(getResults);
}
else {
results = getResults;
}
// post-get CP hook
if (withCoprocessor && (coprocessorHost != null)) {
results = coprocessorHost.postGet(get, results);
}
return results;
}
/**
*
* Perform one or more increment operations on a row.
* <p>
* Increments performed are done under row lock but reads do not take locks
@ -3059,7 +3185,6 @@ public class HRegion implements HeapSize { // , Writable{
}
/**
*
* @param row
* @param family
* @param qualifier
@ -3085,6 +3210,8 @@ public class HRegion implements HeapSize { // , Writable{
Get get = new Get(row);
get.addColumn(family, qualifier);
// we don't want to invoke coprocessor in this case; ICV is wrapped
// in HRegionServer, so we leave getLastIncrement alone
List<KeyValue> results = getLastIncrement(get);
if (!results.isEmpty()) {
@ -3146,7 +3273,7 @@ public class HRegion implements HeapSize { // , Writable{
public static final long FIXED_OVERHEAD = ClassSize.align(
(4 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN +
(21 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
(22 * ClassSize.REFERENCE) + ClassSize.OBJECT + Bytes.SIZEOF_INT);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
ClassSize.OBJECT + (2 * ClassSize.ATOMIC_BOOLEAN) +
@ -3375,6 +3502,16 @@ public class HRegion implements HeapSize { // , Writable{
return false;
}
/** @return the coprocessor host */
public CoprocessorHost getCoprocessorHost() {
return coprocessorHost;
}
/** @param coprocessorHost the new coprocessor host */
public void setCoprocessorHost(final CoprocessorHost coprocessorHost) {
this.coprocessorHost = coprocessorHost;
}
/**
* This method needs to be called before any public call that reads or
* modifies data. It has to be called just before a try.

View File

@ -1550,8 +1550,15 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preExists(get);
}
Result r = region.get(get, getLockFromId(get.getLockId()));
return r != null && !r.isEmpty();
boolean result = r != null && !r.isEmpty();
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postExists(get, result);
}
return result;
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
}
@ -1640,8 +1647,23 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public boolean checkAndPut(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final byte[] value,
final Put put) throws IOException {
return checkAndMutate(regionName, row, family, qualifier, value, put,
getLockFromId(put.getLockId()));
checkOpen();
if (regionName == null) {
throw new IOException("Invalid arguments to checkAndPut "
+ "regionName is null");
}
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
value, put);
}
boolean result = checkAndMutate(regionName, row, family, qualifier,
value, put, getLockFromId(put.getLockId()));
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndPut(row, family,
qualifier, value, put, result);
}
return result;
}
/**
@ -1659,8 +1681,24 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public boolean checkAndDelete(final byte[] regionName, final byte[] row,
final byte[] family, final byte[] qualifier, final byte[] value,
final Delete delete) throws IOException {
return checkAndMutate(regionName, row, family, qualifier, value, delete,
getLockFromId(delete.getLockId()));
checkOpen();
if (regionName == null) {
throw new IOException("Invalid arguments to checkAndDelete "
+ "regionName is null");
}
HRegion region = getRegion(regionName);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier,
value, delete);
}
boolean result = checkAndMutate(regionName, row, family, qualifier, value,
delete, getLockFromId(delete.getLockId()));
if (region.getCoprocessorHost() != null) {
result = region.getCoprocessorHost().postCheckAndDelete(row, family,
qualifier, value, delete, result);
}
return result;
}
//
@ -2341,8 +2379,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
return region.increment(increment, getLockFromId(increment.getLockId()),
Increment incVal = increment;
Result resVal;
if (region.getCoprocessorHost() != null) {
incVal = region.getCoprocessorHost().preIncrement(incVal);
}
resVal = region.increment(incVal, getLockFromId(increment.getLockId()),
increment.getWriteToWAL());
if (region.getCoprocessorHost() != null) {
resVal = region.getCoprocessorHost().postIncrement(incVal, resVal);
}
return resVal;
} catch (IOException e) {
checkFileSystem();
throw e;
@ -2362,9 +2409,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
requestCount.incrementAndGet();
try {
HRegion region = getRegion(regionName);
long amountVal = amount;
if (region.getCoprocessorHost() != null) {
amountVal = region.getCoprocessorHost().preIncrementColumnValue(row,
family, qualifier, amountVal, writeToWAL);
}
long retval = region.incrementColumnValue(row, family, qualifier, amount,
writeToWAL);
if (region.getCoprocessorHost() != null) {
retval = region.getCoprocessorHost().postIncrementColumnValue(row,
family, qualifier, amountVal, writeToWAL, retval);
}
return retval;
} catch (IOException e) {
checkFileSystem();
@ -2560,6 +2615,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
return this.compactSplitThread;
}
@Override
public ZooKeeperWatcher getZooKeeperWatcher() {
return this.zooKeeper;
}
//
// Main program and support routines
//

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
@ -38,6 +39,16 @@ public interface RegionServerServices extends OnlineRegions {
/** @return the HLog */
public HLog getWAL();
/**
* @return Implementation of {@link CatalogTracker} or null.
*/
public CatalogTracker getCatalogTracker();
/*
* @return Implementation of {@link ZooKeeperWatcher} or null.
*/
public ZooKeeperWatcher getZooKeeperWatcher();
/**
* @return Implementation of {@link CompactionRequestor} or null.
*/

View File

@ -64,7 +64,7 @@ import org.apache.zookeeper.KeeperException;
* <p>This class is not thread safe. Caller needs ensure split is run by
* one thread only.
*/
class SplitTransaction {
public class SplitTransaction {
private static final Log LOG = LogFactory.getLog(SplitTransaction.class);
private static final String SPLITDIR = "splits";
@ -120,7 +120,7 @@ class SplitTransaction {
* @param r Region to split
* @param splitrow Row to split around
*/
SplitTransaction(final HRegion r, final byte [] splitrow) {
public SplitTransaction(final HRegion r, final byte [] splitrow) {
this.parent = r;
this.splitrow = splitrow;
this.splitdir = getSplitDir(this.parent);
@ -177,7 +177,7 @@ class SplitTransaction {
* @return Regions created
* @see #rollback(OnlineRegions)
*/
PairOfSameType<HRegion> execute(final Server server,
public PairOfSameType<HRegion> execute(final Server server,
final RegionServerServices services)
throws IOException {
LOG.info("Starting split of region " + this.parent);
@ -187,6 +187,11 @@ class SplitTransaction {
}
assert !this.parent.lock.writeLock().isHeldByCurrentThread() : "Unsafe to hold write lock while performing RPCs";
// Coprocessor callback
if (this.parent.getCoprocessorHost() != null) {
this.parent.getCoprocessorHost().preSplit();
}
// If true, no cluster to write meta edits into.
boolean testing = server == null? true:
server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
@ -215,11 +220,11 @@ class SplitTransaction {
// stuff in fs that needs cleanup -- a storefile or two. Thats why we
// add entry to journal BEFORE rather than AFTER the change.
this.journal.add(JournalEntry.STARTED_REGION_A_CREATION);
HRegion a = createDaughterRegion(this.hri_a, this.parent.flushRequester);
HRegion a = createDaughterRegion(this.hri_a, this.parent.rsServices);
// Ditto
this.journal.add(JournalEntry.STARTED_REGION_B_CREATION);
HRegion b = createDaughterRegion(this.hri_b, this.parent.flushRequester);
HRegion b = createDaughterRegion(this.hri_b, this.parent.rsServices);
// Edit parent in meta
if (!testing) {
@ -249,6 +254,11 @@ class SplitTransaction {
}
}
// Coprocessor callback
if (this.parent.getCoprocessorHost() != null) {
this.parent.getCoprocessorHost().postSplit(a,b);
}
// Leaving here, the splitdir with its dross will be in place but since the
// split was successful, just leave it; it'll be cleaned when parent is
// deleted and cleaned up.
@ -393,7 +403,7 @@ class SplitTransaction {
* @see #cleanupDaughterRegion(FileSystem, Path, HRegionInfo)
*/
HRegion createDaughterRegion(final HRegionInfo hri,
final FlushRequester flusher)
final RegionServerServices rsServices)
throws IOException {
// Package private so unit tests have access.
FileSystem fs = this.parent.getFilesystem();
@ -401,7 +411,7 @@ class SplitTransaction {
this.splitdir, hri);
HRegion r = HRegion.newHRegion(this.parent.getTableDir(),
this.parent.getLog(), fs, this.parent.getConf(),
hri, flusher);
hri, rsServices);
HRegion.moveInitialFilesIntoPlace(fs, regionDir, r.getRegionDir());
return r;
}

View File

@ -94,7 +94,7 @@ public class OpenRegionHandler extends EventHandler {
try {
// Instantiate the region. This also periodically updates OPENING.
region = HRegion.openHRegion(regionInfo, this.rsServices.getWAL(),
server.getConfiguration(), this.rsServices.getFlushRequester(),
server.getConfiguration(), this.rsServices,
new Progressable() {
public void progress() {
try {

View File

@ -484,6 +484,18 @@
</description>
</property>
<property>
<name>hbase.coprocessor.default.classes</name>
<value></value>
<description>A comma-separated list of Coprocessors that are loaded by
default. For any override coprocessor method, these classes will be called
in order. After implement your own
Coprocessor, just put it in HBase's classpath and add the fully
qualified class name here.
A coprocessor can also be loaded on demand by setting HTableDescriptor.
</description>
</property>
<!--
The following three properties are used together to create the list of
host:peer_port:leader_port quorum servers for ZooKeeper.

View File

@ -0,0 +1,61 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
/**
* The aggregation implementation at a region.
*/
public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
implements ColumnAggregationProtocol {
@Override
public long sum(byte[] family, byte[] qualifier)
throws IOException {
// aggregate at each region
Scan scan = new Scan();
scan.addColumn(family, qualifier);
int sumResult = 0;
InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean done = false;
do {
curVals.clear();
done = scanner.next(curVals);
KeyValue kv = curVals.get(0);
sumResult += Bytes.toInt(kv.getValue());
} while (done);
} finally {
scanner.close();
}
return sumResult;
}
}

View File

@ -0,0 +1,39 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import java.io.IOException;
/**
* A sample protocol for performing aggregation at regions.
*/
public interface ColumnAggregationProtocol extends CoprocessorProtocol {
/**
* Perform aggregation for a given column at the region. The aggregation
* will include all the rows inside the region. It can be extended to
* allow passing start and end rows for a fine-grained aggregation.
* @param family family
* @param qualifier qualifier
* @return Aggregation of the column.
* @throws exception.
*/
public long sum(byte[] family, byte[] qualifier) throws IOException;
}

View File

@ -0,0 +1,274 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import static org.junit.Assert.*;
/**
* A sample region observer that tests the RegionObserver interface.
* It works with TestRegionObserverInterface to provide the test case.
*/
public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
boolean beforeDelete = true;
boolean scannerOpened = false;
boolean hadPreGet = false;
boolean hadPostGet = false;
boolean hadPrePut = false;
boolean hadPostPut = false;
boolean hadPreDeleted = false;
boolean hadPostDeleted = false;
boolean hadPreGetClosestRowBefore = false;
boolean hadPostGetClosestRowBefore = false;
boolean hadPreIncrement = false;
boolean hadPostIncrement = false;
// Overriden RegionObserver methods
@Override
public Get preGet(CoprocessorEnvironment e, Get get) {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
hadPreGet = true;
assertNotNull(e);
assertNotNull(e.getRegion());
}
return get;
}
@Override
public List<KeyValue> postGet(CoprocessorEnvironment e, Get get,
List<KeyValue> results) {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
boolean foundA = false;
boolean foundB = false;
boolean foundC = false;
for (KeyValue kv: results) {
if (Bytes.equals(kv.getFamily(), TestRegionObserverInterface.A)) {
foundA = true;
}
if (Bytes.equals(kv.getFamily(), TestRegionObserverInterface.B)) {
foundB = true;
}
if (Bytes.equals(kv.getFamily(), TestRegionObserverInterface.C)) {
foundC = true;
}
}
assertTrue(foundA);
assertTrue(foundB);
assertTrue(foundC);
hadPostGet = true;
}
return results;
}
@Override
public Map<byte[], List<KeyValue>> prePut(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A);
assertNotNull(kvs);
assertNotNull(kvs.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(),
TestRegionObserverInterface.A));
kvs = familyMap.get(TestRegionObserverInterface.B);
assertNotNull(kvs);
assertNotNull(kvs.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(),
TestRegionObserverInterface.B));
kvs = familyMap.get(TestRegionObserverInterface.C);
assertNotNull(kvs);
assertNotNull(kvs.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(),
TestRegionObserverInterface.C));
hadPrePut = true;
}
return familyMap;
}
@Override
public void postPut(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
List<KeyValue> kvs = familyMap.get(TestRegionObserverInterface.A);
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
assertNotNull(kvs);
assertNotNull(kvs.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(),
TestRegionObserverInterface.A));
kvs = familyMap.get(TestRegionObserverInterface.B);
assertNotNull(kvs);
assertNotNull(kvs.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(),
TestRegionObserverInterface.B));
kvs = familyMap.get(TestRegionObserverInterface.C);
assertNotNull(kvs);
assertNotNull(kvs.get(0));
assertTrue(Bytes.equals(kvs.get(0).getQualifier(),
TestRegionObserverInterface.C));
hadPostPut = true;
}
}
@Override
public Map<byte[], List<KeyValue>> preDelete(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
TestRegionObserverInterface.TEST_TABLE)) {
hadPreDeleted = true;
}
return familyMap;
}
@Override
public void postDelete(CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
beforeDelete = false;
hadPostDeleted = true;
}
}
@Override
public void preGetClosestRowBefore(final CoprocessorEnvironment e,
final byte[] row, final byte[] family) {
if (beforeDelete && e.getRegion().getTableDesc().getName().equals(
TestRegionObserverInterface.TEST_TABLE)) {
hadPreGetClosestRowBefore = true;
}
}
@Override
public Result postGetClosestRowBefore(final CoprocessorEnvironment e,
final byte[] row, final byte[] family, Result result) {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE)) {
hadPostGetClosestRowBefore = true;
}
return result;
}
@Override
public Scan preScannerOpen(CoprocessorEnvironment e, Scan scan) {
// not tested -- need to go through the RS to get here
return scan;
}
@Override
public void postScannerOpen(CoprocessorEnvironment e, Scan scan,
long scannerId) {
// not tested -- need to go through the RS to get here
}
@Override
public void preScannerNext(final CoprocessorEnvironment e,
final long scannerId) {
// not tested -- need to go through the RS to get here
}
@Override
public List<KeyValue> postScannerNext(final CoprocessorEnvironment e,
final long scannerId, List<KeyValue> results) {
// not tested -- need to go through the RS to get here
return results;
}
@Override
public void preScannerClose(final CoprocessorEnvironment e,
final long scannerId) {
// not tested -- need to go through the RS to get here
}
@Override
public void postScannerClose(final CoprocessorEnvironment e,
final long scannerId) {
// not tested -- need to go through the RS to get here
}
@Override
public Increment preIncrement(CoprocessorEnvironment e, Increment increment)
throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
hadPreIncrement = true;
}
return increment;
}
@Override
public Result postIncrement(CoprocessorEnvironment e, Increment increment,
Result result) throws IOException {
if (Arrays.equals(e.getRegion().getTableDesc().getName(),
TestRegionObserverInterface.TEST_TABLE_2)) {
hadPostIncrement = true;
}
return result;
}
boolean hadPreGet() {
return hadPreGet;
}
boolean hadPostGet() {
return hadPostGet;
}
boolean hadPrePut() {
return hadPrePut;
}
boolean hadPostPut() {
return hadPostPut;
}
boolean hadDelete() {
return !beforeDelete;
}
boolean hadPreIncrement() {
return hadPreIncrement;
}
boolean hadPostIncrement() {
return hadPostIncrement;
}
}

View File

@ -0,0 +1,138 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.*;
import org.apache.hadoop.conf.Configuration;
import static org.junit.Assert.*;
import java.util.Map;
import java.io.IOException;
/**
* TestEndpoint: test cases to verify coprocessor Endpoint
*/
public class TestCoprocessorEndpoint {
private static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
private static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
private static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
private static byte [] ROW = Bytes.toBytes("testRow");
private static final int ROWSIZE = 20;
private static final int rowSeperator1 = 5;
private static final int rowSeperator2 = 12;
private static byte [][] ROWS = makeN(ROW, ROWSIZE);
private static HBaseTestingUtility util = new HBaseTestingUtility();
private static MiniHBaseCluster cluster = null;
@BeforeClass
public static void setupBeforeClass() throws Exception {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
conf.set("hbase.coprocessor.default.classes",
"org.apache.hadoop.hbase.coprocessor.ColumnAggregationEndpoint");
util.startMiniCluster(2);
cluster = util.getMiniHBaseCluster();
HTable table = util.createTable(TEST_TABLE, TEST_FAMILY);
util.createMultiRegions(util.getConfiguration(), table, TEST_FAMILY,
new byte[][]{ HConstants.EMPTY_BYTE_ARRAY, ROWS[rowSeperator1],
ROWS[rowSeperator2]});
for(int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
table.put(put);
}
// sleep here is an ugly hack to allow region transitions to finish
Thread.sleep(5000);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
@Test
public void testAggregation() throws Throwable {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE);
Scan scan;
Map<byte[], Long> results;
// scan: for all regions
results = table.coprocessorExec(ColumnAggregationProtocol.class,
ROWS[rowSeperator1 - 1],
ROWS[rowSeperator2 + 1],
new Batch.Call<ColumnAggregationProtocol,Long>() {
public Long call(ColumnAggregationProtocol instance)
throws IOException{
return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
}
});
int sumResult = 0;
int expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
sumResult += e.getValue();
}
for(int i = 0;i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", sumResult, expectedResult);
results.clear();
// scan: for region 2 and region 3
results = table.coprocessorExec(ColumnAggregationProtocol.class,
ROWS[rowSeperator1 + 1],
ROWS[rowSeperator2 + 1],
new Batch.Call<ColumnAggregationProtocol,Long>() {
public Long call(ColumnAggregationProtocol instance)
throws IOException{
return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
}
});
sumResult = 0;
expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
sumResult += e.getValue();
}
for(int i = rowSeperator1;i < ROWSIZE; i++) {
expectedResult += i;
}
assertEquals("Invalid result", sumResult, expectedResult);
}
private static byte [][] makeN(byte [] base, int n) {
byte [][] ret = new byte[n][];
for(int i=0;i<n;i++) {
ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
}
return ret;
}
}

View File

@ -0,0 +1,271 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.coprocessor.Coprocessor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.Server;
import org.mockito.Mockito;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import static org.mockito.Mockito.when;
public class TestCoprocessorInterface extends HBaseTestCase {
static final Log LOG = LogFactory.getLog(TestCoprocessorInterface.class);
static final String DIR = "test/build/data/TestCoprocessorInterface/";
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
public static class CoprocessorImpl implements Coprocessor {
private boolean preOpenCalled;
private boolean postOpenCalled;
private boolean preCloseCalled;
private boolean postCloseCalled;
private boolean preCompactCalled;
private boolean postCompactCalled;
private boolean preFlushCalled;
private boolean postFlushCalled;
private boolean preSplitCalled;
private boolean postSplitCalled;
@Override
public void preOpen(CoprocessorEnvironment e) {
preOpenCalled = true;
}
@Override
public void postOpen(CoprocessorEnvironment e) {
postOpenCalled = true;
}
@Override
public void preClose(CoprocessorEnvironment e, boolean abortRequested) {
preCloseCalled = true;
}
@Override
public void postClose(CoprocessorEnvironment e, boolean abortRequested) {
postCloseCalled = true;
}
@Override
public void preCompact(CoprocessorEnvironment e, boolean willSplit) {
preCompactCalled = true;
}
@Override
public void postCompact(CoprocessorEnvironment e, boolean willSplit) {
postCompactCalled = true;
}
@Override
public void preFlush(CoprocessorEnvironment e) {
preFlushCalled = true;
}
@Override
public void postFlush(CoprocessorEnvironment e) {
postFlushCalled = true;
}
@Override
public void preSplit(CoprocessorEnvironment e) {
preSplitCalled = true;
}
@Override
public void postSplit(CoprocessorEnvironment e, HRegion l, HRegion r) {
postSplitCalled = true;
}
boolean wasOpened() {
return (preOpenCalled && postOpenCalled);
}
boolean wasClosed() {
return (preCloseCalled && postCloseCalled);
}
boolean wasFlushed() {
return (preFlushCalled && postFlushCalled);
}
boolean wasCompacted() {
return (preCompactCalled && postCompactCalled);
}
boolean wasSplit() {
return (preSplitCalled && postSplitCalled);
}
}
public void testCoprocessorInterface() throws IOException {
byte [] tableName = Bytes.toBytes("testtable");
byte [][] families = { fam1, fam2, fam3 };
Configuration hc = initSplit();
HRegion region = initHRegion(tableName, getName(), hc,
CoprocessorImpl.class, families);
addContent(region, fam3);
region.flushcache();
byte [] splitRow = region.compactStores();
assertNotNull(splitRow);
HRegion [] regions = split(region, splitRow);
for (int i = 0; i < regions.length; i++) {
regions[i] = reopenRegion(regions[i], CoprocessorImpl.class);
}
region.close();
region.getLog().closeAndDelete();
Coprocessor c = region.getCoprocessorHost()
.findCoprocessor(CoprocessorImpl.class.getName());
assertTrue(((CoprocessorImpl)c).wasOpened());
assertTrue(((CoprocessorImpl)c).wasClosed());
assertTrue(((CoprocessorImpl)c).wasFlushed());
assertTrue(((CoprocessorImpl)c).wasCompacted());
assertTrue(((CoprocessorImpl)c).wasSplit());
for (int i = 0; i < regions.length; i++) {
regions[i].close();
regions[i].getLog().closeAndDelete();
c = region.getCoprocessorHost()
.findCoprocessor(CoprocessorImpl.class.getName());
assertTrue(((CoprocessorImpl)c).wasOpened());
assertTrue(((CoprocessorImpl)c).wasClosed());
assertTrue(((CoprocessorImpl)c).wasCompacted());
}
}
HRegion reopenRegion(final HRegion closedRegion, Class<?> implClass)
throws IOException {
HRegion r = new HRegion(closedRegion.getRegionDir(), closedRegion.getLog(),
closedRegion.getFilesystem(), closedRegion.getConf(),
closedRegion.getRegionInfo(), null);
r.initialize();
// this following piece is a hack. currently a coprocessorHost
// is secretly loaded at OpenRegionHandler. we don't really
// start a region server here, so just manually create cphost
// and set it to region.
CoprocessorHost host = new CoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
host.load(implClass, Priority.USER);
// we need to manually call pre- and postOpen here since the
// above load() is not the real case for CP loading. A CP is
// expected to be loaded by default from 1) configuration; or 2)
// HTableDescriptor. If it's loaded after HRegion initialized,
// the pre- and postOpen() won't be triggered automatically.
// Here we have to call pre and postOpen explicitly.
host.preOpen();
host.postOpen();
return r;
}
HRegion initHRegion (byte [] tableName, String callingMethod,
Configuration conf, Class<?> implClass, byte [] ... families)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(htd, null, null, false);
Path path = new Path(DIR + callingMethod);
HRegion r = HRegion.createHRegion(info, path, conf);
// this following piece is a hack.
CoprocessorHost host = new CoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
host.load(implClass, Priority.USER);
// Here we have to call pre and postOpen explicitly.
host.preOpen();
host.postOpen();
return r;
}
Configuration initSplit() {
// Always compact if there is more than one store file.
TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 2);
// Make lease timeout longer, lease checks less frequent
TEST_UTIL.getConfiguration().setInt(
"hbase.master.lease.thread.wakefrequency", 5 * 1000);
TEST_UTIL.getConfiguration().setInt(
"hbase.regionserver.lease.period", 10 * 1000);
// Increase the amount of time between client retries
TEST_UTIL.getConfiguration().setLong("hbase.client.pause", 15 * 1000);
// This size should make it so we always split using the addContent
// below. After adding all data, the first region is 1.3M
TEST_UTIL.getConfiguration().setLong("hbase.hregion.max.filesize",
1024 * 128);
TEST_UTIL.getConfiguration().setBoolean("hbase.testing.nocluster",
true);
return TEST_UTIL.getConfiguration();
}
private HRegion [] split(final HRegion r, final byte [] splitRow)
throws IOException {
HRegion[] regions = new HRegion[2];
SplitTransaction st = new SplitTransaction(r, splitRow);
int i = 0;
if (!st.prepare()) {
// test fails.
assertTrue(false);
}
try {
Server mockServer = Mockito.mock(Server.class);
when(mockServer.getConfiguration()).thenReturn(
TEST_UTIL.getConfiguration());
PairOfSameType<HRegion> daughters = st.execute(mockServer, null);
for (HRegion each_daughter: daughters) {
regions[i] = each_daughter;
i++;
}
}
catch (IOException ioe) {
LOG.info("Split transaction of " + r.getRegionNameAsString() +
" failed:" + ioe.getMessage());
assertTrue(false);
}
catch (RuntimeException e) {
LOG.info("Failed rollback of failed split of " +
r.getRegionNameAsString() + e.getMessage());
}
assertTrue(i == 2);
return regions;
}
}

View File

@ -0,0 +1,195 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import static org.junit.Assert.*;
public class TestRegionObserverInterface {
static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
static final String DIR = "test/build/data/TestRegionObserver/";
public static final byte[] TEST_TABLE = Bytes.toBytes("TestTable");
public static final byte[] TEST_TABLE_2 = Bytes.toBytes("TestTable2");
public static final byte[] TEST_FAMILY = Bytes.toBytes("TestFamily");
public static final byte[] TEST_QUALIFIER = Bytes.toBytes("TestQualifier");
public final static byte[] A = Bytes.toBytes("a");
public final static byte[] B = Bytes.toBytes("b");
public final static byte[] C = Bytes.toBytes("c");
public final static byte[] ROW = Bytes.toBytes("testrow");
public final static byte[] ROW1 = Bytes.toBytes("testrow1");
public final static byte[] ROW2 = Bytes.toBytes("testrow2");
private static final int ROWSIZE = 20;
private static byte [][] ROWS = makeN(ROW, ROWSIZE);
private static HBaseTestingUtility util = new HBaseTestingUtility();
private static MiniHBaseCluster cluster = null;
@BeforeClass
public static void setupBeforeClass() throws Exception {
// set configure to indicate which cp should be loaded
Configuration conf = util.getConfiguration();
conf.set("hbase.coprocessor.default.classes",
"org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver");
util.startMiniCluster(2);
cluster = util.getMiniHBaseCluster();
HTable table = util.createTable(TEST_TABLE_2, TEST_FAMILY);
for(int i = 0; i < ROWSIZE; i++) {
Put put = new Put(ROWS[i]);
put.add(TEST_FAMILY, TEST_QUALIFIER, Bytes.toBytes(i));
table.put(put);
}
// sleep here is an ugly hack to allow region transitions to finish
Thread.sleep(5000);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
util.shutdownMiniCluster();
}
HRegion initHRegion (byte [] tableName, String callingMethod,
Configuration conf, Class<?> implClass, byte [] ... families)
throws IOException{
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(htd, null, null, false);
Path path = new Path(DIR + callingMethod);
// this following piece is a hack. currently a coprocessorHost
// is secretly loaded at OpenRegionHandler. we don't really
// start a region server here, so just manually create cphost
// and set it to region.
HRegion r = HRegion.createHRegion(info, path, conf);
CoprocessorHost host = new CoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
host.load(implClass, Priority.USER);
return r;
}
@Test
public void testRegionObserver() throws IOException {
byte[] TABLE = Bytes.toBytes(getClass().getName());
byte[][] FAMILIES = new byte[][] { A, B, C } ;
Put put = new Put(ROW);
put.add(A, A, A);
put.add(B, B, B);
put.add(C, C, C);
Get get = new Get(ROW);
get.addColumn(A, A);
get.addColumn(B, B);
get.addColumn(C, C);
Delete delete = new Delete(ROW);
delete.deleteColumn(A, A);
delete.deleteColumn(B, B);
delete.deleteColumn(C, C);
for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
for (HRegionInfo r : t.getRegionServer().getOnlineRegions()) {
if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE)) {
continue;
}
CoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
getCoprocessorHost();
Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
assertNotNull(c);
assertTrue(((SimpleRegionObserver)c).hadPreGet());
assertTrue(((SimpleRegionObserver)c).hadPostGet());
assertTrue(((SimpleRegionObserver)c).hadPrePut());
assertTrue(((SimpleRegionObserver)c).hadPostPut());
assertTrue(((SimpleRegionObserver)c).hadDelete());
}
}
}
// TODO: add tests for other methods which need to be tested
// at region servers.
@Test
public void testIncrementHook() throws IOException {
HTable table = new HTable(util.getConfiguration(), TEST_TABLE_2);
Increment inc = new Increment(Bytes.toBytes(0));
inc.addColumn(TEST_FAMILY, TEST_QUALIFIER, 1);
table.increment(inc);
for (JVMClusterUtil.RegionServerThread t : cluster.getRegionServerThreads()) {
for (HRegionInfo r : t.getRegionServer().getOnlineRegions()) {
if (!Arrays.equals(r.getTableDesc().getName(), TEST_TABLE_2)) {
continue;
}
CoprocessorHost cph = t.getRegionServer().getOnlineRegion(r.getRegionName()).
getCoprocessorHost();
Coprocessor c = cph.findCoprocessor(SimpleRegionObserver.class.getName());
assertTrue(((SimpleRegionObserver)c).hadPreIncrement());
assertTrue(((SimpleRegionObserver)c).hadPostIncrement());
}
}
}
private static byte [][] makeN(byte [] base, int n) {
byte [][] ret = new byte[n][];
for(int i=0;i<n;i++) {
ret[i] = Bytes.add(base, Bytes.toBytes(String.format("%02d", i)));
}
return ret;
}
}

View File

@ -0,0 +1,134 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
import org.apache.hadoop.hbase.coprocessor.CoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.CoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import junit.framework.TestCase;
public class TestRegionObserverStacking extends TestCase {
static final String DIR = "test/build/data/TestRegionObserverStacking/";
public static class ObserverA extends BaseRegionObserverCoprocessor {
long id;
@Override
public void postPut(final CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
id = System.currentTimeMillis();
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
}
}
}
public static class ObserverB extends BaseRegionObserverCoprocessor {
long id;
@Override
public void postPut(final CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
id = System.currentTimeMillis();
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
}
}
}
public static class ObserverC extends BaseRegionObserverCoprocessor {
long id;
@Override
public void postPut(final CoprocessorEnvironment e,
Map<byte[], List<KeyValue>> familyMap) {
id = System.currentTimeMillis();
try {
Thread.sleep(10);
} catch (InterruptedException ex) {
}
}
}
HRegion initHRegion (byte [] tableName, String callingMethod,
Configuration conf, byte [] ... families) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
htd.addFamily(new HColumnDescriptor(family));
}
HRegionInfo info = new HRegionInfo(htd, null, null, false);
Path path = new Path(DIR + callingMethod);
HRegion r = HRegion.createHRegion(info, path, conf);
// this following piece is a hack. currently a coprocessorHost
// is secretly loaded at OpenRegionHandler. we don't really
// start a region server here, so just manually create cphost
// and set it to region.
CoprocessorHost host = new CoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
return r;
}
public void testRegionObserverStacking() throws Exception {
byte[] ROW = Bytes.toBytes("testRow");
byte[] TABLE = Bytes.toBytes(getClass().getName());
byte[] A = Bytes.toBytes("A");
byte[][] FAMILIES = new byte[][] { A } ;
HRegion region = initHRegion(TABLE, getClass().getName(),
HBaseConfiguration.create(), FAMILIES);
CoprocessorHost h = region.getCoprocessorHost();
h.load(ObserverA.class, Priority.HIGHEST);
h.load(ObserverB.class, Priority.USER);
h.load(ObserverC.class, Priority.LOWEST);
Put put = new Put(ROW);
put.add(A, A, A);
int lockid = region.obtainRowLock(ROW);
region.put(put, lockid);
region.releaseRowLock(lockid);
Coprocessor c = h.findCoprocessor(ObserverA.class.getName());
long idA = ((ObserverA)c).id;
c = h.findCoprocessor(ObserverB.class.getName());
long idB = ((ObserverB)c).id;
c = h.findCoprocessor(ObserverC.class.getName());
long idC = ((ObserverC)c).id;
assertTrue(idA < idB);
assertTrue(idB < idC);
}
}