HBASE-21679 Port HBASE-6028 (Start/Stop compactions at region server level) to branch-1
HBASE-6028 Start/Stop compactions at region server level Add switching on/off of compactions. Switching off compactions will also interrupt any currently ongoing compactions. Adds a "compaction_switch" to hbase shell. Switching off compactions will interrupt any currently ongoing compactions. State set from shell will be lost on restart. To persist the changes across region servers modify hbase.regionserver.compaction.enabled in hbase-site.xml and restart.
This commit is contained in:
parent
e7ff91f35e
commit
d6a3e6b404
|
@ -633,6 +633,19 @@ public interface Admin extends Abortable, Closeable {
|
|||
void majorCompactRegion(final byte[] regionName, final byte[] columnFamily)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Turn the compaction on or off. Disabling compactions will also interrupt any currently ongoing
|
||||
* compactions. It is ephemeral. This setting will be lost on restart of the server. Compaction
|
||||
* can also be enabled/disabled by modifying configuration hbase.regionserver.compaction.enabled
|
||||
* in hbase-site.xml.
|
||||
*
|
||||
* @param switchState Set to <code>true</code> to enable, <code>false</code> to disable.
|
||||
* @param serverNamesList list of region servers.
|
||||
* @return Previous compaction states for region servers
|
||||
*/
|
||||
Map<ServerName, Boolean> compactionSwitch(boolean switchState, List<String> serverNamesList)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Compact all regions on the region server
|
||||
* @param sn the region server name
|
||||
|
|
|
@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactionSwitchRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactionSwitchResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
|
||||
|
@ -2052,6 +2054,51 @@ public class HBaseAdmin implements Admin {
|
|||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public Map<ServerName, Boolean> compactionSwitch(boolean switchState, List<String>
|
||||
serverNamesList) throws IOException {
|
||||
List<ServerName> serverList = new ArrayList<>();
|
||||
if (serverNamesList.isEmpty()) {
|
||||
ClusterStatus status = getClusterStatus();
|
||||
serverList.addAll(status.getServers());
|
||||
} else {
|
||||
for (String regionServerName : serverNamesList) {
|
||||
ServerName serverName = null;
|
||||
try {
|
||||
serverName = ServerName.valueOf(regionServerName);
|
||||
} catch (Exception e) {
|
||||
throw new IllegalArgumentException(String.format("Invalid format for regionserver:%s ",
|
||||
regionServerName));
|
||||
}
|
||||
if (serverName == null) {
|
||||
throw new IllegalArgumentException(String.format("Invalid regionserver:%s ",
|
||||
regionServerName));
|
||||
}
|
||||
serverList.add(serverName);
|
||||
}
|
||||
}
|
||||
HBaseRpcController controller = rpcControllerFactory.newController();
|
||||
Map<ServerName, Boolean> res = new HashMap<>(serverList.size());
|
||||
for (ServerName serverName : serverList) {
|
||||
AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
|
||||
try {
|
||||
CompactionSwitchRequest request =
|
||||
CompactionSwitchRequest.newBuilder().setEnabled(switchState).build();
|
||||
CompactionSwitchResponse compactionSwitchResponse =
|
||||
admin.compactionSwitch(controller, request);
|
||||
boolean prev_state = compactionSwitchResponse.getPrevState();
|
||||
res.put(serverName, prev_state);
|
||||
} catch (ServiceException se) {
|
||||
throw ProtobufUtil.getRemoteException(se);
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* {@inheritDoc}
|
||||
*/
|
||||
@Override
|
||||
public void compactRegionServer(final ServerName sn, boolean major)
|
||||
throws IOException, InterruptedException {
|
||||
for (HRegionInfo region : getOnlineRegions(sn)) {
|
||||
|
|
|
@ -720,6 +720,13 @@ possible configurations would overwhelm and obscure the important.
|
|||
is run to rewrite all HStoreFiles files as one. Larger numbers
|
||||
put off compaction but when it runs, it takes longer to complete.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.regionserver.compaction.enabled</name>
|
||||
<value>true</value>
|
||||
<description>Enable/disable compactions on by setting true/false.
|
||||
We can further switch compactions dynamically with the
|
||||
compaction_switch shell command.</description>
|
||||
</property>
|
||||
<property>
|
||||
<name>hbase.hstore.flusher.count</name>
|
||||
<value>2</value>
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -164,6 +164,14 @@ message CompactRegionRequest {
|
|||
message CompactRegionResponse {
|
||||
}
|
||||
|
||||
message CompactionSwitchRequest {
|
||||
required bool enabled = 1;
|
||||
}
|
||||
|
||||
message CompactionSwitchResponse {
|
||||
required bool prev_state = 1;
|
||||
}
|
||||
|
||||
message UpdateFavoredNodesRequest {
|
||||
repeated RegionUpdateInfo update_info = 1;
|
||||
|
||||
|
@ -281,6 +289,9 @@ service AdminService {
|
|||
rpc SplitRegion(SplitRegionRequest)
|
||||
returns(SplitRegionResponse);
|
||||
|
||||
rpc CompactionSwitch(CompactionSwitchRequest)
|
||||
returns(CompactionSwitchResponse);
|
||||
|
||||
rpc CompactRegion(CompactRegionRequest)
|
||||
returns(CompactRegionResponse);
|
||||
|
||||
|
|
|
@ -79,18 +79,22 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
|
||||
public static final String REGION_SERVER_REGION_SPLIT_LIMIT =
|
||||
"hbase.regionserver.regionSplitLimit";
|
||||
public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT= 1000;
|
||||
public static final int DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT = 1000;
|
||||
public static final String HBASE_REGION_SERVER_ENABLE_COMPACTION =
|
||||
"hbase.regionserver.compaction.enabled";
|
||||
|
||||
private final HRegionServer server;
|
||||
private final Configuration conf;
|
||||
|
||||
private final ThreadPoolExecutor longCompactions;
|
||||
private final ThreadPoolExecutor shortCompactions;
|
||||
private final ThreadPoolExecutor splits;
|
||||
private volatile ThreadPoolExecutor longCompactions;
|
||||
private volatile ThreadPoolExecutor shortCompactions;
|
||||
private volatile ThreadPoolExecutor splits;
|
||||
private final ThreadPoolExecutor mergePool;
|
||||
|
||||
private volatile ThroughputController compactionThroughputController;
|
||||
|
||||
private volatile boolean compactionsEnabled;
|
||||
|
||||
/**
|
||||
* Splitting should not take place if the total number of regions exceed this.
|
||||
* This is not a hard limit to the number of regions but it is a guideline to
|
||||
|
@ -103,53 +107,12 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
super();
|
||||
this.server = server;
|
||||
this.conf = server.getConfiguration();
|
||||
this.regionSplitLimit = conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT,
|
||||
DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
|
||||
|
||||
int largeThreads = Math.max(1, conf.getInt(
|
||||
LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
|
||||
int smallThreads = conf.getInt(
|
||||
SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
|
||||
|
||||
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
|
||||
|
||||
// if we have throttle threads, make sure the user also specified size
|
||||
Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
|
||||
this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION, true);
|
||||
createCompactionExecutors();
|
||||
createSplitExcecutors();
|
||||
|
||||
final String n = Thread.currentThread().getName();
|
||||
|
||||
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<>();
|
||||
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
|
||||
60, TimeUnit.SECONDS, stealJobQueue,
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
String name = n + "-longCompactions-" + System.currentTimeMillis();
|
||||
return new Thread(r, name);
|
||||
}
|
||||
});
|
||||
this.longCompactions.setRejectedExecutionHandler(new Rejection());
|
||||
this.longCompactions.prestartAllCoreThreads();
|
||||
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
|
||||
60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
String name = n + "-shortCompactions-" + System.currentTimeMillis();
|
||||
return new Thread(r, name);
|
||||
}
|
||||
});
|
||||
this.shortCompactions
|
||||
.setRejectedExecutionHandler(new Rejection());
|
||||
this.splits = (ThreadPoolExecutor)
|
||||
Executors.newFixedThreadPool(splitThreads,
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
String name = n + "-splits-" + System.currentTimeMillis();
|
||||
return new Thread(r, name);
|
||||
}
|
||||
});
|
||||
int mergeThreads = conf.getInt(MERGE_THREADS, MERGE_THREADS_DEFAULT);
|
||||
this.mergePool = (ThreadPoolExecutor) Executors.newFixedThreadPool(
|
||||
mergeThreads, new ThreadFactory() {
|
||||
|
@ -165,6 +128,56 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
CompactionThroughputControllerFactory.create(server, conf);
|
||||
}
|
||||
|
||||
private void createSplitExcecutors() {
|
||||
final String n = Thread.currentThread().getName();
|
||||
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
|
||||
this.splits =
|
||||
(ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads, new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
String name = n + "-splits-" + System.currentTimeMillis();
|
||||
return new Thread(r, name);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void createCompactionExecutors() {
|
||||
this.regionSplitLimit =
|
||||
conf.getInt(REGION_SERVER_REGION_SPLIT_LIMIT, DEFAULT_REGION_SERVER_REGION_SPLIT_LIMIT);
|
||||
|
||||
int largeThreads =
|
||||
Math.max(1, conf.getInt(LARGE_COMPACTION_THREADS, LARGE_COMPACTION_THREADS_DEFAULT));
|
||||
int smallThreads = conf.getInt(SMALL_COMPACTION_THREADS, SMALL_COMPACTION_THREADS_DEFAULT);
|
||||
|
||||
// if we have throttle threads, make sure the user also specified size
|
||||
Preconditions.checkArgument(largeThreads > 0 && smallThreads > 0);
|
||||
|
||||
final String n = Thread.currentThread().getName();
|
||||
|
||||
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>();
|
||||
this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
|
||||
TimeUnit.SECONDS, stealJobQueue,
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
String name = n + "-longCompactions-" + System.currentTimeMillis();
|
||||
return new Thread(r, name);
|
||||
}
|
||||
});
|
||||
this.longCompactions.setRejectedExecutionHandler(new Rejection());
|
||||
this.longCompactions.prestartAllCoreThreads();
|
||||
this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
|
||||
TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
|
||||
new ThreadFactory() {
|
||||
@Override
|
||||
public Thread newThread(Runnable r) {
|
||||
String name = n + "-shortCompactions-" + System.currentTimeMillis();
|
||||
return new Thread(r, name);
|
||||
}
|
||||
});
|
||||
this.shortCompactions.setRejectedExecutionHandler(new Rejection());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "compaction_queue=("
|
||||
|
@ -330,6 +343,30 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
|
||||
}
|
||||
|
||||
private void reInitializeCompactionsExecutors() {
|
||||
createCompactionExecutors();
|
||||
}
|
||||
|
||||
private void interrupt() {
|
||||
longCompactions.shutdownNow();
|
||||
shortCompactions.shutdownNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void switchCompaction(boolean onOrOff) {
|
||||
if (onOrOff) {
|
||||
// re-create executor pool if compactions are disabled.
|
||||
if (!isCompactionsEnabled()) {
|
||||
LOG.info("Re-Initializing compactions because user switched on compactions");
|
||||
reInitializeCompactionsExecutors();
|
||||
}
|
||||
} else {
|
||||
LOG.info("Interrupting running compactions because user switched off compactions");
|
||||
interrupt();
|
||||
}
|
||||
setCompactionsEnabled(onOrOff);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param r region store belongs to
|
||||
* @param s Store to request compaction on
|
||||
|
@ -368,6 +405,13 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
|
||||
private CompactionContext selectCompaction(final Region r, final Store s,
|
||||
int priority, CompactionRequest request, User user) throws IOException {
|
||||
// don't even select for compaction if disableCompactions is set to true
|
||||
if (!isCompactionsEnabled()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("User has disabled compactions");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
CompactionContext compaction = s.requestCompaction(priority, request, user);
|
||||
if (compaction == null) {
|
||||
if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
|
||||
|
@ -738,4 +782,27 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
|
|||
void shutdownLongCompactions(){
|
||||
this.longCompactions.shutdown();
|
||||
}
|
||||
|
||||
public boolean isCompactionsEnabled() {
|
||||
return compactionsEnabled;
|
||||
}
|
||||
|
||||
public void setCompactionsEnabled(boolean compactionsEnabled) {
|
||||
this.compactionsEnabled = compactionsEnabled;
|
||||
this.conf.set(HBASE_REGION_SERVER_ENABLE_COMPACTION,String.valueOf(compactionsEnabled));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the longCompactions thread pool executor
|
||||
*/
|
||||
ThreadPoolExecutor getLongCompactions() {
|
||||
return longCompactions;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the shortCompactions thread pool executor
|
||||
*/
|
||||
ThreadPoolExecutor getShortCompactions() {
|
||||
return shortCompactions;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,4 +96,9 @@ public interface CompactionRequestor {
|
|||
CompactionRequest requestCompaction(
|
||||
final Region r, final Store s, final String why, int pri, CompactionRequest request, User user
|
||||
) throws IOException;
|
||||
|
||||
/**
|
||||
* on/off compaction
|
||||
*/
|
||||
void switchCompaction(boolean onOrOff);
|
||||
}
|
||||
|
|
|
@ -58,7 +58,6 @@ import java.util.NavigableSet;
|
|||
import java.util.RandomAccess;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CompletionService;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -5793,7 +5792,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
* Determines whether multiple column families are present
|
||||
* Precondition: familyPaths is not null
|
||||
*
|
||||
* @param familyPaths List of Pair<byte[] column family, String hfilePath>
|
||||
* @param familyPaths
|
||||
*/
|
||||
private static boolean hasMultipleColumnFamilies(Collection<Pair<byte[], String>> familyPaths) {
|
||||
boolean multipleFamilies = false;
|
||||
|
@ -6591,9 +6590,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
|
||||
/**
|
||||
* This function is to maintain backward compatibility for 0.94 filters. HBASE-6429 combines
|
||||
* both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may
|
||||
* both filterRow & filterRow(List<KeyValue> kvs) functions. While 0.94 code or older, it may
|
||||
* not implement hasFilterRow as HBase-6429 expects because 0.94 hasFilterRow() only returns
|
||||
* true when filterRow(List<KeyValue> kvs) is overridden not the filterRow(). Therefore, the
|
||||
* true when filterRow(List<KeyValue> kvs) is overridden not the filterRow(). Therefore, the
|
||||
* filterRow() will be skipped.
|
||||
*/
|
||||
private boolean filterRow() throws IOException {
|
||||
|
@ -7738,20 +7737,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param cell
|
||||
* @param tags
|
||||
* @return The passed-in List<Tag> but with the tags from <code>cell</code> added.
|
||||
*/
|
||||
private static List<Tag> carryForwardTags(final Cell cell, final List<Tag> tags) {
|
||||
if (cell.getTagsLength() <= 0) return tags;
|
||||
List<Tag> newTags = tags == null? new ArrayList<Tag>(): /*Append Tags*/tags;
|
||||
Iterator<Tag> i =
|
||||
CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength());
|
||||
while (i.hasNext()) newTags.add(i.next());
|
||||
return newTags;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a Get against passed in <code>store</code> on passed <code>row</code>, etc.
|
||||
* @param store
|
||||
|
@ -8668,7 +8653,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
break;
|
||||
}
|
||||
if (op == Operation.MERGE_REGION || op == Operation.SPLIT_REGION
|
||||
|| op == Operation.COMPACT_REGION) {
|
||||
|| op == Operation.COMPACT_REGION || op == Operation.COMPACT_SWITCH) {
|
||||
// split, merge or compact region doesn't need to check the closing/closed state or lock the
|
||||
// region
|
||||
return;
|
||||
|
|
|
@ -112,6 +112,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactionSwitchRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactionSwitchResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
|
||||
|
@ -1517,6 +1519,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionSwitchResponse compactionSwitch(RpcController controller,
|
||||
CompactionSwitchRequest request) throws ServiceException {
|
||||
try {
|
||||
checkOpen();
|
||||
requestCount.increment();
|
||||
boolean prevState = regionServer.compactSplitThread.isCompactionsEnabled();
|
||||
CompactionSwitchResponse response =
|
||||
CompactionSwitchResponse.newBuilder().setPrevState(prevState).build();
|
||||
if (prevState == request.getEnabled()) {
|
||||
// passed in requested state is same as current state. No action required
|
||||
return response;
|
||||
}
|
||||
regionServer.compactSplitThread.switchCompaction(request.getEnabled());
|
||||
return response;
|
||||
} catch (IOException ie) {
|
||||
throw new ServiceException(ie);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush a region on the region server.
|
||||
*
|
||||
|
|
|
@ -221,7 +221,7 @@ public interface Region extends ConfigurationObserver {
|
|||
*/
|
||||
enum Operation {
|
||||
ANY, GET, PUT, DELETE, SCAN, APPEND, INCREMENT, SPLIT_REGION, MERGE_REGION, BATCH_MUTATE,
|
||||
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT
|
||||
REPLAY_BATCH_MUTATE, COMPACT_REGION, REPLAY_EVENT, SNAPSHOT, COMPACT_SWITCH
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest
|
|||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactionSwitchRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactionSwitchResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionResponse;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
|
||||
|
@ -508,6 +510,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionSwitchResponse compactionSwitch(RpcController controller,
|
||||
CompactionSwitchRequest request) throws ServiceException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactRegionResponse compactRegion(RpcController controller,
|
||||
CompactRegionRequest request) throws ServiceException {
|
||||
|
|
|
@ -38,8 +38,6 @@ import java.util.List;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
|
@ -87,7 +85,6 @@ import org.mockito.stubbing.Answer;
|
|||
@Category(MediumTests.class)
|
||||
public class TestCompaction {
|
||||
@Rule public TestName name = new TestName();
|
||||
private static final Log LOG = LogFactory.getLog(TestCompaction.class.getName());
|
||||
private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
|
||||
protected Configuration conf = UTIL.getConfiguration();
|
||||
|
||||
|
@ -356,6 +353,73 @@ public class TestCompaction {
|
|||
postFailedCount > preFailedCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test no new Compaction requests are generated after calling stop compactions
|
||||
*/
|
||||
@Test
|
||||
public void testStopStartCompaction() throws IOException {
|
||||
// setup a compact/split thread on a mock server
|
||||
HRegionServer mockServer = Mockito.mock(HRegionServer.class);
|
||||
Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
|
||||
CompactSplitThread thread = new CompactSplitThread(mockServer);
|
||||
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
|
||||
// setup a region/store with some files
|
||||
Store store = r.getStore(COLUMN_FAMILY);
|
||||
createStoreFile(r);
|
||||
for (int i = 0; i < HStore.DEFAULT_BLOCKING_STOREFILE_COUNT - 1; i++) {
|
||||
createStoreFile(r);
|
||||
}
|
||||
thread.switchCompaction(false);
|
||||
thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, new CompactionRequest(), null);
|
||||
assertEquals(false, thread.isCompactionsEnabled());
|
||||
assertEquals(0, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions()
|
||||
.getActiveCount());
|
||||
thread.switchCompaction(true);
|
||||
assertEquals(true, thread.isCompactionsEnabled());
|
||||
thread.requestCompaction(r, store, "test", Store.PRIORITY_USER, new CompactionRequest(), null);
|
||||
assertEquals(1, thread.getLongCompactions().getActiveCount() + thread.getShortCompactions()
|
||||
.getActiveCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInterruptingRunningCompactions() throws Exception {
|
||||
// setup a compact/split thread on a mock server
|
||||
conf.set(CompactionThroughputControllerFactory.HBASE_THROUGHPUT_CONTROLLER_KEY,
|
||||
WaitThroughPutController.class.getName());
|
||||
HRegionServer mockServer = Mockito.mock(HRegionServer.class);
|
||||
Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
|
||||
CompactSplitThread thread = new CompactSplitThread(mockServer);
|
||||
|
||||
Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
|
||||
|
||||
// setup a region/store with some files
|
||||
Store store = r.getStore(COLUMN_FAMILY);
|
||||
int jmax = (int) Math.ceil(15.0 / compactionThreshold);
|
||||
byte[] pad = new byte[1000]; // 1 KB chunk
|
||||
for (int i = 0; i < compactionThreshold; i++) {
|
||||
HRegionIncommon loader = new HRegionIncommon(r);
|
||||
Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
|
||||
p.setDurability(Durability.SKIP_WAL);
|
||||
for (int j = 0; j < jmax; j++) {
|
||||
p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
|
||||
}
|
||||
HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
|
||||
loader.put(p);
|
||||
r.flush(true);
|
||||
}
|
||||
Store s = r.getStore(COLUMN_FAMILY);
|
||||
int initialFiles = s.getStorefilesCount();
|
||||
|
||||
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER,
|
||||
new CompactionRequest(), null);
|
||||
|
||||
Thread.sleep(3000);
|
||||
thread.switchCompaction(false);
|
||||
assertEquals(initialFiles, s.getStorefilesCount());
|
||||
//don't mess up future tests
|
||||
thread.switchCompaction(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* HBASE-7947: Regression test to ensure adding to the correct list in the
|
||||
* {@link CompactSplitThread}
|
||||
|
@ -715,4 +779,20 @@ public class TestCompaction {
|
|||
this.done.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
|
||||
* finishes.
|
||||
*/
|
||||
public static class WaitThroughPutController extends NoLimitThroughputController{
|
||||
|
||||
public WaitThroughPutController() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public long control(String compactionName, long size) throws InterruptedException {
|
||||
Thread.sleep(6000000);
|
||||
return 6000000;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -344,6 +344,12 @@ public class TestReplicator extends TestReplicationBase {
|
|||
return delegate.splitRegion(controller, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactionSwitchResponse compactionSwitch(RpcController controller,
|
||||
CompactionSwitchRequest request) throws ServiceException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompactRegionResponse compactRegion(RpcController controller,
|
||||
CompactRegionRequest request) throws ServiceException {
|
||||
|
|
|
@ -66,6 +66,19 @@ module Hbase
|
|||
end
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Switch compaction on/off at runtime on a region server
|
||||
def compaction_switch(on_or_off, regionserver_names)
|
||||
region_servers = regionserver_names.flatten.compact
|
||||
servers = java.util.ArrayList.new
|
||||
if region_servers.any?
|
||||
region_servers.each do |s|
|
||||
servers.add(s)
|
||||
end
|
||||
end
|
||||
@admin.compactionSwitch(java.lang.Boolean.valueOf(on_or_off), servers)
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Gets compaction state for specified table
|
||||
def getCompactionState(table_name)
|
||||
|
|
|
@ -332,6 +332,7 @@ Shell.load_command_group(
|
|||
is_in_maintenance_mode
|
||||
close_region
|
||||
compact
|
||||
compaction_switch
|
||||
flush
|
||||
major_compact
|
||||
move
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
#
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
# Switch compaction for a region server
|
||||
class CompactionSwitch < Command
|
||||
def help
|
||||
return <<-EOF
|
||||
Turn the compaction on or off on regionservers. Disabling compactions will also interrupt
|
||||
any currently ongoing compactions. This state is ephemeral. The setting will be lost on
|
||||
restart of the server. Compaction can also be enabled/disabled by modifying configuration
|
||||
hbase.regionserver.compaction.enabled in hbase-site.xml.
|
||||
Examples:
|
||||
To enable compactions on all region servers
|
||||
hbase> compaction_switch true
|
||||
To disable compactions on all region servers
|
||||
hbase> compaction_switch false
|
||||
To enable compactions on specific region servers
|
||||
hbase> compaction_switch true 'server2','server1'
|
||||
To disable compactions on specific region servers
|
||||
hbase> compaction_switch false 'server2','server1'
|
||||
NOTE: A server name is its host, port plus startcode. For example:
|
||||
host187.example.com,60020,1289493121758
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(enable_disable, *server)
|
||||
now = Time.now
|
||||
formatter.header(%w(['SERVER' 'PREV_STATE']))
|
||||
prev_state = admin.compaction_switch(enable_disable, server)
|
||||
prev_state.each { |k, v| formatter.row([k.getServerName, java.lang.String.valueOf(v)]) }
|
||||
formatter.footer(now, prev_state.size)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1667,6 +1667,14 @@ See <<managed.compactions>>.
|
|||
Compactions do not perform region merges.
|
||||
See <<ops.regionmgt.merge>> for more information on region merging.
|
||||
|
||||
.Compaction Switch
|
||||
We can switch on and off the compactions at region servers. Switching off compactions will also
|
||||
interrupt any currently ongoing compactions. It can be done dynamically using the "compaction_switch"
|
||||
command from hbase shell. If done from the command line, this setting will be lost on restart of the
|
||||
server. To persist the changes across region servers modify the configuration hbase.regionserver
|
||||
.compaction.enabled in hbase-site.xml and restart HBase.
|
||||
|
||||
|
||||
[[compaction.file.selection]]
|
||||
===== Compaction Policy - HBase 0.96.x and newer
|
||||
|
||||
|
|
Loading…
Reference in New Issue