HBASE-6505 Allow shared RegionObserver state

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1369516 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-08-05 02:37:35 +00:00
parent ffe24f3c97
commit 103b7b3eee
4 changed files with 177 additions and 20 deletions

View File

@ -213,6 +213,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
paths.add(file.toURL());
}
}
jarFile.close();
StringTokenizer st = new StringTokenizer(cp, File.pathSeparator);
while (st.hasMoreTokens()) {

View File

@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.coprocessor;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@ -35,4 +37,7 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment {
/** @return reference to the region server services */
public RegionServerServices getRegionServerServices();
/** @return shared data between all instances of this coprocessor */
public ConcurrentMap<String, Object> getSharedData();
}

View File

@ -20,21 +20,39 @@
package org.apache.hadoop.hbase.regionserver;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import org.apache.commons.collections.map.AbstractReferenceMap;
import org.apache.commons.collections.map.ReferenceMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@ -45,9 +63,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.util.*;
import java.util.regex.Matcher;
import com.google.common.collect.ImmutableList;
/**
* Implements the coprocessor environment and runtime support for coprocessors
@ -57,6 +73,9 @@ public class RegionCoprocessorHost
extends CoprocessorHost<RegionCoprocessorHost.RegionEnvironment> {
private static final Log LOG = LogFactory.getLog(RegionCoprocessorHost.class);
// The shared data map
private static ReferenceMap sharedDataMap =
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK);
/**
* Encapsulation of the environment of each coprocessor
@ -66,6 +85,7 @@ public class RegionCoprocessorHost
private HRegion region;
private RegionServerServices rsServices;
ConcurrentMap<String, Object> sharedData;
/**
* Constructor
@ -74,10 +94,11 @@ public class RegionCoprocessorHost
*/
public RegionEnvironment(final Coprocessor impl, final int priority,
final int seq, final Configuration conf, final HRegion region,
final RegionServerServices services) {
final RegionServerServices services, final ConcurrentMap<String, Object> sharedData) {
super(impl, priority, seq, conf);
this.region = region;
this.rsServices = services;
this.sharedData = sharedData;
}
/** @return the region */
@ -95,6 +116,11 @@ public class RegionCoprocessorHost
public void shutdown() {
super.shutdown();
}
@Override
public ConcurrentMap<String, Object> getSharedData() {
return sharedData;
}
}
/** The region server services */
@ -194,8 +220,19 @@ public class RegionCoprocessorHost
break;
}
}
ConcurrentMap<String, Object> classData;
// make sure only one thread can add maps
synchronized (sharedDataMap) {
// as long as at least one RegionEnvironment holds on to its classData it will
// remain in this map
classData = (ConcurrentMap<String, Object>)sharedDataMap.get(implClass.getName());
if (classData == null) {
classData = new ConcurrentHashMap<String, Object>();
sharedDataMap.put(implClass.getName(), classData);
}
}
return new RegionEnvironment(instance, priority, seq, conf, region,
rsServices);
rsServices, classData);
}
@Override

View File

