HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
HBASE-3318 Split rollback leaves parent with writesEnabled=false git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1043688 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fcebfcfc31
commit
ceaf3ee3d7
@ -30,6 +30,8 @@ Release 0.91.0 - Unreleased
|
|||||||
HBASE-3316 Add support for Java Serialization to HbaseObjectWritable
|
HBASE-3316 Add support for Java Serialization to HbaseObjectWritable
|
||||||
(Ed Kohlwey via Stack)
|
(Ed Kohlwey via Stack)
|
||||||
HBASE-1861 Multi-Family support for bulk upload tools
|
HBASE-1861 Multi-Family support for bulk upload tools
|
||||||
|
HBASE-3308 SplitTransaction.splitStoreFiles slows splits a lot
|
||||||
|
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
|
HBASE-3287 Add option to cache blocks on hfile write and evict blocks on
|
||||||
@ -759,6 +761,8 @@ Release 0.90.0 - Unreleased
|
|||||||
HBASE-3314 [shell] 'move' is broken
|
HBASE-3314 [shell] 'move' is broken
|
||||||
HBASE-3315 Add debug output for when balancer makes bad balance
|
HBASE-3315 Add debug output for when balancer makes bad balance
|
||||||
HBASE-3278 AssertionError in LoadBalancer
|
HBASE-3278 AssertionError in LoadBalancer
|
||||||
|
HBASE-3318 Split rollback leaves parent with writesEnabled=false
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
@ -376,10 +376,7 @@ public class HRegion implements HeapSize { // , Writable{
|
|||||||
SplitTransaction.cleanupAnySplitDetritus(this);
|
SplitTransaction.cleanupAnySplitDetritus(this);
|
||||||
FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
|
FSUtils.deleteDirectory(this.fs, new Path(regiondir, MERGEDIR));
|
||||||
|
|
||||||
// See if region is meant to run read-only.
|
this.writestate.setReadOnly(this.regionInfo.getTableDesc().isReadOnly());
|
||||||
if (this.regionInfo.getTableDesc().isReadOnly()) {
|
|
||||||
this.writestate.setReadOnly(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.writestate.compacting = false;
|
this.writestate.compacting = false;
|
||||||
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
|
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
|
||||||
|
@ -23,7 +23,15 @@ import java.io.IOException;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ListIterator;
|
import java.util.ListIterator;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -75,6 +83,7 @@ public class SplitTransaction {
|
|||||||
private HRegionInfo hri_a;
|
private HRegionInfo hri_a;
|
||||||
private HRegionInfo hri_b;
|
private HRegionInfo hri_b;
|
||||||
private Path splitdir;
|
private Path splitdir;
|
||||||
|
private long fileSplitTimeout = 30000;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Row to split around
|
* Row to split around
|
||||||
@ -186,6 +195,8 @@ public class SplitTransaction {
|
|||||||
throw new IOException("Server is stopped or stopping");
|
throw new IOException("Server is stopped or stopping");
|
||||||
}
|
}
|
||||||
assert !this.parent.lock.writeLock().isHeldByCurrentThread() : "Unsafe to hold write lock while performing RPCs";
|
assert !this.parent.lock.writeLock().isHeldByCurrentThread() : "Unsafe to hold write lock while performing RPCs";
|
||||||
|
this.fileSplitTimeout = server.getConfiguration().getLong(
|
||||||
|
"hbase.regionserver.fileSplitTimeout", this.fileSplitTimeout);
|
||||||
|
|
||||||
// Coprocessor callback
|
// Coprocessor callback
|
||||||
if (this.parent.getCoprocessorHost() != null) {
|
if (this.parent.getCoprocessorHost() != null) {
|
||||||
@ -394,11 +405,52 @@ public class SplitTransaction {
|
|||||||
// Could be null because close didn't succeed -- for now consider it fatal
|
// Could be null because close didn't succeed -- for now consider it fatal
|
||||||
throw new IOException("Close returned empty list of StoreFiles");
|
throw new IOException("Close returned empty list of StoreFiles");
|
||||||
}
|
}
|
||||||
|
// The following code sets up a thread pool executor with as many slots as
|
||||||
|
// there's files to split. It then fires up everything, waits for
|
||||||
|
// completion and finally checks for any exception
|
||||||
|
int nbFiles = hstoreFilesToSplit.size();
|
||||||
|
ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
|
||||||
|
builder.setNameFormat("StoreFileSplitter-%1$d");
|
||||||
|
ThreadFactory factory = builder.build();
|
||||||
|
ThreadPoolExecutor threadPool =
|
||||||
|
(ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
|
||||||
|
List<Future<Void>> futures = new ArrayList<Future<Void>>(nbFiles);
|
||||||
|
|
||||||
// Split each store file.
|
// Split each store file.
|
||||||
for (StoreFile sf: hstoreFilesToSplit) {
|
for (StoreFile sf: hstoreFilesToSplit) {
|
||||||
splitStoreFile(sf, splitdir);
|
//splitStoreFile(sf, splitdir);
|
||||||
}
|
StoreFileSplitter sfs = new StoreFileSplitter(sf, splitdir);
|
||||||
|
futures.add(threadPool.submit(sfs));
|
||||||
|
}
|
||||||
|
// Shutdown the pool
|
||||||
|
threadPool.shutdown();
|
||||||
|
|
||||||
|
// Wait for all the tasks to finish
|
||||||
|
try {
|
||||||
|
boolean stillRunning = !threadPool.awaitTermination(
|
||||||
|
this.fileSplitTimeout, TimeUnit.MILLISECONDS);
|
||||||
|
if (stillRunning) {
|
||||||
|
threadPool.shutdownNow();
|
||||||
|
throw new IOException("Took too long to split the" +
|
||||||
|
" files and create the references, aborting split");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new IOException("Interrupted while waiting for file splitters", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look for any exception
|
||||||
|
for (Future future : futures) {
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new IOException(
|
||||||
|
"Interrupted while trying to get the results of file splitters", e);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void splitStoreFile(final StoreFile sf, final Path splitdir)
|
private void splitStoreFile(final StoreFile sf, final Path splitdir)
|
||||||
@ -413,6 +465,31 @@ public class SplitTransaction {
|
|||||||
StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
|
StoreFile.split(fs, storedir, sf, this.splitrow, Range.top);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class used to do the file splitting / reference writing
|
||||||
|
* in parallel instead of sequentially.
|
||||||
|
*/
|
||||||
|
class StoreFileSplitter implements Callable<Void> {
|
||||||
|
|
||||||
|
private final StoreFile sf;
|
||||||
|
private final Path splitdir;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Constructor that takes what it needs to split
|
||||||
|
* @param sf which file
|
||||||
|
* @param splitdir where the splitting is done
|
||||||
|
*/
|
||||||
|
public StoreFileSplitter(final StoreFile sf, final Path splitdir) {
|
||||||
|
this.sf = sf;
|
||||||
|
this.splitdir = splitdir;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Void call() throws IOException {
|
||||||
|
splitStoreFile(sf, splitdir);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param hri Spec. for daughter region to open.
|
* @param hri Spec. for daughter region to open.
|
||||||
* @param flusher Flusher this region should use.
|
* @param flusher Flusher this region should use.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user