diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java new file mode 100644 index 00000000000..478d5c3aafa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRegionGroupingProvider.java @@ -0,0 +1,106 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +// imports for classes still in regionserver.wal +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; + +/** + * A WAL Provider that pre-creates N WALProviders and then limits our grouping strategy to them. + * Control the number of delegate providers via "hbase.wal.regiongrouping.numgroups." Control + * the choice of delegate provider implementation and the grouping strategy the same as + * {@link RegionGroupingProvider}. + */ +@InterfaceAudience.Private +class BoundedRegionGroupingProvider extends RegionGroupingProvider { + private static final Log LOG = LogFactory.getLog(BoundedRegionGroupingProvider.class); + + static final String NUM_REGION_GROUPS = "hbase.wal.regiongrouping.numgroups"; + static final int DEFAULT_NUM_REGION_GROUPS = 2; + private WALProvider[] delegates; + private AtomicInteger counter = new AtomicInteger(0); + + @Override + public void init(final WALFactory factory, final Configuration conf, + final List listeners, final String providerId) throws IOException { + super.init(factory, conf, listeners, providerId); + // no need to check for and close down old providers; our parent class will throw on re-invoke + delegates = new WALProvider[Math.max(1, conf.getInt(NUM_REGION_GROUPS, + DEFAULT_NUM_REGION_GROUPS))]; + for (int i = 0; i < delegates.length; i++) { + delegates[i] = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, listeners, + providerId + i); + } + LOG.info("Configured to run with " + delegates.length + " delegate WAL providers."); + } + + @Override + WALProvider populateCache(final byte[] group) { + final WALProvider temp = delegates[counter.getAndIncrement() % delegates.length]; + final WALProvider extant = cached.putIfAbsent(group, temp); + // if someone else beat us to initializing, just take what they set. + // note that in such a case we skew load away from the provider we picked at first + return extant == null ? temp : extant; + } + + @Override + public void shutdown() throws IOException { + // save the last exception and rethrow + IOException failure = null; + for (WALProvider provider : delegates) { + try { + provider.shutdown(); + } catch (IOException exception) { + LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage()); + LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); + failure = exception; + } + } + if (failure != null) { + throw failure; + } + } + + @Override + public void close() throws IOException { + // save the last exception and rethrow + IOException failure = null; + for (WALProvider provider : delegates) { + try { + provider.close(); + } catch (IOException exception) { + LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage()); + LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); + failure = exception; + } + } + if (failure != null) { + throw failure; + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java new file mode 100644 index 00000000000..eb2c426d1c4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -0,0 +1,212 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; + +// imports for classes still in regionserver.wal +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; + +/** + * A WAL Provider that returns a WAL per group of regions. + * + * Region grouping is handled via {@link RegionGroupingStrategy} and can be configured via the + * property "hbase.wal.regiongrouping.strategy". Current strategy choices are + * + * Optionally, a FQCN to a custom implementation may be given. + * + * WAL creation is delegated to another WALProvider, configured via the property + * "hbase.wal.regiongrouping.delegate". The property takes the same options as "hbase.wal.provider" + * (ref {@link WALFactory}) and defaults to the defaultProvider. + */ +@InterfaceAudience.Private +class RegionGroupingProvider implements WALProvider { + private static final Log LOG = LogFactory.getLog(RegionGroupingProvider.class); + + /** + * Map identifiers to a group number. + */ + public static interface RegionGroupingStrategy { + /** + * Given an identifier, pick a group. + * the byte[] returned for a given group must always use the same instance, since we + * will be using it as a hash key. + */ + byte[] group(final byte[] identifier); + void init(Configuration config); + } + + /** + * Maps between configuration names for strategies and implementation classes. + */ + static enum Strategies { + defaultStrategy(IdentityGroupingStrategy.class), + identity(IdentityGroupingStrategy.class); + + final Class clazz; + Strategies(Class clazz) { + this.clazz = clazz; + } + } + + /** + * instantiate a strategy from a config property. + * requires conf to have already been set (as well as anything the provider might need to read). + */ + RegionGroupingStrategy getStrategy(final Configuration conf, final String key, + final String defaultValue) throws IOException { + Class clazz; + try { + clazz = Strategies.valueOf(conf.get(key, defaultValue)).clazz; + } catch (IllegalArgumentException exception) { + // Fall back to them specifying a class name + // Note that the passed default class shouldn't actually be used, since the above only fails + // when there is a config value present. + clazz = conf.getClass(key, IdentityGroupingStrategy.class, RegionGroupingStrategy.class); + } + LOG.info("Instantiating RegionGroupingStrategy of type " + clazz); + try { + final RegionGroupingStrategy result = clazz.newInstance(); + result.init(conf); + return result; + } catch (InstantiationException exception) { + LOG.error("couldn't set up region grouping strategy, check config key " + + REGION_GROUPING_STRATEGY); + LOG.debug("Exception details for failure to load region grouping strategy.", exception); + throw new IOException("couldn't set up region grouping strategy", exception); + } catch (IllegalAccessException exception) { + LOG.error("couldn't set up region grouping strategy, check config key " + + REGION_GROUPING_STRATEGY); + LOG.debug("Exception details for failure to load region grouping strategy.", exception); + throw new IOException("couldn't set up region grouping strategy", exception); + } + } + + private static final String REGION_GROUPING_STRATEGY = "hbase.wal.regiongrouping.strategy"; + private static final String DEFAULT_REGION_GROUPING_STRATEGY = Strategies.defaultStrategy.name(); + + static final String DELEGATE_PROVIDER = "hbase.wal.regiongrouping.delegate"; + static final String DEFAULT_DELEGATE_PROVIDER = WALFactory.Providers.defaultProvider.name(); + + protected final ConcurrentMap cached = + new ConcurrentHashMap(); + + + protected RegionGroupingStrategy strategy = null; + private WALFactory factory = null; + private List listeners = null; + private String providerId = null; + + @Override + public void init(final WALFactory factory, final Configuration conf, + final List listeners, final String providerId) throws IOException { + if (null != strategy) { + throw new IllegalStateException("WALProvider.init should only be called once."); + } + this.factory = factory; + this.listeners = null == listeners ? null : Collections.unmodifiableList(listeners); + this.providerId = providerId; + this.strategy = getStrategy(conf, REGION_GROUPING_STRATEGY, DEFAULT_REGION_GROUPING_STRATEGY); + } + + /** + * Populate the cache for this group. + */ + WALProvider populateCache(final byte[] group) throws IOException { + final WALProvider temp = factory.getProvider(DELEGATE_PROVIDER, DEFAULT_DELEGATE_PROVIDER, + listeners, providerId + "-" + UUID.randomUUID()); + final WALProvider extant = cached.putIfAbsent(group, temp); + if (null != extant) { + // someone else beat us to initializing, just take what they set. + temp.close(); + return extant; + } + return temp; + } + + @Override + public WAL getWAL(final byte[] identifier) throws IOException { + final byte[] group = strategy.group(identifier); + WALProvider provider = cached.get(group); + if (null == provider) { + provider = populateCache(group); + } + return provider.getWAL(identifier); + } + + @Override + public void shutdown() throws IOException { + // save the last exception and rethrow + IOException failure = null; + for (WALProvider provider : cached.values()) { + try { + provider.shutdown(); + } catch (IOException exception) { + LOG.error("Problem shutting down provider '" + provider + "': " + exception.getMessage()); + LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); + failure = exception; + } + } + if (failure != null) { + throw failure; + } + } + + @Override + public void close() throws IOException { + // save the last exception and rethrow + IOException failure = null; + for (WALProvider provider : cached.values()) { + try { + provider.close(); + } catch (IOException exception) { + LOG.error("Problem closing provider '" + provider + "': " + exception.getMessage()); + LOG.debug("Details of problem shutting down provider '" + provider + "'", exception); + failure = exception; + } + } + if (failure != null) { + throw failure; + } + } + + static class IdentityGroupingStrategy implements RegionGroupingStrategy { + @Override + public void init(Configuration config) {} + @Override + public byte[] group(final byte[] identifier) { + return identifier; + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 497a4d865b4..b8dee83d60b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -55,7 +55,12 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; * Configure which provider gets used with the configuration setting "hbase.wal.provider". Available * implementations: *
    - *
  • defaultProvider : whatever provider is standard for the hbase version.
  • + *
  • defaultProvider : whatever provider is standard for the hbase version. Currently + * "filesystem"
  • + *
  • filesystem : a provider that will run on top of an implementation of the Hadoop + * FileSystem interface, normally HDFS.
  • + *
  • multiwal : a provider that will use multiple "filesystem" wal instances per region + * server.
  • *
* * Alternatively, you may provide a custome implementation of {@link WALProvider} by class name. @@ -69,7 +74,9 @@ public class WALFactory { * Maps between configuration names for providers and implementation classes. */ static enum Providers { - defaultProvider(DefaultWALProvider.class); + defaultProvider(DefaultWALProvider.class), + filesystem(DefaultWALProvider.class), + multiwal(BoundedRegionGroupingProvider.class); Class clazz; Providers(Class clazz) { @@ -134,6 +141,7 @@ public class WALFactory { // when there is a config value present. clazz = conf.getClass(key, DefaultWALProvider.class, WALProvider.class); } + LOG.info("Instantiating WALProvider of type " + clazz); try { final WALProvider result = clazz.newInstance(); result.init(this, conf, listeners, providerId); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java new file mode 100644 index 00000000000..4e3608c6882 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestBoundedRegionGroupingProvider.java @@ -0,0 +1,182 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Random; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.NUM_REGION_GROUPS; +import static org.apache.hadoop.hbase.wal.BoundedRegionGroupingProvider.DEFAULT_NUM_REGION_GROUPS; +import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category(LargeTests.class) +public class TestBoundedRegionGroupingProvider { + protected static final Log LOG = LogFactory.getLog(TestBoundedRegionGroupingProvider.class); + + @Rule + public TestName currentTest = new TestName(); + protected static Configuration conf; + protected static FileSystem fs; + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + @Before + public void setUp() throws Exception { + FileStatus[] entries = fs.listStatus(new Path("/")); + for (FileStatus dir : entries) { + fs.delete(dir.getPath(), true); + } + } + + @After + public void tearDown() throws Exception { + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + conf = TEST_UTIL.getConfiguration(); + // Make block sizes small. + conf.setInt("dfs.blocksize", 1024 * 1024); + // quicker heartbeat interval for faster DN death notification + conf.setInt("dfs.namenode.heartbeat.recheck-interval", 5000); + conf.setInt("dfs.heartbeat.interval", 1); + conf.setInt("dfs.client.socket-timeout", 5000); + + // faster failover with cluster.shutdown();fs.close() idiom + conf.setInt("hbase.ipc.client.connect.max.retries", 1); + conf.setInt("dfs.client.block.recovery.retries", 1); + conf.setInt("hbase.ipc.client.connection.maxidletime", 500); + + conf.setClass(WAL_PROVIDER, BoundedRegionGroupingProvider.class, WALProvider.class); + + TEST_UTIL.startMiniDFSCluster(3); + + fs = TEST_UTIL.getDFSCluster().getFileSystem(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Write to a log file with three concurrent threads and verifying all data is written. + */ + @Test + public void testConcurrentWrites() throws Exception { + // Run the WPE tool with three threads writing 3000 edits each concurrently. + // When done, verify that all edits were written. + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), + new String [] {"-threads", "3", "-verify", "-noclosefs", "-iterations", "3000"}); + assertEquals(0, errCode); + } + + /** + * Make sure we can successfully run with more regions then our bound. + */ + @Test + public void testMoreRegionsThanBound() throws Exception { + final String parallelism = Integer.toString(DEFAULT_NUM_REGION_GROUPS * 2); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), + new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism}); + assertEquals(0, errCode); + } + + @Test + public void testBoundsGreaterThanDefault() throws Exception { + final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + try { + conf.setInt(NUM_REGION_GROUPS, temp*4); + final String parallelism = Integer.toString(temp*4); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), + new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism}); + assertEquals(0, errCode); + } finally { + conf.setInt(NUM_REGION_GROUPS, temp); + } + } + + @Test + public void testMoreRegionsThanBoundWithBoundsGreaterThanDefault() throws Exception { + final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + try { + conf.setInt(NUM_REGION_GROUPS, temp*4); + final String parallelism = Integer.toString(temp*4*2); + int errCode = WALPerformanceEvaluation.innerMain(new Configuration(conf), + new String [] {"-threads", parallelism, "-verify", "-noclosefs", "-iterations", "3000", + "-regions", parallelism}); + assertEquals(0, errCode); + } finally { + conf.setInt(NUM_REGION_GROUPS, temp); + } + } + + /** + * Ensure that we can use Set.add to deduplicate WALs + */ + @Test + public void setMembershipDedups() throws IOException { + final int temp = conf.getInt(NUM_REGION_GROUPS, DEFAULT_NUM_REGION_GROUPS); + WALFactory wals = null; + try { + conf.setInt(NUM_REGION_GROUPS, temp*4); + wals = new WALFactory(conf, null, currentTest.getMethodName()); + final Set seen = new HashSet(temp*4); + final Random random = new Random(); + int count = 0; + // we know that this should see one of the wals more than once + for (int i = 0; i < temp*8; i++) { + final WAL maybeNewWAL = wals.getWAL(Bytes.toBytes(random.nextInt())); + LOG.info("Iteration " + i + ", checking wal " + maybeNewWAL); + if (seen.add(maybeNewWAL)) { + count++; + } + } + assertEquals("received back a different number of WALs that are not equal() to each other " + + "than the bound we placed.", temp*4, count); + } finally { + if (wals != null) { + wals.close(); + } + conf.setInt(NUM_REGION_GROUPS, temp); + } + } +}