HBASE-5699 Adds multiple WALs per Region Server based on groups of regions.
This commit is contained in:
parent
0c7d22439d
commit
f1c41e307e
|
@ -0,0 +1,106 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
// imports for classes still in regionserver.wal
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A WAL Provider that pre-creates N WALProviders and then limits our grouping strategy to them.
|
||||||
|
* Control the number of delegate providers via "hbase.wal.regiongrouping.numgroups." Control
|
||||||
|
* the choice of delegate provider implementation and the grouping strategy the same as
|
||||||
|
* {@link RegionGroupingProvider}.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class BoundedRegionGroupingProvider extends RegionGroupingProvider {
|
||||||
|
private static final Log LOG = LogFactory.getLog(BoundedRegionGroupingProvider.class);
|
||||||
|
|
||||||
|
static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups";
|
||||||
|
static final int DEFAULT_NUM_REGION_GROUPS = 2;
|
||||||
|
private WALProvider[] delegates;
|
||||||
|
private AtomicInteger counter = new AtomicInteger(0);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(final WALFactory factory, final Configuration conf,
|
||||||
|
final List<WALActionsListener> listeners, final String providerId) throws IOException {
|
||||||
|
super.init(factory, conf, listeners, providerId);
|
||||||
|
// no need to check for and close down old providers; our parent class will throw on re-invoke
|
||||||
|
delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS,
|
||||||
|
DEFAULT_NUM_REGION_GROUPS))];
|
||||||
|
for (int i = 0; i < delegates.length; i++) {
|
||||||
|
delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners,
|
||||||
|
providerId + i);
|
||||||
|
}
|
||||||
|
LOG.info("Configured to run with " + delegates.length + " delegate WAL providers.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
WALProvider populateCache(final byte[] group) {
|
||||||
|
final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length];
|
||||||
|
final WALProvider extant = cached.putIfAbsent(group, temp);
|
||||||
|
// if someone else beat us to initializing, just take what they set.
|
||||||
|
// note that in such a case we skew load away from the provider we picked at first
|
||||||
|
return extant == null ? temp : extant;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() throws IOException {
|
||||||
|
// save the last exception and rethrow
|
||||||
|
IOException failure = null;
|
||||||
|
for (WALProvider provider : delegates) {
|
||||||
|
try {
|
||||||
|
provider.shutdown();
|
||||||
|
} catch (IOException exception) {
|
||||||
|
LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
|
||||||
|
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||||
|
failure = exception;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (failure != null) {
|
||||||
|
throw failure;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
// save the last exception and rethrow
|
||||||
|
IOException failure = null;
|
||||||
|
for (WALProvider provider : delegates) {
|
||||||
|
try {
|
||||||
|
provider.close();
|
||||||
|
} catch (IOException exception) {
|
||||||
|
LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
|
||||||
|
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||||
|
failure = exception;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (failure != null) {
|
||||||
|
throw failure;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,212 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
// imports for classes still in regionserver.wal
|
||||||
|
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A WAL Provider that returns a WAL per group of regions.
|
||||||
|
*
|
||||||
|
* Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the
|
||||||
|
* property "hbase.wal.regiongrouping.strategy". Current strategy choices are
|
||||||
|
* <ul>
|
||||||
|
* <li><em>defaultStrategy</em> : Whatever strategy this version of HBase picks. currently
|
||||||
|
* "identity".</li>
|
||||||
|
* <li><em>identity</em> : each region belongs to its own group.</li>
|
||||||
|
* </ul>
|
||||||
|
* Optionally, a FQCN to a custom implementation may be given.
|
||||||
|
*
|
||||||
|
* WAL creation is delegated to another WALProvider, configured via the property
|
||||||
|
* "hbase.wal.regiongrouping.delegate". The property takes the same options as "hbase.wal.provider"
|
||||||
|
* (ref {@link WALFactory}) and defaults to the defaultProvider.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class RegionGroupingProvider implements WALProvider {
|
||||||
|
private static final Log LOG = LogFactory.getLog(RegionGroupingProvider.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map identifiers to a group number.
|
||||||
|
*/
|
||||||
|
public static interface RegionGroupingStrategy {
|
||||||
|
/**
|
||||||
|
* Given an identifier, pick a group.
|
||||||
|
* the byte[] returned for a given group must always use the same instance, since we
|
||||||
|
* will be using it as a hash key.
|
||||||
|
*/
|
||||||
|
byte[] group(final byte[] identifier);
|
||||||
|
void init(Configuration config);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Maps between configuration names for strategies and implementation classes.
|
||||||
|
*/
|
||||||
|
static enum Strategies {
|
||||||
|
defaultStrategy(IdentityGroupingStrategy.class),
|
||||||
|
identity(IdentityGroupingStrategy.class);
|
||||||
|
|
||||||
|
final Class<? extends RegionGroupingStrategy> clazz;
|
||||||
|
Strategies(Class<? extends RegionGroupingStrategy> clazz) {
|
||||||
|
this.clazz = clazz;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* instantiate a strategy from a config property.
|
||||||
|
* requires conf to have already been set (as well as anything the provider might need to read).
|
||||||
|
*/
|
||||||
|
RegionGroupingStrategy getStrategy(final Configuration conf, final String key,
|
||||||
|
final String defaultValue) throws IOException {
|
||||||
|
Class<? extends RegionGroupingStrategy> clazz;
|
||||||
|
try {
|
||||||
|
clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz;
|
||||||
|
} catch (IllegalArgumentException exception) {
|
||||||
|
// Fall back to them specifying a class name
|
||||||
|
// Note that the passed default class shouldn't actually be used, since the above only fails
|
||||||
|
// when there is a config value present.
|
||||||
|
clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class);
|
||||||
|
}
|
||||||
|
LOG.info("Instantiating RegionGroupingStrategy of type " + clazz);
|
||||||
|
try {
|
||||||
|
final RegionGroupingStrategy result = clazz.newInstance();
|
||||||
|
result.init(conf);
|
||||||
|
return result;
|
||||||
|
} catch (InstantiationException exception) {
|
||||||
|
LOG.error("couldn't set up region grouping strategy, check config key " +
|
||||||
|
REGION_GROUPING_STRATEGY);
|
||||||
|
LOG.debug("Exception details for failure to load region grouping strategy.", exception);
|
||||||
|
throw new IOException("couldn't set up region grouping strategy", exception);
|
||||||
|
} catch (IllegalAccessException exception) {
|
||||||
|
LOG.error("couldn't set up region grouping strategy, check config key " +
|
||||||
|
REGION_GROUPING_STRATEGY);
|
||||||
|
LOG.debug("Exception details for failure to load region grouping strategy.", exception);
|
||||||
|
throw new IOException("couldn't set up region grouping strategy", exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy";
|
||||||
|
private static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name();
|
||||||
|
|
||||||
|
static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate";
|
||||||
|
static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name();
|
||||||
|
|
||||||
|
protected final ConcurrentMap<byte[], WALProvider> cached =
|
||||||
|
new ConcurrentHashMap<byte[], WALProvider>();
|
||||||
|
|
||||||
|
|
||||||
|
protected RegionGroupingStrategy strategy = null;
|
||||||
|
private WALFactory factory = null;
|
||||||
|
private List<WALActionsListener> listeners = null;
|
||||||
|
private String providerId = null;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(final WALFactory factory, final Configuration conf,
|
||||||
|
final List<WALActionsListener> listeners, final String providerId) throws IOException {
|
||||||
|
if (null != strategy) {
|
||||||
|
throw new IllegalStateException("WALProvider.init should only be called once.");
|
||||||
|
}
|
||||||
|
this.factory = factory;
|
||||||
|
this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners);
|
||||||
|
this.providerId = providerId;
|
||||||
|
this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Populate the cache for this group.
|
||||||
|
*/
|
||||||
|
WALProvider populateCache(final byte[] group) throws IOException {
|
||||||
|
final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER,
|
||||||
|
listeners, providerId + "-" + UUID.randomUUID());
|
||||||
|
final WALProvider extant = cached.putIfAbsent(group, temp);
|
||||||
|
if (null != extant) {
|
||||||
|
// someone else beat us to initializing, just take what they set.
|
||||||
|
temp.close();
|
||||||
|
return extant;
|
||||||
|
}
|
||||||
|
return temp;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public WAL getWAL(final byte[] identifier) throws IOException {
|
||||||
|
final byte[] group = strategy.group(identifier);
|
||||||
|
WALProvider provider = cached.get(group);
|
||||||
|
if (null == provider) {
|
||||||
|
provider = populateCache(group);
|
||||||
|
}
|
||||||
|
return provider.getWAL(identifier);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() throws IOException {
|
||||||
|
// save the last exception and rethrow
|
||||||
|
IOException failure = null;
|
||||||
|
for (WALProvider provider : cached.values()) {
|
||||||
|
try {
|
||||||
|
provider.shutdown();
|
||||||
|
} catch (IOException exception) {
|
||||||
|
LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage());
|
||||||
|
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||||
|
failure = exception;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (failure != null) {
|
||||||
|
throw failure;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
// save the last exception and rethrow
|
||||||
|
IOException failure = null;
|
||||||
|
for (WALProvider provider : cached.values()) {
|
||||||
|
try {
|
||||||
|
provider.close();
|
||||||
|
} catch (IOException exception) {
|
||||||
|
LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage());
|
||||||
|
LOG.debug("Details of problem shutting down provider '" + provider + "'", exception);
|
||||||
|
failure = exception;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (failure != null) {
|
||||||
|
throw failure;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class IdentityGroupingStrategy implements RegionGroupingStrategy {
|
||||||
|
@Override
|
||||||
|
public void init(Configuration config) {}
|
||||||
|
@Override
|
||||||
|
public byte[] group(final byte[] identifier) {
|
||||||
|
return identifier;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -54,7 +54,12 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
||||||
* Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
|
* Configure which provider gets used with the configuration setting "hbase.wal.provider". Available
|
||||||
* implementations:
|
* implementations:
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li><em>defaultProvider</em> : whatever provider is standard for the hbase version.</li>
|
* <li><em>defaultProvider</em> : whatever provider is standard for the hbase version. Currently
|
||||||
|
* "filesystem"</li>
|
||||||
|
* <li><em>filesystem</em> : a provider that will run on top of an implementation of the Hadoop
|
||||||
|
* FileSystem interface, normally HDFS.</li>
|
||||||
|
* <li><em>multiwal</em> : a provider that will use multiple "filesystem" wal instances per region
|
||||||
|
* server.</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* Alternatively, you may provide a custome implementation of {@link WALProvider} by class name.
|
* Alternatively, you may provide a custome implementation of {@link WALProvider} by class name.
|
||||||
|
@ -68,7 +73,9 @@ public class WALFactory {
|
||||||
* Maps between configuration names for providers and implementation classes.
|
* Maps between configuration names for providers and implementation classes.
|
||||||
*/
|
*/
|
||||||
static enum Providers {
|
static enum Providers {
|
||||||
defaultProvider(DefaultWALProvider.class);
|
defaultProvider(DefaultWALProvider.class),
|
||||||
|
filesystem(DefaultWALProvider.class),
|
||||||
|
multiwal(BoundedRegionGroupingProvider.class);
|
||||||
|
|
||||||
Class<? extends WALProvider> clazz;
|
Class<? extends WALProvider> clazz;
|
||||||
Providers(Class<? extends WALProvider> clazz) {
|
Providers(Class<? extends WALProvider> clazz) {
|
||||||
|
@ -133,6 +140,7 @@ public class WALFactory {
|
||||||
// when there is a config value present.
|
// when there is a config value present.
|
||||||
clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
|
clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class);
|
||||||
}
|
}
|
||||||
|
LOG.info("Instantiating WALProvider of type " + clazz);
|
||||||
try {
|
try {
|
||||||
final WALProvider result = clazz.newInstance();
|
final WALProvider result = clazz.newInstance();
|
||||||
result.init(this, conf, listeners, providerId);
|
result.init(this, conf, listeners, providerId);
|
||||||
|
|
|
@ -0,0 +1,183 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.NUM_REGION_GROUPS;
|
||||||
|
import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.DEFAULT_NUM_REGION_GROUPS;
|
||||||
|
import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
|
||||||
|
@Category({RegionServerTests.class, LargeTests.class})
|
||||||
|
public class TestBoundedRegionGroupingProvider {
|
||||||
|
protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingProvider.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName currentTest = new TestName();
|
||||||
|
protected static Configuration conf;
|
||||||
|
protected static FileSystem fs;
|
||||||
|
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
FileStatus[] entries = fs.listStatus(new Path("/"));
|
||||||
|
for (FileStatus dir : entries) {
|
||||||
|
fs.delete(dir.getPath(), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
conf = TEST_UTIL.getConfiguration();
|
||||||
|
// Make block sizes small.
|
||||||
|
conf.setInt("dfs.blocksize", 1024 * 1024);
|
||||||
|
// quicker heartbeat interval for faster DN death notification
|
||||||
|
conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000);
|
||||||
|
conf.setInt("dfs.heartbeat.interval", 1);
|
||||||
|
conf.setInt("dfs.client.socket-timeout", 5000);
|
||||||
|
|
||||||
|
// faster failover with cluster.shutdown();fs.close() idiom
|
||||||
|
conf.setInt("hbase.ipc.client.connect.max.retries", 1);
|
||||||
|
conf.setInt("dfs.client.block.recovery.retries", 1);
|
||||||
|
conf.setInt("hbase.ipc.client.connection.maxidletime", 500);
|
||||||
|
|
||||||
|
conf.setClass(WAL_PROVIDER, BoundedRegionGroupingProvider.class, WALProvider.class);
|
||||||
|
|
||||||
|
TEST_UTIL.startMiniDFSCluster(3);
|
||||||
|
|
||||||
|
fs = TEST_UTIL.getDFSCluster().getFileSystem();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDownAfterClass() throws Exception {
|
||||||
|
TEST_UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write to a log file with three concurrent threads and verifying all data is written.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testConcurrentWrites() throws Exception {
|
||||||
|
// Run the WPE tool with three threads writing 3000 edits each concurrently.
|
||||||
|
// When done, verify that all edits were written.
|
||||||
|
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
|
||||||
|
new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"});
|
||||||
|
assertEquals(0, errCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure we can successfully run with more regions then our bound.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMoreRegionsThanBound() throws Exception {
|
||||||
|
final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2);
|
||||||
|
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
|
||||||
|
new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
|
||||||
|
"-regions", parallelism});
|
||||||
|
assertEquals(0, errCode);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBoundsGreaterThanDefault() throws Exception {
|
||||||
|
final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
|
||||||
|
try {
|
||||||
|
conf.setInt(NUM_REGION_GROUPS, temp*4);
|
||||||
|
final String parallelism = Integer.toString(temp*4);
|
||||||
|
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
|
||||||
|
new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
|
||||||
|
"-regions", parallelism});
|
||||||
|
assertEquals(0, errCode);
|
||||||
|
} finally {
|
||||||
|
conf.setInt(NUM_REGION_GROUPS, temp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception {
|
||||||
|
final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
|
||||||
|
try {
|
||||||
|
conf.setInt(NUM_REGION_GROUPS, temp*4);
|
||||||
|
final String parallelism = Integer.toString(temp*4*2);
|
||||||
|
int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf),
|
||||||
|
new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000",
|
||||||
|
"-regions", parallelism});
|
||||||
|
assertEquals(0, errCode);
|
||||||
|
} finally {
|
||||||
|
conf.setInt(NUM_REGION_GROUPS, temp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure that we can use Set.add to deduplicate WALs
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void setMembershipDedups() throws IOException {
|
||||||
|
final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS);
|
||||||
|
WALFactory wals = null;
|
||||||
|
try {
|
||||||
|
conf.setInt(NUM_REGION_GROUPS, temp*4);
|
||||||
|
wals = new WALFactory(conf, null, currentTest.getMethodName());
|
||||||
|
final Set<WAL> seen = new HashSet<WAL>(temp*4);
|
||||||
|
final Random random = new Random();
|
||||||
|
int count = 0;
|
||||||
|
// we know that this should see one of the wals more than once
|
||||||
|
for (int i = 0; i < temp*8; i++) {
|
||||||
|
final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt()));
|
||||||
|
LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL);
|
||||||
|
if (seen.add(maybeNewWAL)) {
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals("received back a different number of WALs that are not equal() to each other " +
|
||||||
|
"than the bound we placed.", temp*4, count);
|
||||||
|
} finally {
|
||||||
|
if (wals != null) {
|
||||||
|
wals.close();
|
||||||
|
}
|
||||||
|
conf.setInt(NUM_REGION_GROUPS, temp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue