HBASE-9749: Custom threadpool for Coprocessor obtained HTables
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1533187 13f79535-47bb-0310-9956-ffa450edef68
parent 968f83b2ff
commit bcff872bf1
@@ -16,6 +16,7 @@
 package org.apache.hadoop.hbase;

 import java.io.IOException;
+import java.util.concurrent.ExecutorService;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -50,4 +51,11 @@ public interface CoprocessorEnvironment {
    * @throws IOException
    */
   HTableInterface getTable(TableName tableName) throws IOException;
+
+  /**
+   * @return an interface for accessing the given table using the passed executor to run batch
+   *   operations
+   * @throws IOException
+   */
+  public HTableInterface getTable(TableName tableName, ExecutorService service) throws IOException;
 }
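
Note: the interface change above is the core of HBASE-9749 — a coprocessor can now hand its own ExecutorService to getTable() instead of inheriting HTable's shared default pool. A minimal sketch of how a region observer might call the new overload (the class name, pool sizing, and the "otherTable" name are illustrative, not part of this patch):

    import java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;

    public class PooledMirrorObserver extends BaseRegionObserver {

      // Pool owned and sized by the coprocessor; two threads is an arbitrary
      // example — the point is that the choice no longer belongs to HTable.
      private final ExecutorService pool = Executors.newFixedThreadPool(2);

      @Override
      public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
          final WALEdit edit, final Durability durability) throws IOException {
        // The two-argument getTable() introduced by this patch.
        HTableInterface mirror = e.getEnvironment().getTable(
            TableName.valueOf("otherTable"), pool);
        try {
          mirror.put(new Put(Bytes.toBytes("mirrored-row")));
        } finally {
          mirror.close();
        }
      }
    }
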
@@ -210,7 +210,7 @@ public class HTable implements HTableInterface {
     this.finishSetup();
   }

-  private static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
+  public static ThreadPoolExecutor getDefaultExecutor(Configuration conf) {
     int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
     if (maxThreads == 0) {
       maxThreads = 1; // is there a better default?
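
Note: getDefaultExecutor() goes from private to public so the coprocessor host (next file) can fall back to the same pool HTable would build for itself; its size is still driven by hbase.htable.threads.max, as the hunk shows. A small sketch of reusing it outside HTable, assuming only the classes already visible in this diff (the cap of 8 threads is just an example):

    // Requires org.apache.hadoop.hbase.HBaseConfiguration and
    // org.apache.hadoop.hbase.client.HTable on the classpath.
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.htable.threads.max", 8);   // bound the pool instead of Integer.MAX_VALUE
    ThreadPoolExecutor pool = HTable.getDefaultExecutor(conf);  // now callable from other classes
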
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.commons.logging.Log;
@@ -376,9 +377,10 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
     private HTable table;
     private HConnection connection;

-    public HTableWrapper(TableName tableName, HConnection connection) throws IOException {
+    public HTableWrapper(TableName tableName, HConnection connection, ExecutorService pool)
+        throws IOException {
       this.tableName = tableName;
-      this.table = new HTable(tableName, connection);
+      this.table = new HTable(tableName, connection, pool);
       this.connection = connection;
       openTables.add(this);
     }
@@ -709,7 +711,19 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
     */
    @Override
    public HTableInterface getTable(TableName tableName) throws IOException {
-      return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this));
+      return this.getTable(tableName, HTable.getDefaultExecutor(getConfiguration()));
    }
+
+    /**
+     * Open a table from within the Coprocessor environment
+     * @param tableName the table name
+     * @return an interface for manipulating the table
+     * @exception java.io.IOException Exception
+     */
+    @Override
+    public HTableInterface getTable(TableName tableName, ExecutorService pool) throws IOException {
+      return new HTableWrapper(tableName, CoprocessorHConnection.getConnectionForEnvironment(this),
+          pool);
+    }
  }

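
Note: with HTableWrapper taking a pool, the single-argument getTable() above becomes a thin delegate that plugs in HTable.getDefaultExecutor(getConfiguration()), so existing coprocessors keep their old behavior while new ones can inject a pool. From coprocessor code the two entry points look roughly like this (env and myPool are placeholders for a CoprocessorEnvironment and an ExecutorService):

    // Unchanged call site: the wrapper is built over HTable's default executor.
    HTableInterface t1 = env.getTable(tableName);
    // New call site: the wrapper's underlying HTable uses the supplied pool.
    HTableInterface t2 = env.getTable(tableName, myPool);
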
@@ -22,12 +22,17 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;

 import java.io.IOException;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;

-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
@@ -37,7 +42,10 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -47,53 +55,113 @@ import org.junit.experimental.categories.Category;
 @Category(MediumTests.class)
 public class TestOpenTableInCoprocessor {

-  private static final TableName otherTable =
-      TableName.valueOf("otherTable");
+  private static final TableName otherTable = TableName.valueOf("otherTable");
+  private static final TableName primaryTable = TableName.valueOf("primary");
   private static final byte[] family = new byte[] { 'f' };

-  private static boolean completed = false;
+  private static boolean[] completed = new boolean[1];
   /**
    * Custom coprocessor that just copies the write to another table.
    */
   public static class SendToOtherTableCoprocessor extends BaseRegionObserver {

     @Override
-    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
-        final Durability durability) throws IOException {
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
+        final WALEdit edit, final Durability durability) throws IOException {
       HTableInterface table = e.getEnvironment().getTable(otherTable);
       Put p = new Put(new byte[] { 'a' });
       p.add(family, null, new byte[] { 'a' });
       table.put(put);
       table.flushCommits();
-      completed = true;
+      completed[0] = true;
       table.close();
     }

   }

-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static boolean[] completedWithPool = new boolean[1];
+  /**
+   * Coprocessor that creates an HTable with a pool to write to another table
+   */
+  public static class CustomThreadPoolCoprocessor extends BaseRegionObserver {
+
+    /**
+     * Get a pool that has only ever one thread. A second action added to the pool (running
+     * concurrently), will cause an exception.
+     * @return
+     */
+    private ExecutorService getPool() {
+      int maxThreads = 1;
+      long keepAliveTime = 60;
+      ThreadPoolExecutor pool =
+          new ThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
+              new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-table"));
+      pool.allowCoreThreadTimeOut(true);
+      return pool;
+    }
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
+        final WALEdit edit, final Durability durability) throws IOException {
+      HTableInterface table = e.getEnvironment().getTable(otherTable, getPool());
+      Put p = new Put(new byte[] { 'a' });
+      p.add(family, null, new byte[] { 'a' });
+      try {
+        table.batch(Collections.singletonList(put));
+      } catch (InterruptedException e1) {
+        throw new IOException(e1);
+      }
+      completedWithPool[0] = true;
+      table.close();
+    }
+  }
+
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  @After
+  public void cleanupTestTable() throws Exception {
+    UTIL.getHBaseAdmin().disableTable(primaryTable);
+    UTIL.getHBaseAdmin().deleteTable(primaryTable);
+
+    UTIL.getHBaseAdmin().disableTable(otherTable);
+    UTIL.getHBaseAdmin().deleteTable(otherTable);
+
+  }

   @AfterClass
-  public static void cleanup() throws Exception {
-    UTIL.getHBaseAdmin().close();
+  public static void teardownCluster() throws Exception {
     UTIL.shutdownMiniCluster();
   }

   @Test
   public void testCoprocessorCanCreateConnectionToRemoteTable() throws Throwable {
-    HTableDescriptor primary = new HTableDescriptor(TableName.valueOf("primary"));
+    runCoprocessorConnectionToRemoteTable(SendToOtherTableCoprocessor.class, completed);
+  }
+
+  @Test
+  public void testCoprocessorCanCreateConnectionToRemoteTableWithCustomPool() throws Throwable {
+    runCoprocessorConnectionToRemoteTable(CustomThreadPoolCoprocessor.class, completedWithPool);
+  }
+
+  private void runCoprocessorConnectionToRemoteTable(Class<? extends BaseRegionObserver> clazz,
+      boolean[] completeCheck) throws Throwable {
+    HTableDescriptor primary = new HTableDescriptor(primaryTable);
     primary.addFamily(new HColumnDescriptor(family));
     // add our coprocessor
-    primary.addCoprocessor(SendToOtherTableCoprocessor.class.getName());
+    primary.addCoprocessor(clazz.getName());

     HTableDescriptor other = new HTableDescriptor(otherTable);
     other.addFamily(new HColumnDescriptor(family));
-    UTIL.startMiniCluster();
+

     HBaseAdmin admin = UTIL.getHBaseAdmin();
     admin.createTable(primary);
     admin.createTable(other);
-    admin.close();

     HTable table = new HTable(UTIL.getConfiguration(), "primary");
     Put p = new Put(new byte[] { 'a' });
@@ -103,11 +171,9 @@ public class TestOpenTableInCoprocessor {
     table.close();

     HTable target = new HTable(UTIL.getConfiguration(), otherTable);
-    assertTrue("Didn't complete update to target table!", completed);
+    assertTrue("Didn't complete update to target table!", completeCheck[0]);
     assertEquals("Didn't find inserted row", 1, getKeyValueCount(target));
     target.close();
-
-    UTIL.shutdownMiniCluster();
   }

   /**
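
Note: the test's getPool() deliberately builds an unforgiving pool — one thread plus a SynchronousQueue leaves nowhere to park a second concurrent task, so the pool rejects it instead of queueing, which is what its javadoc above alludes to. A plain-JDK sketch of that behavior, independent of HBase (class name and the sleep are illustrative; the Threads daemon factory used by the test is omitted here):

    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class SingleSlotPoolDemo {
      public static void main(String[] args) {
        // Same shape as the test's pool: core 1, max 1, no queueing.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);

        // Occupy the only worker thread.
        pool.execute(new Runnable() {
          @Override
          public void run() {
            try {
              Thread.sleep(1000);
            } catch (InterruptedException ignored) {
            }
          }
        });

        try {
          pool.execute(new Runnable() {
            @Override
            public void run() {
            }
          });
        } catch (RejectedExecutionException expected) {
          // With maxThreads == 1 and a SynchronousQueue there is no slot for a
          // second concurrent task, so it is rejected rather than queued.
          System.out.println("second task rejected, as expected");
        }
        pool.shutdown();
      }
    }
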
@@ -28,6 +28,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -215,6 +216,12 @@ public class TestTokenAuthentication {
       @Override
       public HTableInterface getTable(TableName tableName) throws IOException
         { return null; }
+
+      @Override
+      public HTableInterface getTable(TableName tableName, ExecutorService service)
+          throws IOException {
+        return null;
+      }
     });

     started = true;