HBASE-17277 Allow alternate BufferedMutator implemenation Specify the name of an alternate BufferedMutator implementation by either:

+ Setting "hbase.client.bufferedmutator.classname" to the name of the
alternate implementation class
+ Or, by setting implementationClassName on BufferedMutatorParams and
passing the amended BufferedMutatorParams when calling Connection#getBufferedMutator.

Add a test to exercise both means.
This commit is contained in:
Michael Stack 2016-12-07 12:33:23 -08:00
parent a9310436d5
commit 68ce3f1e3b
5 changed files with 161 additions and 4 deletions

View File

@ -63,6 +63,11 @@ import java.util.List;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface BufferedMutator extends Closeable {
/**
* Key to use setting non-default BufferedMutator implementation in Configuration.
*/
public static final String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname";
/**
* Gets the fully qualified table name instance of the table that this BufferedMutator writes to.
*/

View File

@ -41,11 +41,13 @@ import java.util.concurrent.atomic.AtomicLong;
* <p>
* Used to communicate with a single HBase table similar to {@link Table}
* but meant for batched, potentially asynchronous puts. Obtain an instance from
* a {@link Connection} and call {@link #close()} afterwards.
* a {@link Connection} and call {@link #close()} afterwards. Provide an alternate
* to this implementation by setting {@link BufferedMutatorParams#implementationClassName(String)}
* or by setting alternate classname via the key {} in Configuration.
* </p>
*
* <p>
* While this can be used accross threads, great care should be used when doing so.
* While this can be used across threads, great care should be used when doing so.
* Errors are global to the buffered mutator and the Exceptions can be thrown on any
* thread that causes the flush for requests.
* </p>
@ -57,6 +59,12 @@ import java.util.concurrent.atomic.AtomicLong;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BufferedMutatorImpl implements BufferedMutator {
/**
* Key to use setting non-default BufferedMutator implementation
* classname via Configuration.
*/
public static final String HBASE_BUFFEREDMUTATOR_CLASSNAME_KEY =
"hbase.client.bufferedmutator.classname";
private static final Log LOG = LogFactory.getLog(BufferedMutatorImpl.class);

View File

@ -38,6 +38,8 @@ public class BufferedMutatorParams {
private long writeBufferSize = UNSET;
private int maxKeyValueSize = UNSET;
private ExecutorService pool = null;
private String implementationClassName = null;
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
@ -96,6 +98,23 @@ public class BufferedMutatorParams {
return this;
}
/**
* @return Name of the class we will use when we construct a
* {@link BufferedMutator} instance or null if default implementation.
*/
public String getImplementationClassName() {
return this.implementationClassName;
}
/**
* Specify a BufferedMutator implementation other than the default.
* @param implementationClassName Name of the BufferedMutator implementation class
*/
public BufferedMutatorParams implementationClassName(String implementationClassName) {
this.implementationClassName = implementationClassName;
return this;
}
public BufferedMutator.ExceptionListener getListener() {
return listener;
}

View File

@ -92,6 +92,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -184,6 +185,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
private final ClientBackoffPolicy backoffPolicy;
/**
* Allow setting an alternate BufferedMutator implementation via
* config. If null, use default.
*/
private final String alternateBufferedMutatorClassName;
/**
* constructor
* @param conf Configuration object
@ -244,6 +251,10 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
ClusterStatusListener.Listener.class);
// Is there an alternate BufferedMutator to use?
this.alternateBufferedMutatorClassName =
this.conf.get(BufferedMutator.CLASSNAME_KEY);
try {
this.registry = setupRegistry();
retrieveClusterId();
@ -315,7 +326,21 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
if (params.getMaxKeyValueSize() == BufferedMutatorParams.UNSET) {
params.maxKeyValueSize(connectionConfig.getMaxKeyValueSize());
}
return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
// Look to see if an alternate BufferedMutation implementation is wanted.
// Look in params and in config. If null, use default.
String implementationClassName = params.getImplementationClassName();
if (implementationClassName == null) {
implementationClassName = this.alternateBufferedMutatorClassName;
}
if (implementationClassName == null) {
return new BufferedMutatorImpl(this, rpcCallerFactory, rpcControllerFactory, params);
}
try {
return (BufferedMutator)ReflectionUtils.newInstance(Class.forName(implementationClassName),
this, rpcCallerFactory, rpcControllerFactory, params);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@Override

View File

@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({SmallTests.class, ClientTests.class})
public class TestBufferedMutator {
/**
* Registry that does nothing.
* Otherwise, default Registry wants zookeeper up and running.
*/
public static class DoNothingRegistry implements Registry {
@Override
public void init(Connection connection) {
// TODO Auto-generated method stub
}
@Override
public RegionLocations getMetaRegionLocation() throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public String getClusterId() {
// TODO Auto-generated method stub
return null;
}
@Override
public int getCurrentNrHRS() throws IOException {
// TODO Auto-generated method stub
return 0;
}
}
/**
* My BufferedMutator.
* Just to prove that I can insert a BM other than default.
*/
public static class MyBufferedMutator extends BufferedMutatorImpl {
MyBufferedMutator(ClusterConnection conn, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcFactory, BufferedMutatorParams params) {
super(conn, rpcCallerFactory, rpcFactory, params);
}
}
@Test
public void testAlternateBufferedMutatorImpl() throws IOException {
BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t"));
Configuration conf = HBaseConfiguration.create();
conf.set(RegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingRegistry.class.getName());
try (Connection connection = ConnectionFactory.createConnection(conf)) {
BufferedMutator bm = connection.getBufferedMutator(params);
// Assert we get default BM if nothing specified.
assertTrue(bm instanceof BufferedMutatorImpl);
// Now try and set my own BM implementation.
params.implementationClassName(MyBufferedMutator.class.getName());
bm = connection.getBufferedMutator(params);
assertTrue(bm instanceof MyBufferedMutator);
}
// Now try creating a Connection after setting an alterate BufferedMutator into
// the configuration and confirm we get what was expected.
conf.set(BufferedMutator.CLASSNAME_KEY, MyBufferedMutator.class.getName());
try (Connection connection = ConnectionFactory.createConnection(conf)) {
BufferedMutator bm = connection.getBufferedMutator(params);
assertTrue(bm instanceof MyBufferedMutator);
}
}
}