From d1487fcfad2d4d7607d244cdfd877195242522e4 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sun, 14 Apr 2019 20:32:38 +0800 Subject: [PATCH] HBASE-21725 Implement BufferedMutator Based on AsyncBufferedMutator --- .../hadoop/hbase/client/BufferedMutator.java | 10 + ...fferedMutatorOverAsyncBufferedMutator.java | 175 ++++++++++++++++++ .../hbase/client/BufferedMutatorParams.java | 23 ++- .../client/ConnectionOverAsyncConnection.java | 19 +- .../hbase/client/TestBufferedMutator.java | 82 -------- .../hbase/client/TestBufferedMutator.java | 90 +++++++++ 6 files changed, 309 insertions(+), 90 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java delete mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index 7805f77e30e..8ad6a792230 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -62,7 +62,11 @@ import org.apache.yetus.audience.InterfaceAudience; public interface BufferedMutator extends Closeable { /** * Key to use setting non-default BufferedMutator implementation in Configuration. + *

+ * @deprecated Since 3.0.0, will be removed in 4.0.0. For internal test use only, do not use it + * any more. */ + @Deprecated String CLASSNAME_KEY = "hbase.client.bufferedmutator.classname"; /** @@ -179,12 +183,18 @@ public interface BufferedMutator extends Closeable { /** * Set rpc timeout for this mutator instance + * @deprecated Since 3.0.0, will be removed in 4.0.0. Please set this through the + * {@link BufferedMutatorParams}. */ + @Deprecated void setRpcTimeout(int timeout); /** * Set operation timeout for this mutator instance + * @deprecated Since 3.0.0, will be removed in 4.0.0. Please set this through the + * {@link BufferedMutatorParams}. */ + @Deprecated void setOperationTimeout(int timeout); /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java new file mode 100644 index 00000000000..a7d4595c475 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorOverAsyncBufferedMutator.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * {@link BufferedMutator} implementation based on {@link AsyncBufferedMutator}. + */ +@InterfaceAudience.Private +class BufferedMutatorOverAsyncBufferedMutator implements BufferedMutator { + + private final AsyncBufferedMutator mutator; + + private final ExceptionListener listener; + + private List> futures = new ArrayList<>(); + + private final ConcurrentLinkedQueue> errors = + new ConcurrentLinkedQueue<>(); + + private final static int BUFFERED_FUTURES_THRESHOLD = 1024; + + BufferedMutatorOverAsyncBufferedMutator(AsyncBufferedMutator mutator, + ExceptionListener listener) { + this.mutator = mutator; + this.listener = listener; + } + + @Override + public TableName getName() { + return mutator.getName(); + } + + @Override + public Configuration getConfiguration() { + return mutator.getConfiguration(); + } + + @Override + public void mutate(Mutation mutation) throws IOException { + mutate(Collections.singletonList(mutation)); + } + + private static final Pattern ADDR_MSG_MATCHER = Pattern.compile("Call to (\\S+) failed"); + + // not always work, so may return an empty string + private String getHostnameAndPort(Throwable error) { + Matcher matcher = ADDR_MSG_MATCHER.matcher(error.getMessage()); + if (matcher.matches()) { + return matcher.group(1); + } else { + return ""; + } + } + + private RetriesExhaustedWithDetailsException makeError() { + List rows = new ArrayList<>(); + List throwables = new ArrayList<>(); + List hostnameAndPorts = new ArrayList<>(); + for (;;) { + Pair pair = errors.poll(); + if (pair == null) { + break; + } + rows.add(pair.getFirst()); + throwables.add(pair.getSecond()); + hostnameAndPorts.add(getHostnameAndPort(pair.getSecond())); + } + return new RetriesExhaustedWithDetailsException(throwables, rows, hostnameAndPorts); + } + + @Override + public void mutate(List mutations) throws IOException { + List> toBuffered = new ArrayList<>(); + List> fs = mutator.mutate(mutations); + for (int i = 0, n = fs.size(); i < n; i++) { + CompletableFuture toComplete = new CompletableFuture<>(); + final int index = i; + addListener(fs.get(index), (r, e) -> { + if (e != null) { + errors.add(Pair.newPair(mutations.get(index), e)); + toComplete.completeExceptionally(e); + } else { + toComplete.complete(r); + } + }); + toBuffered.add(toComplete); + } + synchronized (this) { + futures.addAll(toBuffered); + if (futures.size() > BUFFERED_FUTURES_THRESHOLD) { + tryCompleteFuture(); + } + if (!errors.isEmpty()) { + RetriesExhaustedWithDetailsException error = makeError(); + listener.onException(error, this); + } + } + } + + private void tryCompleteFuture() { + futures = futures.stream().filter(f -> !f.isDone()).collect(Collectors.toList()); + } + + @Override + public void close() throws IOException { + flush(); + mutator.close(); + } + + @Override + public void flush() throws IOException { + mutator.flush(); + synchronized (this) { + List> toComplete = this.futures; + this.futures = new ArrayList<>(); + try { + CompletableFuture.allOf(toComplete.toArray(new CompletableFuture[toComplete.size()])) + .join(); + } catch (CompletionException e) { + // just ignore, we will record the actual error in the errors field + } + if (!errors.isEmpty()) { + RetriesExhaustedWithDetailsException error = makeError(); + listener.onException(error, this); + } + } + } + + @Override + public long getWriteBufferSize() { + return mutator.getWriteBufferSize(); + } + + @Override + public void setRpcTimeout(int timeout) { + // no effect + } + + @Override + public void setOperationTimeout(int timeout) { + // no effect + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java index 3f6c56570e7..49fb77baef8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorParams.java @@ -101,13 +101,21 @@ public class BufferedMutatorParams implements Cloneable { return this; } + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. We use a common timer in the whole client + * implementation so you can not set it any more. + */ + @Deprecated public long getWriteBufferPeriodicFlushTimerTickMs() { return writeBufferPeriodicFlushTimerTickMs; } /** * Set the TimerTick how often the buffer timeout if checked. + * @deprecated Since 3.0.0, will be removed in 4.0.0. We use a common timer in the whole client + * implementation so you can not set it any more. */ + @Deprecated public BufferedMutatorParams setWriteBufferPeriodicFlushTimerTickMs(long timerTickMs) { this.writeBufferPeriodicFlushTimerTickMs = timerTickMs; return this; @@ -141,9 +149,12 @@ public class BufferedMutatorParams implements Cloneable { } /** - * @return Name of the class we will use when we construct a - * {@link BufferedMutator} instance or null if default implementation. + * @return Name of the class we will use when we construct a {@link BufferedMutator} instance or + * null if default implementation. + * @deprecated Since 3.0.0, will be removed in 4.0.0. You can not set it any more as the + * implementation has to use too many internal stuffs in HBase. */ + @Deprecated public String getImplementationClassName() { return this.implementationClassName; } @@ -151,7 +162,10 @@ public class BufferedMutatorParams implements Cloneable { /** * Specify a BufferedMutator implementation other than the default. * @param implementationClassName Name of the BufferedMutator implementation class + * @deprecated Since 3.0.0, will be removed in 4.0.0. You can not set it any more as the + * implementation has to use too many internal stuffs in HBase. */ + @Deprecated public BufferedMutatorParams implementationClassName(String implementationClassName) { this.implementationClassName = implementationClassName; return this; @@ -169,11 +183,6 @@ public class BufferedMutatorParams implements Cloneable { return this; } - /* - * (non-Javadoc) - * - * @see java.lang.Object#clone() - */ @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="CN_IDIOM_NO_SUPER_CALL", justification="The clone below is complete") @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index dfe7d8fb085..8ec7ab83504 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -87,7 +87,24 @@ class ConnectionOverAsyncConnection implements Connection { @Override public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException { - return oldConn.getBufferedMutator(params); + AsyncBufferedMutatorBuilder builder = conn.getBufferedMutatorBuilder(params.getTableName()); + if (params.getRpcTimeout() != BufferedMutatorParams.UNSET) { + builder.setRpcTimeout(params.getRpcTimeout(), TimeUnit.MILLISECONDS); + } + if (params.getOperationTimeout() != BufferedMutatorParams.UNSET) { + builder.setOperationTimeout(params.getOperationTimeout(), TimeUnit.MILLISECONDS); + } + if (params.getWriteBufferSize() != BufferedMutatorParams.UNSET) { + builder.setWriteBufferSize(params.getWriteBufferSize()); + } + if (params.getWriteBufferPeriodicFlushTimeoutMs() != BufferedMutatorParams.UNSET) { + builder.setWriteBufferPeriodicFlush(params.getWriteBufferPeriodicFlushTimeoutMs(), + TimeUnit.MILLISECONDS); + } + if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) { + builder.setMaxKeyValueSize(params.getMaxKeyValueSize()); + } + return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener()); } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java deleted file mode 100644 index 96bb8461284..00000000000 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -@Category({ SmallTests.class, ClientTests.class }) -public class TestBufferedMutator { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestBufferedMutator.class); - - @Rule - public TestName name = new TestName(); - - /** - * My BufferedMutator. Just to prove that I can insert a BM other than default. - */ - public static class MyBufferedMutator extends BufferedMutatorImpl { - MyBufferedMutator(ConnectionImplementation conn, RpcRetryingCallerFactory rpcCallerFactory, - RpcControllerFactory rpcFactory, BufferedMutatorParams params) { - super(conn, rpcCallerFactory, rpcFactory, params); - } - } - - @Test - public void testAlternateBufferedMutatorImpl() throws IOException { - BufferedMutatorParams params = - new BufferedMutatorParams(TableName.valueOf(name.getMethodName())); - Configuration conf = HBaseConfiguration.create(); - conf.set(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, DoNothingAsyncRegistry.class.getName()); - try (ConnectionImplementation connection = ConnectionFactory.createConnectionImpl(conf, null, - UserProvider.instantiate(conf).getCurrent())) { - BufferedMutator bm = connection.getBufferedMutator(params); - // Assert we get default BM if nothing specified. - assertTrue(bm instanceof BufferedMutatorImpl); - // Now try and set my own BM implementation. - params.implementationClassName(MyBufferedMutator.class.getName()); - bm = connection.getBufferedMutator(params); - assertTrue(bm instanceof MyBufferedMutator); - } - // Now try creating a Connection after setting an alterate BufferedMutator into - // the configuration and confirm we get what was expected. - conf.set(BufferedMutator.CLASSNAME_KEY, MyBufferedMutator.class.getName()); - try (Connection connection = ConnectionFactory.createConnectionImpl(conf, null, - UserProvider.instantiate(conf).getCurrent())) { - BufferedMutator bm = connection.getBufferedMutator(params); - assertTrue(bm instanceof MyBufferedMutator); - } - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java new file mode 100644 index 00000000000..23e69ee82b1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBufferedMutator.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertArrayEquals; + +import java.io.IOException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestBufferedMutator { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBufferedMutator.class); + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("test"); + + private static byte[] CF = Bytes.toBytes("cf"); + + private static byte[] CQ = Bytes.toBytes("cq"); + + private static int COUNT = 1024; + + private static byte[] VALUE = new byte[1024]; + + @BeforeClass + public static void setUp() throws Exception { + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.createTable(TABLE_NAME, CF); + } + + @AfterClass + public static void tearDown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + try (BufferedMutator mutator = TEST_UTIL.getConnection().getBufferedMutator(TABLE_NAME)) { + mutator.mutate(IntStream.range(0, COUNT / 2) + .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) + .collect(Collectors.toList())); + mutator.flush(); + mutator.mutate(IntStream.range(COUNT / 2, COUNT) + .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)) + .collect(Collectors.toList())); + mutator.close(); + verifyData(); + } + } + + private void verifyData() throws IOException { + try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) { + for (int i = 0; i < COUNT; i++) { + Result r = table.get(new Get(Bytes.toBytes(i))); + assertArrayEquals(VALUE, ((Result) r).getValue(CF, CQ)); + } + } + } +}