HBASE-3624 Only one coprocessor of each priority can be loaded for a table
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1090500 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
daf0bb52cf
commit
acbe32c4a7
@ -69,6 +69,7 @@ Release 0.91.0 - Unreleased
|
|||||||
HMasterInterface and HMasterRegionInterface versions
|
HMasterInterface and HMasterRegionInterface versions
|
||||||
HBASE-3723 Major compact should be done when there is only one storefile
|
HBASE-3723 Major compact should be done when there is only one storefile
|
||||||
and some keyvalue is outdated (Zhou Shuaifeng via Stack)
|
and some keyvalue is outdated (Zhou Shuaifeng via Stack)
|
||||||
|
HBASE-3624 Only one coprocessor of each priority can be loaded for a table
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)
|
||||||
|
@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.coprocessor;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.client.HTableInterface;
|
import org.apache.hadoop.hbase.client.HTableInterface;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Coprocessor environment state.
|
* Coprocessor environment state.
|
||||||
@ -39,6 +37,9 @@ public interface CoprocessorEnvironment {
|
|||||||
/** @return the priority assigned to the loaded coprocessor */
|
/** @return the priority assigned to the loaded coprocessor */
|
||||||
public Coprocessor.Priority getPriority();
|
public Coprocessor.Priority getPriority();
|
||||||
|
|
||||||
|
/** @return the load sequence number */
|
||||||
|
public int getLoadSequence();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return an interface for accessing the given table
|
* @return an interface for accessing the given table
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -61,6 +61,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||||||
protected Configuration conf;
|
protected Configuration conf;
|
||||||
// unique file prefix to use for local copies of jars when classloading
|
// unique file prefix to use for local copies of jars when classloading
|
||||||
protected String pathPrefix;
|
protected String pathPrefix;
|
||||||
|
protected volatile int loadSequence;
|
||||||
|
|
||||||
public CoprocessorHost() {
|
public CoprocessorHost() {
|
||||||
pathPrefix = UUID.randomUUID().toString();
|
pathPrefix = UUID.randomUUID().toString();
|
||||||
@ -195,7 +196,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
// create the environment
|
// create the environment
|
||||||
E env = createEnvironment(implClass, impl, priority);
|
E env = createEnvironment(implClass, impl, priority, ++loadSequence);
|
||||||
if (env instanceof Environment) {
|
if (env instanceof Environment) {
|
||||||
((Environment)env).startup();
|
((Environment)env).startup();
|
||||||
}
|
}
|
||||||
@ -206,7 +207,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||||||
* Called when a new Coprocessor class is loaded
|
* Called when a new Coprocessor class is loaded
|
||||||
*/
|
*/
|
||||||
public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
|
public abstract E createEnvironment(Class<?> implClass, Coprocessor instance,
|
||||||
Coprocessor.Priority priority);
|
Coprocessor.Priority priority, int sequence);
|
||||||
|
|
||||||
public void shutdown(CoprocessorEnvironment e) {
|
public void shutdown(CoprocessorEnvironment e) {
|
||||||
if (e instanceof Environment) {
|
if (e instanceof Environment) {
|
||||||
@ -244,6 +245,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||||||
} else if (env1.getPriority().intValue() > env2.getPriority().intValue()) {
|
} else if (env1.getPriority().intValue() > env2.getPriority().intValue()) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
if (env1.getLoadSequence() < env2.getLoadSequence()) {
|
||||||
|
return -1;
|
||||||
|
} else if (env1.getLoadSequence() > env2.getLoadSequence()) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -436,6 +442,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||||||
/** Accounting for tables opened by the coprocessor */
|
/** Accounting for tables opened by the coprocessor */
|
||||||
protected List<HTableInterface> openTables =
|
protected List<HTableInterface> openTables =
|
||||||
Collections.synchronizedList(new ArrayList<HTableInterface>());
|
Collections.synchronizedList(new ArrayList<HTableInterface>());
|
||||||
|
private int seq;
|
||||||
static final ThreadLocal<Boolean> bypass = new ThreadLocal<Boolean>() {
|
static final ThreadLocal<Boolean> bypass = new ThreadLocal<Boolean>() {
|
||||||
@Override protected Boolean initialValue() {
|
@Override protected Boolean initialValue() {
|
||||||
return Boolean.FALSE;
|
return Boolean.FALSE;
|
||||||
@ -452,10 +459,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||||||
* @param impl the coprocessor instance
|
* @param impl the coprocessor instance
|
||||||
* @param priority chaining priority
|
* @param priority chaining priority
|
||||||
*/
|
*/
|
||||||
public Environment(final Coprocessor impl, Coprocessor.Priority priority) {
|
public Environment(final Coprocessor impl, Coprocessor.Priority priority, int seq) {
|
||||||
this.impl = impl;
|
this.impl = impl;
|
||||||
this.priority = priority;
|
this.priority = priority;
|
||||||
this.state = Coprocessor.State.INSTALLED;
|
this.state = Coprocessor.State.INSTALLED;
|
||||||
|
this.seq = seq;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Initialize the environment */
|
/** Initialize the environment */
|
||||||
@ -523,6 +531,11 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
|
|||||||
return priority;
|
return priority;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getLoadSequence() {
|
||||||
|
return seq;
|
||||||
|
}
|
||||||
|
|
||||||
/** @return the coprocessor environment version */
|
/** @return the coprocessor environment version */
|
||||||
@Override
|
@Override
|
||||||
public int getVersion() {
|
public int getVersion() {
|
||||||
|
@ -43,8 +43,8 @@ public class MasterCoprocessorHost
|
|||||||
private MasterServices masterServices;
|
private MasterServices masterServices;
|
||||||
|
|
||||||
public MasterEnvironment(Class<?> implClass, Coprocessor impl,
|
public MasterEnvironment(Class<?> implClass, Coprocessor impl,
|
||||||
Coprocessor.Priority priority, MasterServices services) {
|
Coprocessor.Priority priority, int seq, MasterServices services) {
|
||||||
super(impl, priority);
|
super(impl, priority, seq);
|
||||||
this.masterServices = services;
|
this.masterServices = services;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,8 +63,8 @@ public class MasterCoprocessorHost
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MasterEnvironment createEnvironment(Class<?> implClass,
|
public MasterEnvironment createEnvironment(Class<?> implClass,
|
||||||
Coprocessor instance, Coprocessor.Priority priority) {
|
Coprocessor instance, Coprocessor.Priority priority, int seq) {
|
||||||
return new MasterEnvironment(implClass, instance, priority, masterServices);
|
return new MasterEnvironment(implClass, instance, priority, seq, masterServices);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Implementation of hooks for invoking MasterObservers */
|
/* Implementation of hooks for invoking MasterObservers */
|
||||||
|
@ -67,9 +67,9 @@ public class RegionCoprocessorHost
|
|||||||
* @param priority chaining priority
|
* @param priority chaining priority
|
||||||
*/
|
*/
|
||||||
public RegionEnvironment(final Coprocessor impl,
|
public RegionEnvironment(final Coprocessor impl,
|
||||||
Coprocessor.Priority priority, final HRegion region,
|
final Coprocessor.Priority priority, final int seq, final HRegion region,
|
||||||
final RegionServerServices services) {
|
final RegionServerServices services) {
|
||||||
super(impl, priority);
|
super(impl, priority, seq);
|
||||||
this.region = region;
|
this.region = region;
|
||||||
this.rsServices = services;
|
this.rsServices = services;
|
||||||
}
|
}
|
||||||
@ -152,7 +152,7 @@ public class RegionCoprocessorHost
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegionEnvironment createEnvironment(
|
public RegionEnvironment createEnvironment(
|
||||||
Class<?> implClass, Coprocessor instance, Coprocessor.Priority priority) {
|
Class<?> implClass, Coprocessor instance, Coprocessor.Priority priority, int seq) {
|
||||||
// Check if it's an Endpoint.
|
// Check if it's an Endpoint.
|
||||||
// Due to current dynamic protocol design, Endpoint
|
// Due to current dynamic protocol design, Endpoint
|
||||||
// uses a different way to be registered and executed.
|
// uses a different way to be registered and executed.
|
||||||
@ -165,7 +165,7 @@ public class RegionCoprocessorHost
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return new RegionEnvironment(instance, priority, region, rsServices);
|
return new RegionEnvironment(instance, priority, seq, region, rsServices);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -23,13 +23,9 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.coprocessor.*;
|
import org.apache.hadoop.hbase.coprocessor.*;
|
||||||
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
import org.apache.hadoop.hbase.coprocessor.Coprocessor.Priority;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -39,8 +35,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||||||
public class WALCoprocessorHost
|
public class WALCoprocessorHost
|
||||||
extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
|
extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(WALCoprocessorHost.class);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encapsulation of the environment of each coprocessor
|
* Encapsulation of the environment of each coprocessor
|
||||||
*/
|
*/
|
||||||
@ -58,10 +52,12 @@ public class WALCoprocessorHost
|
|||||||
* Constructor
|
* Constructor
|
||||||
* @param impl the coprocessor instance
|
* @param impl the coprocessor instance
|
||||||
* @param priority chaining priority
|
* @param priority chaining priority
|
||||||
|
* @param seq load sequence
|
||||||
|
* @param hlog HLog
|
||||||
*/
|
*/
|
||||||
public WALEnvironment(Class<?> implClass, final Coprocessor impl,
|
public WALEnvironment(Class<?> implClass, final Coprocessor impl,
|
||||||
Coprocessor.Priority priority, final HLog hlog) {
|
final Coprocessor.Priority priority, final int seq, final HLog hlog) {
|
||||||
super(impl, priority);
|
super(impl, priority, seq);
|
||||||
this.wal = hlog;
|
this.wal = hlog;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -81,9 +77,9 @@ public class WALCoprocessorHost
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public WALEnvironment createEnvironment(Class<?> implClass,
|
public WALEnvironment createEnvironment(Class<?> implClass,
|
||||||
Coprocessor instance, Priority priority) {
|
Coprocessor instance, Priority priority, int seq) {
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
return new WALEnvironment(implClass, instance, priority, this.wal);
|
return new WALEnvironment(implClass, instance, priority, seq, this.wal);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
x
Reference in New Issue
Block a user