diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index fcc9af715e9..cea9304e1f0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -63,6 +63,11 @@ import java.util.List; @InterfaceAudience.Public @InterfaceStability.Evolving public interface BufferedMutator extends Closeable { + /** + * Key to use setting non-default BufferedMutator implementation in Configuration. + */ + public static final String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname"; + /** * Gets the fully qualified table name instance of the table that this BufferedMutator writes to. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 54955a080a4..0085767b24f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -41,11 +41,13 @@ import java.util.concurrent.atomic.AtomicLong; *

* Used to communicate with a single HBase table similar to {@link Table} * but meant for batched, potentially asynchronous puts. Obtain an instance from - * a {@link Connection} and call {@link #close()} afterwards. + * a {@link Connection} and call {@link #close()} afterwards. Provide an alternate + * to this implementation by setting {@link BufferedMutatorParams#implementationClassName(String)} + * or by setting alternate classname via the key {} in Configuration. *

* *

- * While this can be used accross threads, great care should be used when doing so. + * While this can be used across threads, great care should be used when doing so. * Errors are global to the buffered mutator and the Exceptions can be thrown on any * thread that causes the flush for requests. *

@@ -57,6 +59,12 @@ import java.util.concurrent.atomic.AtomicLong; @InterfaceAudience.Private @InterfaceStability.Evolving public class BufferedMutatorImpl implements BufferedMutator { + /** + * Key to use setting non-default BufferedMutator implementation + * classname via Configuration. + */ + public static final String HBASE_BUFFEREDMUTATOR_CLASSNAME_KEY = + "hbase.client.bufferedmutator.classname"; private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index d4cdeadd34c..aacb5f34205 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -38,6 +38,8 @@ public class BufferedMutatorParams { private long writeBufferSize = UNSET; private int maxKeyValueSize = UNSET; private ExecutorService pool = null; + private String implementationClassName = null; + private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() { @Override public void onException(RetriesExhaustedWithDetailsException exception, @@ -96,6 +98,23 @@ public class BufferedMutatorParams { return this; } + /** + * @return Name of the class we will use when we construct a + * {@link BufferedMutator} instance or null if default implementation. + */ + public String getImplementationClassName() { + return this.implementationClassName; + } + + /** + * Specify a BufferedMutator implementation other than the default. + * @param implementationClassName Name of the BufferedMutator implementation class + */ + public BufferedMutatorParams implementationClassName(String implementationClassName) { + this.implementationClassName = implementationClassName; + return this; + } + public BufferedMutator.ExceptionListener getListener() { return listener; } @@ -107,4 +126,4 @@ public class BufferedMutatorParams { this.listener = listener; return this; } -} +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index e75d9a545d8..0c512beccbd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -184,6 +185,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { private final ClientBackoffPolicy backoffPolicy; + /** + * Allow setting an alternate BufferedMutator implementation via + * config. If null, use default. + */ + private final String alternateBufferedMutatorClassName; + /** * constructor * @param conf Configuration object @@ -244,6 +251,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable { ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class); + // Is there an alternate BufferedMutator to use? + this.alternateBufferedMutatorClassName = + this.conf.get(BufferedMutator.CLASSNAME_KEY); + try { this.registry = setupRegistry(); retrieveClusterId(); @@ -315,7 +326,21 @@ class ConnectionImplementation implements ClusterConnection, Closeable { if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) { params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize()); } - return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); + // Look to see if an alternate BufferedMutation implementation is wanted. + // Look in params and in config. If null, use default. + String implementationClassName = params.getImplementationClassName(); + if (implementationClassName == null) { + implementationClassName = this.alternateBufferedMutatorClassName; + } + if (implementationClassName == null) { + return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params); + } + try { + return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName), + this, rpcCallerFactory, rpcControllerFactory, params); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java new file mode 100644 index 00000000000..84eb948b022 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({SmallTests.class, ClientTests.class}) +public class TestBufferedMutator { + + /** + * Registry that does nothing. + * Otherwise, default Registry wants zookeeper up and running. + */ + public static class DoNothingRegistry implements Registry { + @Override + public void init(Connection connection) { + // TODO Auto-generated method stub + } + + @Override + public RegionLocations getMetaRegionLocation() throws IOException { + // TODO Auto-generated method stub + return null; + } + + @Override + public String getClusterId() { + // TODO Auto-generated method stub + return null; + } + + @Override + public int getCurrentNrHRS() throws IOException { + // TODO Auto-generated method stub + return 0; + } + } + + /** + * My BufferedMutator. + * Just to prove that I can insert a BM other than default. + */ + public static class MyBufferedMutator extends BufferedMutatorImpl { + MyBufferedMutator(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcFactory, BufferedMutatorParams params) { + super(conn, rpcCallerFactory, rpcFactory, params); + } + } + + @Test + public void testAlternateBufferedMutatorImpl() throws IOException { + BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t")); + Configuration conf = HBaseConfiguration.create(); + conf.set(RegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingRegistry.class.getName()); + try (Connection connection = ConnectionFactory.createConnection(conf)) { + BufferedMutator bm = connection.getBufferedMutator(params); + // Assert we get default BM if nothing specified. + assertTrue(bm instanceof BufferedMutatorImpl); + // Now try and set my own BM implementation. + params.implementationClassName(MyBufferedMutator.class.getName()); + bm = connection.getBufferedMutator(params); + assertTrue(bm instanceof MyBufferedMutator); + } + // Now try creating a Connection after setting an alterate BufferedMutator into + // the configuration and confirm we get what was expected. + conf.set(BufferedMutator.CLASSNAME_KEY, MyBufferedMutator.class.getName()); + try (Connection connection = ConnectionFactory.createConnection(conf)) { + BufferedMutator bm = connection.getBufferedMutator(params); + assertTrue(bm instanceof MyBufferedMutator); + } + } +}