HBASE-3587 Eliminate use of read-write lock to guard loaded coprocessor collection

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1089685 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Helmling 2011-04-06 23:59:34 +00:00
parent 1596f98835
commit 9e6f97fd61
7 changed files with 722 additions and 773 deletions

View File

@ -139,6 +139,8 @@ Release 0.91.0 - Unreleased
(Doug Meil via Stack)
HBASE-3738 Book.xml - expanding Architecture Client section
(Doug Meil via Stack)
HBASE-3587 Eliminate use of read-write lock to guard loaded
coprocessor collection
TASK
HBASE-3559 Move report of split to master OFF the heartbeat channel

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.SortedCopyOnWriteSet;
import org.apache.hadoop.hbase.util.VersionInfo;
import java.io.File;
@ -38,7 +39,6 @@ import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Provides the common setup framework and runtime services for coprocessor
@ -56,9 +56,9 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
private static final Log LOG = LogFactory.getLog(CoprocessorHost.class);
/** Ordered set of loaded coprocessors with lock */
protected final ReentrantReadWriteLock coprocessorLock = new ReentrantReadWriteLock();
protected Set<E> coprocessors =
new TreeSet<E>(new EnvironmentPriorityComparator());
protected SortedSet<E> coprocessors =
new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());
protected Configuration conf;
// unique file prefix to use for local copies of jars when classloading
protected String pathPrefix;
@ -79,6 +79,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
return;
StringTokenizer st = new StringTokenizer(defaultCPClasses, ",");
int priority = Coprocessor.Priority.SYSTEM.intValue();
List<E> configured = new ArrayList<E>();
while (st.hasMoreTokens()) {
String className = st.nextToken();
if (findCoprocessor(className) != null) {
@ -88,7 +89,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
Thread.currentThread().setContextClassLoader(cl);
try {
implClass = cl.loadClass(className);
load(implClass, Coprocessor.Priority.SYSTEM);
configured.add(loadInstance(implClass, Coprocessor.Priority.SYSTEM));
LOG.info("System coprocessor " + className + " was loaded " +
"successfully with priority (" + priority++ + ").");
} catch (ClassNotFoundException e) {
@ -99,6 +100,9 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
e.getMessage());
}
}
// add entire set to the collection for COW efficiency
coprocessors.addAll(configured);
}
/**
@ -109,7 +113,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
* @throws java.io.IOException Exception
*/
@SuppressWarnings("deprecation")
public void load(Path path, String className, Coprocessor.Priority priority)
public E load(Path path, String className, Coprocessor.Priority priority)
throws IOException {
Class<?> implClass = null;
@ -163,7 +167,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
}
}
load(implClass, priority);
return loadInstance(implClass, priority);
}
/**
@ -173,6 +177,12 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
*/
public void load(Class<?> implClass, Coprocessor.Priority priority)
throws IOException {
E env = loadInstance(implClass, priority);
coprocessors.add(env);
}
public E loadInstance(Class<?> implClass, Coprocessor.Priority priority)
throws IOException {
// create the instance
Coprocessor impl;
Object o = null;
@ -189,13 +199,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
if (env instanceof Environment) {
((Environment)env).startup();
}
try {
coprocessorLock.writeLock().lock();
coprocessors.add(env);
} finally {
coprocessorLock.writeLock().unlock();
}
return env;
}
/**
@ -220,8 +224,6 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
*/
public Coprocessor findCoprocessor(String className) {
// initialize the coprocessors
try {
coprocessorLock.readLock().lock();
for (E env: coprocessors) {
if (env.getInstance().getClass().getName().equals(className) ||
env.getInstance().getClass().getSimpleName().equals(className)) {
@ -229,9 +231,6 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
}
}
return null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**

View File

@ -70,8 +70,6 @@ public class MasterCoprocessorHost
/* Implementation of hooks for invoking MasterObservers */
void preCreateTable(HTableDescriptor desc, byte[][] splitKeys)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preCreateTable(env, desc, splitKeys);
@ -80,14 +78,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postCreateTable(HRegionInfo[] regions, boolean sync) throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postCreateTable(env, regions, sync);
@ -96,14 +89,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preDeleteTable(byte[] tableName) throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preDeleteTable(env, tableName);
@ -112,14 +100,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postDeleteTable(byte[] tableName) throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postDeleteTable(env, tableName);
@ -128,15 +111,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preModifyTable(final byte[] tableName, HTableDescriptor htd)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preModifyTable(env, tableName, htd);
@ -145,15 +123,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postModifyTable(final byte[] tableName, HTableDescriptor htd)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postModifyTable(env, tableName, htd);
@ -162,15 +135,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preAddColumn(byte [] tableName, HColumnDescriptor column)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preAddColumn(env, tableName, column);
@ -179,15 +147,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postAddColumn(byte [] tableName, HColumnDescriptor column)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postAddColumn(env, tableName, column);
@ -196,15 +159,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preModifyColumn(byte [] tableName, HColumnDescriptor descriptor)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preModifyColumn(
@ -214,15 +172,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postModifyColumn(byte [] tableName, HColumnDescriptor descriptor)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postModifyColumn(
@ -232,15 +185,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preDeleteColumn(final byte [] tableName, final byte [] c)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preDeleteColumn(env, tableName, c);
@ -249,15 +197,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postDeleteColumn(final byte [] tableName, final byte [] c)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postDeleteColumn(env, tableName, c);
@ -266,14 +209,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preEnableTable(final byte [] tableName) throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preEnableTable(env, tableName);
@ -282,14 +220,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postEnableTable(final byte [] tableName) throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postEnableTable(env, tableName);
@ -298,14 +231,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preDisableTable(final byte [] tableName) throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preDisableTable(env, tableName);
@ -314,14 +242,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postDisableTable(final byte [] tableName) throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postDisableTable(env, tableName);
@ -330,15 +253,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
throws UnknownRegionException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preMove(
@ -348,15 +266,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void postMove(final HRegionInfo region, final HServerInfo srcServer, final HServerInfo destServer)
throws UnknownRegionException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postMove(
@ -366,16 +279,11 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
boolean preAssign(final byte [] regionName, final boolean force)
throws IOException {
boolean bypass = false;
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preAssign(env, regionName, force);
@ -385,15 +293,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
return bypass;
}
void postAssign(final HRegionInfo regionInfo) throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postAssign(env, regionInfo);
@ -402,16 +305,11 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
boolean preUnassign(final byte [] regionName, final boolean force)
throws IOException {
boolean bypass = false;
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preUnassign(
@ -422,16 +320,11 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
return bypass;
}
void postUnassign(final HRegionInfo regionInfo, final boolean force)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postUnassign(
@ -441,15 +334,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
boolean preBalance() throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preBalance(env);
@ -460,14 +348,9 @@ public class MasterCoprocessorHost
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
void postBalance() throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postBalance(env);
@ -476,15 +359,10 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
boolean preBalanceSwitch(final boolean b) throws IOException {
boolean balance = b;
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
balance = ((MasterObserver)env.getInstance()).preBalanceSwitch(
@ -494,16 +372,11 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
return balance;
}
void postBalanceSwitch(final boolean oldValue, final boolean newValue)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).postBalanceSwitch(
@ -513,14 +386,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preShutdown() throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preShutdown(env);
@ -529,14 +397,9 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
void preStopMaster() throws IOException {
try {
coprocessorLock.readLock().lock();
for (MasterEnvironment env: coprocessors) {
if (env.getInstance() instanceof MasterObserver) {
((MasterObserver)env.getInstance()).preStopMaster(env);
@ -545,9 +408,6 @@ public class MasterCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
}

View File

@ -120,6 +120,7 @@ public class RegionCoprocessorHost
void loadTableCoprocessors () {
// scan the table attributes for coprocessor load specifications
// initialize the coprocessors
List<RegionEnvironment> configured = new ArrayList<RegionEnvironment>();
for (Map.Entry<ImmutableBytesWritable,ImmutableBytesWritable> e:
region.getTableDesc().getValues().entrySet()) {
String key = Bytes.toString(e.getKey().get());
@ -133,7 +134,7 @@ public class RegionCoprocessorHost
String className = matcher.group(2);
Coprocessor.Priority priority =
Coprocessor.Priority.valueOf(matcher.group(3));
load(path, className, priority);
configured.add(load(path, className, priority));
LOG.info("Load coprocessor " + className + " from HTD of " +
Bytes.toString(region.getTableDesc().getName()) +
" successfully.");
@ -145,6 +146,8 @@ public class RegionCoprocessorHost
}
}
}
// add together to coprocessor set for COW efficiency
coprocessors.addAll(configured);
}
@Override
@ -170,8 +173,6 @@ public class RegionCoprocessorHost
*/
public void preOpen() {
loadTableCoprocessors();
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preOpen(env);
@ -180,17 +181,12 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
* Invoked after a region open
*/
public void postOpen() {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postOpen(env);
@ -199,9 +195,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -209,16 +202,11 @@ public class RegionCoprocessorHost
* @param abortRequested true if the server is aborting
*/
public void preClose(boolean abortRequested) {
try {
coprocessorLock.writeLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preClose(env, abortRequested);
}
}
} finally {
coprocessorLock.writeLock().unlock();
}
}
/**
@ -226,17 +214,12 @@ public class RegionCoprocessorHost
* @param abortRequested true if the server is aborting
*/
public void postClose(boolean abortRequested) {
try {
coprocessorLock.writeLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postClose(env, abortRequested);
}
shutdown(env);
}
} finally {
coprocessorLock.writeLock().unlock();
}
}
/**
@ -244,8 +227,6 @@ public class RegionCoprocessorHost
* @param willSplit true if the compaction is about to trigger a split
*/
public void preCompact(boolean willSplit) {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preCompact(env, willSplit);
@ -254,9 +235,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -264,8 +242,6 @@ public class RegionCoprocessorHost
* @param willSplit true if the compaction is about to trigger a split
*/
public void postCompact(boolean willSplit) {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postCompact(env, willSplit);
@ -274,17 +250,12 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
* Invoked before a memstore flush
*/
public void preFlush() {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preFlush(env);
@ -293,17 +264,12 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
* Invoked after a memstore flush
*/
public void postFlush() {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postFlush(env);
@ -312,17 +278,12 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
* Invoked just before a split
*/
public void preSplit() {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preSplit(env);
@ -331,9 +292,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -342,8 +300,6 @@ public class RegionCoprocessorHost
* @param r the new right-hand daughter region
*/
public void postSplit(HRegion l, HRegion r) {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postSplit(env, l, r);
@ -352,9 +308,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
// RegionObserver support
@ -368,9 +321,7 @@ public class RegionCoprocessorHost
*/
public boolean preGetClosestRowBefore(final byte[] row, final byte[] family,
final Result result) throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preGetClosestRowBefore(env, row, family,
@ -382,9 +333,6 @@ public class RegionCoprocessorHost
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -395,8 +343,6 @@ public class RegionCoprocessorHost
*/
public void postGetClosestRowBefore(final byte[] row, final byte[] family,
final Result result) throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postGetClosestRowBefore(env, row, family,
@ -406,9 +352,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -418,9 +361,7 @@ public class RegionCoprocessorHost
*/
public boolean preGet(final Get get, final List<KeyValue> results)
throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preGet(env, get, results);
@ -431,9 +372,6 @@ public class RegionCoprocessorHost
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -444,8 +382,6 @@ public class RegionCoprocessorHost
*/
public void postGet(final Get get, final List<KeyValue> results)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postGet(env, get, results);
@ -454,9 +390,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -466,10 +399,8 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public Boolean preExists(final Get get) throws IOException {
try {
boolean bypass = false;
boolean exists = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
exists = ((RegionObserver)env.getInstance()).preExists(env, get, exists);
@ -480,9 +411,6 @@ public class RegionCoprocessorHost
}
}
return bypass ? exists : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -493,8 +421,6 @@ public class RegionCoprocessorHost
*/
public boolean postExists(final Get get, boolean exists)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
exists = ((RegionObserver)env.getInstance()).postExists(env, get, exists);
@ -504,9 +430,6 @@ public class RegionCoprocessorHost
}
}
return exists;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -517,9 +440,7 @@ public class RegionCoprocessorHost
*/
public boolean prePut(final Map<byte[], List<KeyValue>> familyMap,
final boolean writeToWAL) throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).prePut(env, familyMap, writeToWAL);
@ -530,9 +451,6 @@ public class RegionCoprocessorHost
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -542,8 +460,6 @@ public class RegionCoprocessorHost
*/
public void postPut(final Map<byte[], List<KeyValue>> familyMap,
final boolean writeToWAL) throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postPut(env, familyMap, writeToWAL);
@ -552,9 +468,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -565,9 +478,7 @@ public class RegionCoprocessorHost
*/
public boolean preDelete(final Map<byte[], List<KeyValue>> familyMap,
final boolean writeToWAL) throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preDelete(env, familyMap, writeToWAL);
@ -578,9 +489,6 @@ public class RegionCoprocessorHost
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -590,8 +498,6 @@ public class RegionCoprocessorHost
*/
public void postDelete(final Map<byte[], List<KeyValue>> familyMap,
final boolean writeToWAL) throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postDelete(env, familyMap, writeToWAL);
@ -600,9 +506,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -619,12 +522,9 @@ public class RegionCoprocessorHost
public Boolean preCheckAndPut(final byte [] row, final byte [] family,
final byte [] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, Put put)
throws IOException
{
try {
throws IOException {
boolean bypass = false;
boolean result = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance()).preCheckAndPut(env, row, family,
@ -636,9 +536,6 @@ public class RegionCoprocessorHost
}
}
return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -654,10 +551,7 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Put put,
boolean result)
throws IOException
{
try {
coprocessorLock.readLock().lock();
throws IOException {
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance()).postCheckAndPut(env, row,
@ -668,9 +562,6 @@ public class RegionCoprocessorHost
}
}
return result;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -688,10 +579,8 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, Delete delete)
throws IOException {
try {
boolean bypass = false;
boolean result = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance()).preCheckAndDelete(env, row,
@ -703,9 +592,6 @@ public class RegionCoprocessorHost
}
}
return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -721,10 +607,7 @@ public class RegionCoprocessorHost
final byte [] qualifier, final CompareOp compareOp,
final WritableByteArrayComparable comparator, final Delete delete,
boolean result)
throws IOException
{
try {
coprocessorLock.readLock().lock();
throws IOException {
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance())
@ -736,9 +619,6 @@ public class RegionCoprocessorHost
}
}
return result;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -754,9 +634,7 @@ public class RegionCoprocessorHost
public Long preIncrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, long amount, final boolean writeToWAL)
throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
amount = ((RegionObserver)env.getInstance()).preIncrementColumnValue(env,
@ -768,9 +646,6 @@ public class RegionCoprocessorHost
}
}
return bypass ? amount : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -786,8 +661,6 @@ public class RegionCoprocessorHost
public long postIncrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, final long amount, final boolean writeToWAL,
long result) throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
result = ((RegionObserver)env.getInstance()).postIncrementColumnValue(env,
@ -797,9 +670,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
return result;
}
@ -811,10 +681,8 @@ public class RegionCoprocessorHost
*/
public Result preIncrement(Increment increment)
throws IOException {
try {
boolean bypass = false;
Result result = new Result();
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preIncrement(env, increment, result);
@ -825,9 +693,6 @@ public class RegionCoprocessorHost
}
}
return bypass ? result : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -837,8 +702,6 @@ public class RegionCoprocessorHost
*/
public void postIncrement(final Increment increment, Result result)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postIncrement(env, increment, result);
@ -847,9 +710,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -859,10 +719,8 @@ public class RegionCoprocessorHost
* @exception IOException Exception
*/
public InternalScanner preScannerOpen(Scan scan) throws IOException {
try {
boolean bypass = false;
InternalScanner s = null;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
s = ((RegionObserver)env.getInstance()).preScannerOpen(env, scan, s);
@ -873,9 +731,6 @@ public class RegionCoprocessorHost
}
}
return bypass ? s : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -886,8 +741,6 @@ public class RegionCoprocessorHost
*/
public InternalScanner postScannerOpen(final Scan scan, InternalScanner s)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
s = ((RegionObserver)env.getInstance()).postScannerOpen(env, scan, s);
@ -897,9 +750,6 @@ public class RegionCoprocessorHost
}
}
return s;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -912,10 +762,8 @@ public class RegionCoprocessorHost
*/
public Boolean preScannerNext(final InternalScanner s,
final List<Result> results, int limit) throws IOException {
try {
boolean bypass = false;
boolean hasNext = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
hasNext = ((RegionObserver)env.getInstance()).preScannerNext(env, s, results,
@ -927,9 +775,6 @@ public class RegionCoprocessorHost
}
}
return bypass ? hasNext : null;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -943,8 +788,6 @@ public class RegionCoprocessorHost
public boolean postScannerNext(final InternalScanner s,
final List<Result> results, final int limit, boolean hasMore)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
hasMore = ((RegionObserver)env.getInstance()).postScannerNext(env, s,
@ -955,9 +798,6 @@ public class RegionCoprocessorHost
}
}
return hasMore;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -967,9 +807,7 @@ public class RegionCoprocessorHost
*/
public boolean preScannerClose(final InternalScanner s)
throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preScannerClose(env, s);
@ -980,9 +818,6 @@ public class RegionCoprocessorHost
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -991,8 +826,6 @@ public class RegionCoprocessorHost
*/
public void postScannerClose(final InternalScanner s)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postScannerClose(env, s);
@ -1001,9 +834,6 @@ public class RegionCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -1015,9 +845,7 @@ public class RegionCoprocessorHost
*/
public boolean preWALRestore(HRegionInfo info, HLogKey logKey,
WALEdit logEdit) throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).preWALRestore(env, info, logKey,
@ -1029,9 +857,6 @@ public class RegionCoprocessorHost
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -1042,8 +867,6 @@ public class RegionCoprocessorHost
*/
public void postWALRestore(HRegionInfo info, HLogKey logKey,
WALEdit logEdit) throws IOException {
try {
coprocessorLock.readLock().lock();
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
((RegionObserver)env.getInstance()).postWALRestore(env, info,
@ -1053,8 +876,5 @@ public class RegionCoprocessorHost
break;
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
}

View File

@ -95,9 +95,7 @@ public class WALCoprocessorHost
*/
public boolean preWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
throws IOException {
try {
boolean bypass = false;
coprocessorLock.readLock().lock();
for (WALEnvironment env: coprocessors) {
if (env.getInstance() instanceof
org.apache.hadoop.hbase.coprocessor.WALObserver) {
@ -110,9 +108,6 @@ public class WALCoprocessorHost
}
}
return bypass;
} finally {
coprocessorLock.readLock().unlock();
}
}
/**
@ -123,8 +118,6 @@ public class WALCoprocessorHost
*/
public void postWALWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit)
throws IOException {
try {
coprocessorLock.readLock().lock();
for (WALEnvironment env: coprocessors) {
if (env.getInstance() instanceof
org.apache.hadoop.hbase.coprocessor.WALObserver) {
@ -135,8 +128,5 @@ public class WALCoprocessorHost
}
}
}
} finally {
coprocessorLock.readLock().unlock();
}
}
}

View File

@ -0,0 +1,175 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* Simple {@link java.util.SortedSet} implementation that uses an internal
* {@link java.util.TreeSet} to provide ordering. All mutation operations
* create a new copy of the <code>TreeSet</code> instance, so are very
* expensive. This class is only intended for use on small, very rarely
* written collections that expect highly concurrent reads. Read operations
* are performed on a reference to the internal <code>TreeSet</code> at the
* time of invocation, so will not see any mutations to the collection during
* their operation.
*
* <p>Note that due to the use of a {@link java.util.TreeSet} internally,
* a {@link java.util.Comparator} instance must be provided, or collection
* elements must implement {@link java.lang.Comparable}.
* </p>
* @param <E> A class implementing {@link java.lang.Comparable} or able to be
* compared by a provided comparator.
*/
public class SortedCopyOnWriteSet<E> implements SortedSet<E> {
private SortedSet<E> internalSet;
public SortedCopyOnWriteSet() {
this.internalSet = new TreeSet<E>();
}
public SortedCopyOnWriteSet(Collection<? extends E> c) {
this.internalSet = new TreeSet<E>(c);
}
public SortedCopyOnWriteSet(Comparator<? super E> comparator) {
this.internalSet = new TreeSet<E>(comparator);
}
@Override
public int size() {
return internalSet.size();
}
@Override
public boolean isEmpty() {
return internalSet.isEmpty();
}
@Override
public boolean contains(Object o) {
return internalSet.contains(o);
}
@Override
public Iterator<E> iterator() {
return internalSet.iterator();
}
@Override
public Object[] toArray() {
return internalSet.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return internalSet.toArray(a);
}
@Override
public synchronized boolean add(E e) {
SortedSet<E> newSet = new TreeSet<E>(internalSet);
boolean added = newSet.add(e);
internalSet = newSet;
return added;
}
@Override
public synchronized boolean remove(Object o) {
SortedSet<E> newSet = new TreeSet<E>(internalSet);
boolean removed = newSet.remove(o);
internalSet = newSet;
return removed;
}
@Override
public boolean containsAll(Collection<?> c) {
return internalSet.containsAll(c);
}
@Override
public synchronized boolean addAll(Collection<? extends E> c) {
SortedSet<E> newSet = new TreeSet<E>(internalSet);
boolean changed = newSet.addAll(c);
internalSet = newSet;
return changed;
}
@Override
public synchronized boolean retainAll(Collection<?> c) {
SortedSet<E> newSet = new TreeSet<E>(internalSet);
boolean changed = newSet.retainAll(c);
internalSet = newSet;
return changed;
}
@Override
public synchronized boolean removeAll(Collection<?> c) {
SortedSet<E> newSet = new TreeSet<E>(internalSet);
boolean changed = newSet.removeAll(c);
internalSet = newSet;
return changed;
}
@Override
public synchronized void clear() {
Comparator<? super E> comparator = internalSet.comparator();
if (comparator != null) {
internalSet = new TreeSet<E>(comparator);
} else {
internalSet = new TreeSet<E>();
}
}
@Override
public Comparator<? super E> comparator() {
return internalSet.comparator();
}
@Override
public SortedSet<E> subSet(E fromElement, E toElement) {
return internalSet.subSet(fromElement, toElement);
}
@Override
public SortedSet<E> headSet(E toElement) {
return internalSet.headSet(toElement);
}
@Override
public SortedSet<E> tailSet(E fromElement) {
return internalSet.tailSet(fromElement);
}
@Override
public E first() {
return internalSet.first();
}
@Override
public E last() {
return internalSet.last();
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright 2011 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.*;
import java.util.Arrays;
import java.util.Iterator;
import com.google.common.collect.Lists;
import org.junit.Test;
public class TestSortedCopyOnWriteSet {
@Test
public void testSorting() throws Exception {
SortedCopyOnWriteSet<String> set = new SortedCopyOnWriteSet<String>();
set.add("c");
set.add("d");
set.add("a");
set.add("b");
String[] expected = new String[]{"a", "b", "c", "d"};
String[] stored = set.toArray(new String[4]);
assertArrayEquals(expected, stored);
set.add("c");
assertEquals(4, set.size());
stored = set.toArray(new String[4]);
assertArrayEquals(expected, stored);
}
@Test
public void testIteratorIsolation() throws Exception {
SortedCopyOnWriteSet<String> set = new SortedCopyOnWriteSet<String>(
Lists.newArrayList("a", "b", "c", "d", "e"));
// isolation of remove()
Iterator<String> iter = set.iterator();
set.remove("c");
boolean found = false;
while (iter.hasNext() && !found) {
found = "c".equals(iter.next());
}
assertTrue(found);
iter = set.iterator();
found = false;
while (iter.hasNext() && !found) {
found = "c".equals(iter.next());
}
assertFalse(found);
// isolation of add()
iter = set.iterator();
set.add("f");
found = false;
while (iter.hasNext() && !found) {
String next = iter.next();
found = "f".equals(next);
}
assertFalse(found);
// isolation of addAll()
iter = set.iterator();
set.addAll(Lists.newArrayList("g", "h", "i"));
found = false;
while (iter.hasNext() && !found) {
String next = iter.next();
found = "g".equals(next) || "h".equals(next) || "i".equals(next);
}
assertFalse(found);
// isolation of clear()
iter = set.iterator();
set.clear();
assertEquals(0, set.size());
int size = 0;
while (iter.hasNext()) {
iter.next();
size++;
}
assertTrue(size > 0);
}
}