HBASE-7495 parallel seek in StoreScanner (Liang Xie)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1447740 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1934652769
commit
e0e0c44e34
|
@ -95,6 +95,7 @@ public class TestBulkDeleteProtocol {
|
||||||
rows++;
|
rows++;
|
||||||
}
|
}
|
||||||
assertEquals(0, rows);
|
assertEquals(0, rows);
|
||||||
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -118,6 +119,7 @@ public class TestBulkDeleteProtocol {
|
||||||
rows++;
|
rows++;
|
||||||
}
|
}
|
||||||
assertEquals(0, rows);
|
assertEquals(0, rows);
|
||||||
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
|
private long invokeBulkDeleteProtocol(byte[] tableName, final Scan scan, final int rowBatchSize,
|
||||||
|
@ -147,6 +149,7 @@ public class TestBulkDeleteProtocol {
|
||||||
for (BulkDeleteResponse response : result.values()) {
|
for (BulkDeleteResponse response : result.values()) {
|
||||||
noOfDeletedRows += response.getRowsDeleted();
|
noOfDeletedRows += response.getRowsDeleted();
|
||||||
}
|
}
|
||||||
|
ht.close();
|
||||||
return noOfDeletedRows;
|
return noOfDeletedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,6 +180,7 @@ public class TestBulkDeleteProtocol {
|
||||||
rows++;
|
rows++;
|
||||||
}
|
}
|
||||||
assertEquals(90, rows);
|
assertEquals(90, rows);
|
||||||
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -205,6 +209,7 @@ public class TestBulkDeleteProtocol {
|
||||||
rows++;
|
rows++;
|
||||||
}
|
}
|
||||||
assertEquals(100, rows);
|
assertEquals(100, rows);
|
||||||
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -235,6 +240,7 @@ public class TestBulkDeleteProtocol {
|
||||||
rows++;
|
rows++;
|
||||||
}
|
}
|
||||||
assertEquals(100, rows);
|
assertEquals(100, rows);
|
||||||
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -282,6 +288,7 @@ public class TestBulkDeleteProtocol {
|
||||||
rows++;
|
rows++;
|
||||||
}
|
}
|
||||||
assertEquals(100, rows);
|
assertEquals(100, rows);
|
||||||
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -328,6 +335,7 @@ public class TestBulkDeleteProtocol {
|
||||||
rows++;
|
rows++;
|
||||||
}
|
}
|
||||||
assertEquals(100, rows);
|
assertEquals(100, rows);
|
||||||
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -412,6 +420,7 @@ public class TestBulkDeleteProtocol {
|
||||||
rows++;
|
rows++;
|
||||||
}
|
}
|
||||||
assertEquals(100, rows);
|
assertEquals(100, rows);
|
||||||
|
ht.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private HTable createTable(byte[] tableName) throws IOException {
|
private HTable createTable(byte[] tableName) throws IOException {
|
||||||
|
@ -431,4 +440,4 @@ public class TestBulkDeleteProtocol {
|
||||||
put.add(FAMILY1, QUALIFIER3, value.getBytes());
|
put.add(FAMILY1, QUALIFIER3, value.getBytes());
|
||||||
return put;
|
return put;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,7 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
||||||
private EventHandlerListener listener;
|
private EventHandlerListener listener;
|
||||||
|
|
||||||
// Time to wait for events to happen, should be kept short
|
// Time to wait for events to happen, should be kept short
|
||||||
protected final int waitingTimeForEvents;
|
protected int waitingTimeForEvents;
|
||||||
|
|
||||||
private final Span parent;
|
private final Span parent;
|
||||||
|
|
||||||
|
@ -144,7 +144,10 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
||||||
// Master controlled events to be executed on the master
|
// Master controlled events to be executed on the master
|
||||||
M_SERVER_SHUTDOWN (70, ExecutorType.MASTER_SERVER_OPERATIONS), // Master is processing shutdown of a RS
|
M_SERVER_SHUTDOWN (70, ExecutorType.MASTER_SERVER_OPERATIONS), // Master is processing shutdown of a RS
|
||||||
M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS), // Master is processing shutdown of RS hosting a meta region (-ROOT- or .META.).
|
M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS), // Master is processing shutdown of RS hosting a meta region (-ROOT- or .META.).
|
||||||
M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS); // Master is processing recovery of regions found in ZK RIT
|
M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS), // Master is processing recovery of regions found in ZK RIT
|
||||||
|
|
||||||
|
// RS controlled events to be executed on the RS
|
||||||
|
RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK);
|
||||||
|
|
||||||
private final int code;
|
private final int code;
|
||||||
private final ExecutorService.ExecutorType executor;
|
private final ExecutorService.ExecutorType executor;
|
||||||
|
@ -191,8 +194,10 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
||||||
this.server = server;
|
this.server = server;
|
||||||
this.eventType = eventType;
|
this.eventType = eventType;
|
||||||
seqid = seqids.incrementAndGet();
|
seqid = seqids.incrementAndGet();
|
||||||
this.waitingTimeForEvents = server.getConfiguration().
|
if (server != null) {
|
||||||
getInt("hbase.master.event.waiting.time", 1000);
|
this.waitingTimeForEvents = server.getConfiguration().
|
||||||
|
getInt("hbase.master.event.waiting.time", 1000);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void run() {
|
public void run() {
|
||||||
|
|
|
@ -92,7 +92,8 @@ public class ExecutorService {
|
||||||
RS_OPEN_META (22),
|
RS_OPEN_META (22),
|
||||||
RS_CLOSE_REGION (23),
|
RS_CLOSE_REGION (23),
|
||||||
RS_CLOSE_ROOT (24),
|
RS_CLOSE_ROOT (24),
|
||||||
RS_CLOSE_META (25);
|
RS_CLOSE_META (25),
|
||||||
|
RS_PARALLEL_SEEK (26);
|
||||||
|
|
||||||
ExecutorType(int value) {}
|
ExecutorType(int value) {}
|
||||||
|
|
||||||
|
|
|
@ -29,14 +29,12 @@ import java.lang.reflect.Method;
|
||||||
import java.net.BindException;
|
import java.net.BindException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
@ -53,7 +51,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import com.google.protobuf.Message;
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -66,9 +63,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.FailedSanityCheckException;
|
import org.apache.hadoop.hbase.FailedSanityCheckException;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HealthCheckChore;
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HealthCheckChore;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.OutOfOrderScannerNextException;
|
import org.apache.hadoop.hbase.OutOfOrderScannerNextException;
|
||||||
|
@ -214,6 +211,7 @@ import org.cliffc.high_scale_lib.Counter;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
import com.google.protobuf.ByteString;
|
import com.google.protobuf.ByteString;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
import com.google.protobuf.ServiceException;
|
import com.google.protobuf.ServiceException;
|
||||||
|
|
||||||
|
@ -1510,6 +1508,10 @@ public class HRegionServer implements ClientProtocol,
|
||||||
conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
|
conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
|
||||||
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
|
this.service.startExecutorService(ExecutorType.RS_CLOSE_META,
|
||||||
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
|
conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
|
||||||
|
if (conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false)) {
|
||||||
|
this.service.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
|
||||||
|
conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
|
||||||
|
}
|
||||||
|
|
||||||
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
|
Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller",
|
||||||
uncaughtExceptionHandler);
|
uncaughtExceptionHandler);
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
|
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -92,4 +93,9 @@ public interface RegionServerServices extends OnlineRegions {
|
||||||
* @return The RegionServer's "Leases" service
|
* @return The RegionServer's "Leases" service
|
||||||
*/
|
*/
|
||||||
public Leases getLeases();
|
public Leases getLeases();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return hbase executor service
|
||||||
|
*/
|
||||||
|
public ExecutorService getExecutorService();
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,9 +20,11 @@
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -31,8 +33,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.regionserver.ScanInfo;
|
import org.apache.hadoop.hbase.regionserver.ScanInfo;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.handler.ParallelSeekHandler;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
|
||||||
|
@ -59,6 +63,11 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
protected final boolean isGet;
|
protected final boolean isGet;
|
||||||
protected final boolean explicitColumnQuery;
|
protected final boolean explicitColumnQuery;
|
||||||
protected final boolean useRowColBloom;
|
protected final boolean useRowColBloom;
|
||||||
|
/**
|
||||||
|
* A flag that enables StoreFileScanner parallel-seeking
|
||||||
|
*/
|
||||||
|
protected boolean isParallelSeekEnabled = false;
|
||||||
|
protected ExecutorService executor;
|
||||||
protected final Scan scan;
|
protected final Scan scan;
|
||||||
protected final NavigableSet<byte[]> columns;
|
protected final NavigableSet<byte[]> columns;
|
||||||
protected final long oldestUnexpiredTS;
|
protected final long oldestUnexpiredTS;
|
||||||
|
@ -66,6 +75,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
|
|
||||||
/** We don't ever expect to change this, the constant is just for clarity. */
|
/** We don't ever expect to change this, the constant is just for clarity. */
|
||||||
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
|
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
|
||||||
|
public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
|
||||||
|
"hbase.storescanner.parallel.seek.enable";
|
||||||
|
|
||||||
/** Used during unit testing to ensure that lazy seek does save seek ops */
|
/** Used during unit testing to ensure that lazy seek does save seek ops */
|
||||||
protected static boolean lazySeekEnabledGlobally =
|
protected static boolean lazySeekEnabledGlobally =
|
||||||
|
@ -92,6 +103,17 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
// for multi-row (non-"get") scans because this is not done in
|
// for multi-row (non-"get") scans because this is not done in
|
||||||
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
|
// StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
|
||||||
useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
|
useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
|
||||||
|
// The parallel-seeking is on :
|
||||||
|
// 1) the config value is *true*
|
||||||
|
// 2) store has more than one store file
|
||||||
|
if (store != null && ((HStore)store).getHRegion() != null
|
||||||
|
&& store.getStorefilesCount() > 1) {
|
||||||
|
RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
|
||||||
|
if (rsService == null || !rsService.getConfiguration().getBoolean(
|
||||||
|
STORESCANNER_PARALLEL_SEEK_ENABLE, false)) return;
|
||||||
|
isParallelSeekEnabled = true;
|
||||||
|
executor = rsService.getExecutorService();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -127,8 +149,12 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
scanner.requestSeek(matcher.getStartKey(), false, true);
|
scanner.requestSeek(matcher.getStartKey(), false, true);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (KeyValueScanner scanner : scanners) {
|
if (!isParallelSeekEnabled) {
|
||||||
scanner.seek(matcher.getStartKey());
|
for (KeyValueScanner scanner : scanners) {
|
||||||
|
scanner.seek(matcher.getStartKey());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
parallelSeek(scanners, matcher.getStartKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,8 +192,12 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
scanners = selectScannersFrom(scanners);
|
scanners = selectScannersFrom(scanners);
|
||||||
|
|
||||||
// Seek all scanners to the initial key
|
// Seek all scanners to the initial key
|
||||||
for(KeyValueScanner scanner : scanners) {
|
if (!isParallelSeekEnabled) {
|
||||||
scanner.seek(matcher.getStartKey());
|
for (KeyValueScanner scanner : scanners) {
|
||||||
|
scanner.seek(matcher.getStartKey());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
parallelSeek(scanners, matcher.getStartKey());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Combine all seeked scanners with a heap
|
// Combine all seeked scanners with a heap
|
||||||
|
@ -193,8 +223,12 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
|
Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS);
|
||||||
|
|
||||||
// Seek all scanners to the initial key
|
// Seek all scanners to the initial key
|
||||||
for (KeyValueScanner scanner : scanners) {
|
if (!isParallelSeekEnabled) {
|
||||||
scanner.seek(matcher.getStartKey());
|
for (KeyValueScanner scanner : scanners) {
|
||||||
|
scanner.seek(matcher.getStartKey());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
parallelSeek(scanners, matcher.getStartKey());
|
||||||
}
|
}
|
||||||
heap = new KeyValueHeap(scanners, scanInfo.getComparator());
|
heap = new KeyValueHeap(scanners, scanInfo.getComparator());
|
||||||
}
|
}
|
||||||
|
@ -513,8 +547,12 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
* could have done it now by storing the scan object from the constructor */
|
* could have done it now by storing the scan object from the constructor */
|
||||||
List<KeyValueScanner> scanners = getScannersNoCompaction();
|
List<KeyValueScanner> scanners = getScannersNoCompaction();
|
||||||
|
|
||||||
for(KeyValueScanner scanner : scanners) {
|
if (!isParallelSeekEnabled) {
|
||||||
scanner.seek(lastTopKey);
|
for (KeyValueScanner scanner : scanners) {
|
||||||
|
scanner.seek(lastTopKey);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
parallelSeek(scanners, lastTopKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Combine all seeked scanners with a heap
|
// Combine all seeked scanners with a heap
|
||||||
|
@ -546,9 +584,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
checkReseek();
|
checkReseek();
|
||||||
if (explicitColumnQuery && lazySeekEnabledGlobally) {
|
if (explicitColumnQuery && lazySeekEnabledGlobally) {
|
||||||
return heap.requestSeek(kv, true, useRowColBloom);
|
return heap.requestSeek(kv, true, useRowColBloom);
|
||||||
} else {
|
|
||||||
return heap.reseek(kv);
|
|
||||||
}
|
}
|
||||||
|
return heap.reseek(kv);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -556,6 +593,44 @@ public class StoreScanner extends NonLazyKeyValueScanner
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Seek storefiles in parallel to optimize IO latency as much as possible
|
||||||
|
* @param scanners the list {@link KeyValueScanner}s to be read from
|
||||||
|
* @param kv the KeyValue on which the operation is being requested
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void parallelSeek(final List<? extends KeyValueScanner>
|
||||||
|
scanners, final KeyValue kv) throws IOException {
|
||||||
|
if (scanners.isEmpty()) return;
|
||||||
|
int storeFileScannerCount = scanners.size();
|
||||||
|
CountDownLatch latch = new CountDownLatch(storeFileScannerCount);
|
||||||
|
List<ParallelSeekHandler> handlers =
|
||||||
|
new ArrayList<ParallelSeekHandler>(storeFileScannerCount);
|
||||||
|
for (KeyValueScanner scanner : scanners) {
|
||||||
|
if (scanner instanceof StoreFileScanner) {
|
||||||
|
ParallelSeekHandler seekHandler = new ParallelSeekHandler(scanner, kv,
|
||||||
|
MultiVersionConsistencyControl.getThreadReadPoint(), latch);
|
||||||
|
executor.submit(seekHandler);
|
||||||
|
handlers.add(seekHandler);
|
||||||
|
} else {
|
||||||
|
scanner.seek(kv);
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
throw new InterruptedIOException(ie.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ParallelSeekHandler handler : handlers) {
|
||||||
|
if (handler.getErr() != null) {
|
||||||
|
throw new IOException(handler.getErr());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used in testing.
|
* Used in testing.
|
||||||
* @return all scanners in no particular order
|
* @return all scanners in no particular order
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver.handler;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Handler to seek storefiles in parallel.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ParallelSeekHandler extends EventHandler {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ParallelSeekHandler.class);
|
||||||
|
private KeyValueScanner scanner;
|
||||||
|
private KeyValue keyValue;
|
||||||
|
private long readPoint;
|
||||||
|
private CountDownLatch latch;
|
||||||
|
private Throwable err = null;
|
||||||
|
|
||||||
|
public ParallelSeekHandler(KeyValueScanner scanner,KeyValue keyValue,
|
||||||
|
long readPoint, CountDownLatch latch) {
|
||||||
|
super(null, EventType.RS_PARALLEL_SEEK);
|
||||||
|
this.scanner = scanner;
|
||||||
|
this.keyValue = keyValue;
|
||||||
|
this.readPoint = readPoint;
|
||||||
|
this.latch = latch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process() {
|
||||||
|
try {
|
||||||
|
MultiVersionConsistencyControl.setThreadReadPoint(readPoint);
|
||||||
|
scanner.seek(keyValue);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("", e);
|
||||||
|
setErr(e);
|
||||||
|
} finally {
|
||||||
|
latch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Throwable getErr() {
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setErr(Throwable err) {
|
||||||
|
this.err = err;
|
||||||
|
}
|
||||||
|
}
|
|
@ -441,6 +441,21 @@
|
||||||
Set to 0 to disable automated major compactions.
|
Set to 0 to disable automated major compactions.
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.storescanner.parallel.seek.enable</name>
|
||||||
|
<value>false</value>
|
||||||
|
<description>
|
||||||
|
Enables StoreFileScanner parallel-seeking in StoreScanner,
|
||||||
|
a feature which can reduce response latency under special conditions.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.storescanner.parallel.seek.threads</name>
|
||||||
|
<value>10</value>
|
||||||
|
<description>
|
||||||
|
The default thread pool size if parallel-seeking feature enabled.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
<property>
|
<property>
|
||||||
<name>hbase.mapreduce.hfileoutputformat.blocksize</name>
|
<name>hbase.mapreduce.hfileoutputformat.blocksize</name>
|
||||||
<value>65536</value>
|
<value>65536</value>
|
||||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.ClientProtocol;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
|
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||||
|
@ -499,4 +500,9 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ExecutorService getExecutorService() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -568,6 +568,7 @@ public class TestStoreScanner extends TestCase {
|
||||||
assertEquals(kvs[14], results.get(5));
|
assertEquals(kvs[14], results.get(5));
|
||||||
assertEquals(kvs[15], results.get(6));
|
assertEquals(kvs[15], results.get(6));
|
||||||
assertEquals(7, results.size());
|
assertEquals(7, results.size());
|
||||||
|
scanner.close();
|
||||||
}finally{
|
}finally{
|
||||||
EnvironmentEdgeManagerTestHelper.reset();
|
EnvironmentEdgeManagerTestHelper.reset();
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
|
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||||
import org.apache.hadoop.hbase.fs.HFileSystem;
|
import org.apache.hadoop.hbase.fs.HFileSystem;
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||||
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
|
||||||
|
@ -172,4 +173,9 @@ public class MockRegionServerServices implements RegionServerServices {
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ExecutorService getExecutorService() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,7 +19,10 @@ package org.apache.hadoop.hbase.util;
|
||||||
// this is deliberately not in the o.a.h.h.regionserver package
|
// this is deliberately not in the o.a.h.h.regionserver package
|
||||||
// in order to make sure all required classes/method are available
|
// in order to make sure all required classes/method are available
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -59,7 +62,12 @@ import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
import org.junit.runners.Parameterized.Parameters;
|
||||||
|
|
||||||
@Category(MediumTests.class)
|
@Category(MediumTests.class)
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
public class TestCoprocessorScanPolicy {
|
public class TestCoprocessorScanPolicy {
|
||||||
final Log LOG = LogFactory.getLog(getClass());
|
final Log LOG = LogFactory.getLog(getClass());
|
||||||
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
@ -67,7 +75,6 @@ public class TestCoprocessorScanPolicy {
|
||||||
private static final byte[] Q = Bytes.toBytes("qual");
|
private static final byte[] Q = Bytes.toBytes("qual");
|
||||||
private static final byte[] R = Bytes.toBytes("row");
|
private static final byte[] R = Bytes.toBytes("row");
|
||||||
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
@ -81,9 +88,22 @@ public class TestCoprocessorScanPolicy {
|
||||||
TEST_UTIL.shutdownMiniCluster();
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Parameters
|
||||||
|
public static Collection<Object[]> parameters() {
|
||||||
|
return HBaseTestingUtility.BOOLEAN_PARAMETERIZED;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TestCoprocessorScanPolicy(boolean parallelSeekEnable) {
|
||||||
|
TEST_UTIL.getMiniHBaseCluster().getConf()
|
||||||
|
.setBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, parallelSeekEnable);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBaseCases() throws Exception {
|
public void testBaseCases() throws Exception {
|
||||||
byte[] tableName = Bytes.toBytes("baseCases");
|
byte[] tableName = Bytes.toBytes("baseCases");
|
||||||
|
if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
|
||||||
|
TEST_UTIL.deleteTable(tableName);
|
||||||
|
}
|
||||||
HTable t = TEST_UTIL.createTable(tableName, F, 1);
|
HTable t = TEST_UTIL.createTable(tableName, F, 1);
|
||||||
// set the version override to 2
|
// set the version override to 2
|
||||||
Put p = new Put(R);
|
Put p = new Put(R);
|
||||||
|
@ -130,6 +150,9 @@ public class TestCoprocessorScanPolicy {
|
||||||
@Test
|
@Test
|
||||||
public void testTTL() throws Exception {
|
public void testTTL() throws Exception {
|
||||||
byte[] tableName = Bytes.toBytes("testTTL");
|
byte[] tableName = Bytes.toBytes("testTTL");
|
||||||
|
if (TEST_UTIL.getHBaseAdmin().tableExists(tableName)) {
|
||||||
|
TEST_UTIL.deleteTable(tableName);
|
||||||
|
}
|
||||||
HTableDescriptor desc = new HTableDescriptor(tableName);
|
HTableDescriptor desc = new HTableDescriptor(tableName);
|
||||||
HColumnDescriptor hcd = new HColumnDescriptor(F)
|
HColumnDescriptor hcd = new HColumnDescriptor(F)
|
||||||
.setMaxVersions(10)
|
.setMaxVersions(10)
|
||||||
|
|
Loading…
Reference in New Issue