HBASE-14306 Refine RegionGroupingProvider: fix issues and make it more scalable (Yu Li)
This commit is contained in:
parent
ceff3e242b
commit
99df022f2c
|
@ -35,12 +35,11 @@ import org.apache.hadoop.hbase.ServerName;
|
|||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
|
||||
import org.apache.hadoop.hbase.io.hfile.CacheStats;
|
||||
import org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider;
|
||||
import org.apache.hadoop.hbase.mob.MobCacheConfig;
|
||||
import org.apache.hadoop.hbase.mob.MobFileCache;
|
||||
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||
import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
|
||||
import org.apache.hadoop.metrics2.MetricsExecutor;
|
||||
|
@ -660,10 +659,12 @@ class MetricsRegionServerWrapperImpl
|
|||
}
|
||||
lastRan = currentTime;
|
||||
|
||||
numWALFiles = DefaultWALProvider.getNumLogFiles(regionServer.walFactory) +
|
||||
BoundedRegionGroupingProvider.getNumLogFiles(regionServer.walFactory);
|
||||
walFileSize = DefaultWALProvider.getLogFileSize(regionServer.walFactory) +
|
||||
BoundedRegionGroupingProvider.getLogFileSize(regionServer.walFactory);
|
||||
WALProvider provider = regionServer.walFactory.getWALProvider();
|
||||
WALProvider metaProvider = regionServer.walFactory.getMetaWALProvider();
|
||||
numWALFiles = (provider == null ? 0 : provider.getNumLogFiles()) +
|
||||
(metaProvider == null ? 0 : metaProvider.getNumLogFiles());
|
||||
walFileSize = (provider == null ? 0 : provider.getLogFileSize()) +
|
||||
(provider == null ? 0 : provider.getLogFileSize());
|
||||
//Copy over computed values so that no thread sees half computed values.
|
||||
numStores = tempNumStores;
|
||||
numStoreFiles = tempNumStoreFiles;
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
|
||||
|
||||
/**
|
||||
* A WAL grouping strategy that limits the number of delegate providers (i.e. wal group) to
|
||||
* "hbase.wal.regiongrouping.numgroups".
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BoundedGroupingStrategy implements RegionGroupingStrategy{
|
||||
|
||||
static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups";
|
||||
static final int DEFAULT_NUM_REGION_GROUPS = 2;
|
||||
|
||||
private ConcurrentHashMap<String, String> groupNameCache =
|
||||
new ConcurrentHashMap<String, String>();
|
||||
private AtomicInteger counter = new AtomicInteger(0);
|
||||
private String[] groupNames;
|
||||
|
||||
@Override
|
||||
public String group(byte[] identifier) {
|
||||
String idStr = Bytes.toString(identifier);
|
||||
String groupName = groupNameCache.get(idStr);
|
||||
if (null == groupName) {
|
||||
groupName = groupNames[counter.getAndIncrement() % groupNames.length];
|
||||
String extantName = groupNameCache.putIfAbsent(idStr, groupName);
|
||||
if (extantName != null) {
|
||||
return extantName;
|
||||
}
|
||||
}
|
||||
return groupName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration config, String providerId) {
|
||||
int regionGroupNumber = config.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
|
||||
groupNames = new String[regionGroupNumber];
|
||||
for (int i = 0; i < regionGroupNumber; i++) {
|
||||
groupNames[i] = providerId + GROUP_NAME_DELIMITER + "regiongroup-" + i;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,159 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
|
||||
// imports for classes still in regionserver.wal
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
|
||||
/**
|
||||
* A WAL Provider that pre-creates N WALProviders and then limits our grouping strategy to them.
|
||||
* Control the number of delegate providers via "hbase.wal.regiongrouping.numgroups." Control
|
||||
* the choice of delegate provider implementation and the grouping strategy the same as
|
||||
* {@link RegionGroupingProvider}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BoundedRegionGroupingProvider extends RegionGroupingProvider {
|
||||
private static final Log LOG = LogFactory.getLog(BoundedRegionGroupingProvider.class);
|
||||
|
||||
static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups";
|
||||
static final int DEFAULT_NUM_REGION_GROUPS = 2;
|
||||
private WALProvider[] delegates;
|
||||
private AtomicInteger counter = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public void init(final WALFactory factory, final Configuration conf,
|
||||
final List<WALActionsListener> listeners, final String providerId) throws IOException {
|
||||
super.init(factory, conf, listeners, providerId);
|
||||
// no need to check for and close down old providers; our parent class will throw on re-invoke
|
||||
delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS,
|
||||
DEFAULT_NUM_REGION_GROUPS))];
|
||||
for (int i = 0; i < delegates.length; i++) {
|
||||
delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners,
|
||||
providerId + i);
|
||||
}
|
||||
LOG.info("Configured to run with " + delegates.length + " delegate WAL providers.");
|
||||
}
|
||||
|
||||
@Override
|
||||
WALProvider populateCache(final byte[] group) {
|
||||
final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length];
|
||||
final WALProvider extant = cached.putIfAbsent(group, temp);
|
||||
// if someone else beat us to initializing, just take what they set.
|
||||
// note that in such a case we skew load away from the provider we picked at first
|
||||
return extant == null ? temp : extant;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
// save the last exception and rethrow
|
||||
IOException failure = null;
|
||||
for (WALProvider provider : delegates) {
|
||||
try {
|
||||
provider.shutdown();
|
||||
} catch (IOException exception) {
|
||||
LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
|
||||
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||
failure = exception;
|
||||
}
|
||||
}
|
||||
if (failure != null) {
|
||||
throw failure;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
// save the last exception and rethrow
|
||||
IOException failure = null;
|
||||
for (WALProvider provider : delegates) {
|
||||
try {
|
||||
provider.close();
|
||||
} catch (IOException exception) {
|
||||
LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
|
||||
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||
failure = exception;
|
||||
}
|
||||
}
|
||||
if (failure != null) {
|
||||
throw failure;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* iff the given WALFactory is using the BoundedRegionGroupingProvider for meta and/or non-meta,
|
||||
* count the number of files (rolled and active). if either of them isn't, count 0
|
||||
* for that provider.
|
||||
* @param walFactory may not be null.
|
||||
*/
|
||||
public static long getNumLogFiles(WALFactory walFactory) {
|
||||
long result = 0;
|
||||
if (walFactory.provider instanceof BoundedRegionGroupingProvider) {
|
||||
BoundedRegionGroupingProvider groupProviders =
|
||||
(BoundedRegionGroupingProvider)walFactory.provider;
|
||||
for (int i = 0; i < groupProviders.delegates.length; i++) {
|
||||
result +=
|
||||
((FSHLog)((DefaultWALProvider)(groupProviders.delegates[i])).log).getNumLogFiles();
|
||||
}
|
||||
}
|
||||
WALProvider meta = walFactory.metaProvider.get();
|
||||
if (meta instanceof BoundedRegionGroupingProvider) {
|
||||
for (int i = 0; i < ((BoundedRegionGroupingProvider)meta).delegates.length; i++) {
|
||||
result += ((FSHLog)
|
||||
((DefaultWALProvider)(((BoundedRegionGroupingProvider)meta).delegates[i])).log)
|
||||
.getNumLogFiles(); }
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* iff the given WALFactory is using the BoundedRegionGroupingProvider for meta and/or non-meta,
|
||||
* count the size of files (rolled and active). if either of them isn't, count 0
|
||||
* for that provider.
|
||||
* @param walFactory may not be null.
|
||||
*/
|
||||
public static long getLogFileSize(WALFactory walFactory) {
|
||||
long result = 0;
|
||||
if (walFactory.provider instanceof BoundedRegionGroupingProvider) {
|
||||
BoundedRegionGroupingProvider groupProviders =
|
||||
(BoundedRegionGroupingProvider)walFactory.provider;
|
||||
for (int i = 0; i < groupProviders.delegates.length; i++) {
|
||||
result +=
|
||||
((FSHLog)((DefaultWALProvider)(groupProviders.delegates[i])).log).getLogFileSize();
|
||||
}
|
||||
}
|
||||
WALProvider meta = walFactory.metaProvider.get();
|
||||
if (meta instanceof BoundedRegionGroupingProvider) {
|
||||
for (int i = 0; i < ((BoundedRegionGroupingProvider)meta).delegates.length; i++) {
|
||||
result += ((FSHLog)
|
||||
((DefaultWALProvider)(((BoundedRegionGroupingProvider)meta).delegates[i])).log)
|
||||
.getLogFileSize();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -126,36 +126,20 @@ public class DefaultWALProvider implements WALProvider {
|
|||
* iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta,
|
||||
* count the number of files (rolled and active). if either of them aren't, count 0
|
||||
* for that provider.
|
||||
* @param walFactory may not be null.
|
||||
*/
|
||||
public static long getNumLogFiles(WALFactory walFactory) {
|
||||
long result = 0;
|
||||
if (walFactory.provider instanceof DefaultWALProvider) {
|
||||
result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getNumLogFiles();
|
||||
}
|
||||
WALProvider meta = walFactory.metaProvider.get();
|
||||
if (meta instanceof DefaultWALProvider) {
|
||||
result += ((FSHLog)((DefaultWALProvider)meta).log).getNumLogFiles();
|
||||
}
|
||||
return result;
|
||||
@Override
|
||||
public long getNumLogFiles() {
|
||||
return this.log.getNumLogFiles();
|
||||
}
|
||||
|
||||
/**
|
||||
* iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta,
|
||||
* count the size of files (rolled and active). if either of them aren't, count 0
|
||||
* for that provider.
|
||||
* @param walFactory may not be null.
|
||||
*/
|
||||
public static long getLogFileSize(WALFactory walFactory) {
|
||||
long result = 0;
|
||||
if (walFactory.provider instanceof DefaultWALProvider) {
|
||||
result += ((FSHLog)((DefaultWALProvider)walFactory.provider).log).getLogFileSize();
|
||||
}
|
||||
WALProvider meta = walFactory.metaProvider.get();
|
||||
if (meta instanceof DefaultWALProvider) {
|
||||
result += ((FSHLog)((DefaultWALProvider)meta).log).getLogFileSize();
|
||||
}
|
||||
return result;
|
||||
@Override
|
||||
public long getLogFileSize() {
|
||||
return this.log.getLogFileSize();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -219,4 +219,14 @@ class DisabledWALProvider implements WALProvider {
|
|||
return "WAL disabled.";
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumLogFiles() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLogFileSize() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,9 @@ package org.apache.hadoop.hbase.wal;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -32,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
|
||||
// imports for classes still in regionserver.wal
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
/**
|
||||
* A WAL Provider that returns a WAL per group of regions.
|
||||
|
@ -57,21 +60,23 @@ class RegionGroupingProvider implements WALProvider {
|
|||
* Map identifiers to a group number.
|
||||
*/
|
||||
public static interface RegionGroupingStrategy {
|
||||
String GROUP_NAME_DELIMITER = ".";
|
||||
/**
|
||||
* Given an identifier, pick a group.
|
||||
* the byte[] returned for a given group must always use the same instance, since we
|
||||
* will be using it as a hash key.
|
||||
*/
|
||||
byte[] group(final byte[] identifier);
|
||||
void init(Configuration config);
|
||||
String group(final byte[] identifier);
|
||||
void init(Configuration config, String providerId);
|
||||
}
|
||||
|
||||
/**
|
||||
* Maps between configuration names for strategies and implementation classes.
|
||||
*/
|
||||
static enum Strategies {
|
||||
defaultStrategy(IdentityGroupingStrategy.class),
|
||||
identity(IdentityGroupingStrategy.class);
|
||||
defaultStrategy(BoundedGroupingStrategy.class),
|
||||
identity(IdentityGroupingStrategy.class),
|
||||
bounded(BoundedGroupingStrategy.class);
|
||||
|
||||
final Class<? extends RegionGroupingStrategy> clazz;
|
||||
Strategies(Class<? extends RegionGroupingStrategy> clazz) {
|
||||
|
@ -97,7 +102,7 @@ class RegionGroupingProvider implements WALProvider {
|
|||
LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
|
||||
try {
|
||||
final RegionGroupingStrategy result = clazz.newInstance();
|
||||
result.init(conf);
|
||||
result.init(conf, providerId);
|
||||
return result;
|
||||
} catch (InstantiationException exception) {
|
||||
LOG.error("couldn't set up region grouping strategy, check config key " +
|
||||
|
@ -112,14 +117,18 @@ class RegionGroupingProvider implements WALProvider {
|
|||
}
|
||||
}
|
||||
|
||||
private static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
|
||||
private static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
|
||||
public static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
|
||||
public static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
|
||||
|
||||
static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate";
|
||||
static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name();
|
||||
|
||||
protected final ConcurrentMap<byte[], WALProvider> cached =
|
||||
new ConcurrentHashMap<byte[], WALProvider>();
|
||||
/** A group-provider mapping, recommended to make sure one-one rather than many-one mapping */
|
||||
protected final ConcurrentMap<String, WALProvider> cached =
|
||||
new ConcurrentHashMap<String, WALProvider>();
|
||||
/** Stores delegation providers (no duplicated) used by this RegionGroupingProvider */
|
||||
private final Set<WALProvider> providers = Collections
|
||||
.synchronizedSet(new HashSet<WALProvider>());
|
||||
|
||||
|
||||
protected RegionGroupingStrategy strategy = null;
|
||||
|
@ -142,7 +151,7 @@ class RegionGroupingProvider implements WALProvider {
|
|||
/**
|
||||
* Populate the cache for this group.
|
||||
*/
|
||||
WALProvider populateCache(final byte[] group) throws IOException {
|
||||
WALProvider populateCache(final String group) throws IOException {
|
||||
final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER,
|
||||
listeners, providerId + "-" + UUID.randomUUID());
|
||||
final WALProvider extant = cached.putIfAbsent(group, temp);
|
||||
|
@ -151,12 +160,13 @@ class RegionGroupingProvider implements WALProvider {
|
|||
temp.close();
|
||||
return extant;
|
||||
}
|
||||
providers.add(temp);
|
||||
return temp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public WAL getWAL(final byte[] identifier) throws IOException {
|
||||
final byte[] group = strategy.group(identifier);
|
||||
final String group = strategy.group(identifier);
|
||||
WALProvider provider = cached.get(group);
|
||||
if (null == provider) {
|
||||
provider = populateCache(group);
|
||||
|
@ -168,13 +178,15 @@ class RegionGroupingProvider implements WALProvider {
|
|||
public void shutdown() throws IOException {
|
||||
// save the last exception and rethrow
|
||||
IOException failure = null;
|
||||
for (WALProvider provider : cached.values()) {
|
||||
try {
|
||||
provider.shutdown();
|
||||
} catch (IOException exception) {
|
||||
LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
|
||||
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||
failure = exception;
|
||||
synchronized (providers) {
|
||||
for (WALProvider provider : providers) {
|
||||
try {
|
||||
provider.shutdown();
|
||||
} catch (IOException exception) {
|
||||
LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
|
||||
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||
failure = exception;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (failure != null) {
|
||||
|
@ -186,13 +198,15 @@ class RegionGroupingProvider implements WALProvider {
|
|||
public void close() throws IOException {
|
||||
// save the last exception and rethrow
|
||||
IOException failure = null;
|
||||
for (WALProvider provider : cached.values()) {
|
||||
try {
|
||||
provider.close();
|
||||
} catch (IOException exception) {
|
||||
LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
|
||||
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||
failure = exception;
|
||||
synchronized (providers) {
|
||||
for (WALProvider provider : providers) {
|
||||
try {
|
||||
provider.close();
|
||||
} catch (IOException exception) {
|
||||
LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
|
||||
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||
failure = exception;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (failure != null) {
|
||||
|
@ -202,11 +216,33 @@ class RegionGroupingProvider implements WALProvider {
|
|||
|
||||
static class IdentityGroupingStrategy implements RegionGroupingStrategy {
|
||||
@Override
|
||||
public void init(Configuration config) {}
|
||||
public void init(Configuration config, String providerId) {}
|
||||
@Override
|
||||
public byte[] group(final byte[] identifier) {
|
||||
return identifier;
|
||||
public String group(final byte[] identifier) {
|
||||
return Bytes.toString(identifier);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumLogFiles() {
|
||||
long numLogFiles = 0;
|
||||
synchronized (providers) {
|
||||
for (WALProvider provider : providers) {
|
||||
numLogFiles += provider.getNumLogFiles();
|
||||
}
|
||||
}
|
||||
return numLogFiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLogFileSize() {
|
||||
long logFileSize = 0;
|
||||
synchronized (providers) {
|
||||
for (WALProvider provider : providers) {
|
||||
logFileSize += provider.getLogFileSize();
|
||||
}
|
||||
}
|
||||
return logFileSize;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -75,7 +75,7 @@ public class WALFactory {
|
|||
static enum Providers {
|
||||
defaultProvider(DefaultWALProvider.class),
|
||||
filesystem(DefaultWALProvider.class),
|
||||
multiwal(BoundedRegionGroupingProvider.class);
|
||||
multiwal(RegionGroupingProvider.class);
|
||||
|
||||
Class<? extends WALProvider> clazz;
|
||||
Providers(Class<? extends WALProvider> clazz) {
|
||||
|
@ -443,4 +443,12 @@ public class WALFactory {
|
|||
throws IOException {
|
||||
return DefaultWALProvider.createWriter(configuration, fs, path, false);
|
||||
}
|
||||
|
||||
public final WALProvider getWALProvider() {
|
||||
return this.provider;
|
||||
}
|
||||
|
||||
public final WALProvider getMetaWALProvider() {
|
||||
return this.metaProvider.get();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,4 +79,14 @@ public interface WALProvider {
|
|||
long getLength() throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get number of the log files this provider is managing
|
||||
*/
|
||||
long getNumLogFiles();
|
||||
|
||||
/**
|
||||
* Get size of the log files this provider is managing
|
||||
*/
|
||||
long getLogFileSize();
|
||||
|
||||
}
|
||||
|
|
|
@ -230,4 +230,14 @@ public class IOTestProvider implements WALProvider {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNumLogFiles() {
|
||||
return this.log.getNumLogFiles();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLogFileSize() {
|
||||
return this.log.getLogFileSize();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,9 +24,10 @@ import java.util.Random;
|
|||
import java.util.Set;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.NUM_REGION_GROUPS;
|
||||
import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.DEFAULT_NUM_REGION_GROUPS;
|
||||
import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.NUM_REGION_GROUPS;
|
||||
import static org.apache.hadoop.hbase.wal.BoundedGroupingStrategy.DEFAULT_NUM_REGION_GROUPS;
|
||||
import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
|
||||
import static org.apache.hadoop.hbase.wal.RegionGroupingProvider.REGION_GROUPING_STRATEGY;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
|
|||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.wal.RegionGroupingProvider.RegionGroupingStrategy;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -49,8 +51,8 @@ import org.junit.experimental.categories.Category;
|
|||
import org.junit.rules.TestName;
|
||||
|
||||
@Category({RegionServerTests.class, LargeTests.class})
|
||||
public class TestBoundedRegionGroupingProvider {
|
||||
protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingProvider.class);
|
||||
public class TestBoundedRegionGroupingStrategy {
|
||||
protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingStrategy.class);
|
||||
|
||||
@Rule
|
||||
public TestName currentTest = new TestName();
|
||||
|
@ -85,7 +87,8 @@ public class TestBoundedRegionGroupingProvider {
|
|||
conf.setInt("dfs.client.block.recovery.retries", 1);
|
||||
conf.setInt("hbase.ipc.client.connection.maxidletime", 500);
|
||||
|
||||
conf.setClass(WAL_PROVIDER, BoundedRegionGroupingProvider.class, WALProvider.class);
|
||||
conf.setClass(WAL_PROVIDER, RegionGroupingProvider.class, WALProvider.class);
|
||||
conf.set(REGION_GROUPING_STRATEGY, RegionGroupingProvider.Strategies.bounded.name());
|
||||
|
||||
TEST_UTIL.startMiniDFSCluster(3);
|
||||
|
Loading…
Reference in New Issue