@ -25,13 +25,18 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections.map.AbstractReferenceMap;
import org.apache.commons.collections.map.ReferenceMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@ -41,6 +46,8 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -49,6 +56,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.SplitTransaction;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.junit.experimental.categories.Category;
@ -132,14 +140,19 @@ public class TestCoprocessorInterface extends HBaseTestCase {
private boolean postFlushCalled;
private boolean preSplitCalled;
private boolean postSplitCalled;
private ConcurrentMap<String, Object> sharedData;
@Override
public void start(CoprocessorEnvironment e) {
sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
// using new String here, so that there will be new object on each invocation
sharedData.putIfAbsent("test1", new Object());
startCalled = true;
}
@Override
public void stop(CoprocessorEnvironment e) {
sharedData = null;
stopCalled = true;
}
@ -214,6 +227,104 @@ public class TestCoprocessorInterface extends HBaseTestCase {
boolean wasSplit() {
return (preSplitCalled && postSplitCalled);
}
Map<String, Object> getSharedData() {
return sharedData;
}
}
public static class CoprocessorII extends BaseRegionObserver {
private ConcurrentMap<String, Object> sharedData;
@Override
public void start(CoprocessorEnvironment e) {
sharedData = ((RegionCoprocessorEnvironment)e).getSharedData();
sharedData.putIfAbsent("test2", new Object());
}
@Override
public void stop(CoprocessorEnvironment e) {
sharedData = null;
}
@Override
public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<KeyValue> results) throws IOException {
if (1/0 == 1) {
e.complete();
}
}
Map<String, Object> getSharedData() {
return sharedData;
}
}
public void testSharedData() throws IOException {
byte [] tableName = Bytes.toBytes("testtable");
byte [][] families = { fam1, fam2, fam3 };
Configuration hc = initSplit();
HRegion region = initHRegion(tableName, getName(), hc,
new Class<?>[]{}, families);
for (int i = 0; i < 3; i++) {
addContent(region, fam3);
region.flushcache();
}
region.compactStores();
byte [] splitRow = region.checkSplit();
assertNotNull(splitRow);
HRegion [] regions = split(region, splitRow);
for (int i = 0; i < regions.length; i++) {
regions[i] = reopenRegion(regions[i], CoprocessorImpl.class, CoprocessorII.class);
}
Coprocessor c = regions[0].getCoprocessorHost().
findCoprocessor(CoprocessorImpl.class.getName());
Coprocessor c2 = regions[0].getCoprocessorHost().
findCoprocessor(CoprocessorII.class.getName());
Object o = ((CoprocessorImpl)c).getSharedData().get("test1");
Object o2 = ((CoprocessorII)c2).getSharedData().get("test2");
assertNotNull(o);
assertNotNull(o2);
// to coprocessors get different sharedDatas
assertFalse(((CoprocessorImpl)c).getSharedData() == ((CoprocessorII)c2).getSharedData());
for (int i = 1; i < regions.length; i++) {
c = regions[i].getCoprocessorHost().
findCoprocessor(CoprocessorImpl.class.getName());
c2 = regions[i].getCoprocessorHost().
findCoprocessor(CoprocessorII.class.getName());
// make sure that all coprocessor of a class have identical sharedDatas
assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
assertTrue(((CoprocessorII)c2).getSharedData().get("test2") == o2);
}
// now have all Environments fail
for (int i = 0; i < regions.length; i++) {
try {
Get g = new Get(regions[i].getStartKey());
regions[i].get(g, null);
fail();
} catch (DoNotRetryIOException xc) {
}
assertNull(regions[i].getCoprocessorHost().
findCoprocessor(CoprocessorII.class.getName()));
}
c = regions[0].getCoprocessorHost().
findCoprocessor(CoprocessorImpl.class.getName());
assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
c = c2 = null;
// perform a GC
System.gc();
// reopen the region
region = reopenRegion(regions[0], CoprocessorImpl.class, CoprocessorII.class);
c = region.getCoprocessorHost().
findCoprocessor(CoprocessorImpl.class.getName());
// CPimpl is unaffected, still the same reference
assertTrue(((CoprocessorImpl)c).getSharedData().get("test1") == o);
c2 = region.getCoprocessorHost().
findCoprocessor(CoprocessorII.class.getName());
// new map and object created, hence the reference is different
// hence the old entry was indeed removed by the GC and new one has been created
assertFalse(((CoprocessorII)c2).getSharedData().get("test2") == o2);
}
public void testCoprocessorInterface() throws IOException {
@ -222,7 +333,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
Configuration hc = initSplit();
HRegion region = initHRegion(tableName, getName(), hc,
CoprocessorImpl.class, families);
new Class<?>[]{CoprocessorImpl.class}, families);
for (int i = 0; i < 3; i++) {
addContent(region, fam3);
region.flushcache();
@ -268,7 +379,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
}
}
HRegion reopenRegion(final HRegion closedRegion, Class<?> implClass)
HRegion reopenRegion(final HRegion closedRegion, Class<?> ... implClasses)
throws IOException {
//HRegionInfo info = new HRegionInfo(tableName, null, null, false);
HRegion r = new HRegion(closedRegion);
@ -281,7 +392,9 @@ public class TestCoprocessorInterface extends HBaseTestCase {
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
for (Class<?> implClass : implClasses) {
host.load(implClass, Coprocessor.PRIORITY_USER, conf);
}
// we need to manually call pre- and postOpen here since the
// above load() is not the real case for CP loading. A CP is
// expected to be loaded by default from 1) configuration; or 2)
@ -294,7 +407,7 @@ public class TestCoprocessorInterface extends HBaseTestCase {
}
HRegion initHRegion (byte [] tableName, String callingMethod,
Configuration conf, Class<?> implClass, byte [] ... families)
Configuration conf, Class<?> [] implClasses, byte [][] families)
throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName);
for(byte [] family : families) {
@ -308,10 +421,11 @@ public class TestCoprocessorInterface extends HBaseTestCase {
RegionCoprocessorHost host = new RegionCoprocessorHost(r, null, conf);
r.setCoprocessorHost(host);
for (Class<?> implClass : implClasses) {
host.load(implClass, Coprocessor.PRIORITY_USER, conf);
Coprocessor c = host.findCoprocessor(implClass.getName());
assertNotNull(c);
}
// Here we have to call pre and postOpen explicitly.
host.preOpen();