diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
new file mode 100644
index 00000000000..ad9279b7641
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Used to communicate with a single HBase table in batches. Obtain an instance from a
+ * {@link AsyncConnection} and call {@link #close()} afterwards.
+ * <p>
+ * The implementation is required to be thread safe.
+ */
+@InterfaceAudience.Public
+public interface AsyncBufferedMutator extends Closeable {
+
+  /**
+   * Gets the fully qualified table name instance of the table that this
+   * {@code AsyncBufferedMutator} writes to.
+   */
+  TableName getName();
+
+  /**
+   * Returns the {@link org.apache.hadoop.conf.Configuration} object used by this instance.
+   * <p>
+   * The reference returned is not a copy, so any change made to it will affect this instance.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Sends a {@link Mutation} to the table. The mutations will be buffered and sent over the wire
+   * as part of a batch. Currently only supports {@link Put} and {@link Delete} mutations.
+   * @param mutation The data to send.
+   */
+  CompletableFuture<Void> mutate(Mutation mutation);
+
+  /**
+   * Send some {@link Mutation}s to the table. The mutations will be buffered and sent over the
+   * wire as part of a batch. There is no guarantee of sending entire content of {@code mutations}
+   * in a single batch, the implementations are free to break it up according to the write buffer
+   * capacity.
+   * @param mutations The data to send.
+   */
+  List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations);
+
+  /**
+   * Executes all the buffered, asynchronous operations.
+   */
+  void flush();
+
+  /**
+   * Performs a {@link #flush()} and releases any resources held.
+   */
+  @Override
+  void close();
+
+  /**
+   * Returns the maximum size in bytes of the write buffer.
+   * <p>
+   * The default value comes from the configuration parameter {@code hbase.client.write.buffer}.
+   * @return The size of the write buffer in bytes.
+   */
+  long getWriteBufferSize();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
new file mode 100644
index 00000000000..d47ba00ef89
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilder.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * For creating {@link AsyncBufferedMutator}.
+ */
+@InterfaceAudience.Public
+public interface AsyncBufferedMutatorBuilder {
+
+  /**
+   * Set timeout for the background flush operation.
+   */
+  AsyncBufferedMutatorBuilder setOperationTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Set timeout for each rpc request when doing background flush.
+   */
+  AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit);
+
+  /**
+   * Set the base pause time for retrying. We use an exponential policy to generate sleep time
+   * when retrying.
+   */
+  AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit);
+
+  /**
+   * Set the max retry times for an operation. Usually it is the max attempt times minus 1.
+   * <p>
+   * Operation timeout and max attempt times (or max retry times) are both limitations for
+   * retrying, we will stop retrying when we reach any of the limitations.
+   * @see #setMaxAttempts(int)
+   * @see #setOperationTimeout(long, TimeUnit)
+   */
+  default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
+    return setMaxAttempts(retries2Attempts(maxRetries));
+  }
+
+  /**
+   * Set the max attempt times for an operation. Usually it is the max retry times plus 1.
+   * Operation timeout and max attempt times (or max retry times) are both limitations for
+   * retrying, we will stop retrying when we reach any of the limitations.
+   * @see #setMaxRetries(int)
+   * @see #setOperationTimeout(long, TimeUnit)
+   */
+  AsyncBufferedMutatorBuilder setMaxAttempts(int maxAttempts);
+
+  /**
+   * Set the number of retries that are allowed before we start to log.
+   */
+  AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt);
+
+  /**
+   * Override the write buffer size specified by the provided {@link AsyncConnection}'s
+   * {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
+   * {@code hbase.client.write.buffer}.
+   */
+  AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize);
+
+  /**
+   * Create the {@link AsyncBufferedMutator} instance.
+   */
+  AsyncBufferedMutator build();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
new file mode 100644
index 00000000000..0c5ab5a8a53
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorBuilderImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The implementation of {@link AsyncBufferedMutatorBuilder}.
+ */
+@InterfaceAudience.Private
+class AsyncBufferedMutatorBuilderImpl implements AsyncBufferedMutatorBuilder {
+
+  private final AsyncTableBuilder<? extends AsyncTableBase> tableBuilder;
+
+  private long writeBufferSize;
+
+  public AsyncBufferedMutatorBuilderImpl(AsyncConnectionConfiguration connConf,
+      AsyncTableBuilder<? extends AsyncTableBase> tableBuilder) {
+    this.tableBuilder = tableBuilder;
+    this.writeBufferSize = connConf.getWriteBufferSize();
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setOperationTimeout(long timeout, TimeUnit unit) {
+    tableBuilder.setOperationTimeout(timeout, unit);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit) {
+    tableBuilder.setRpcTimeout(timeout, unit);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setRetryPause(long pause, TimeUnit unit) {
+    tableBuilder.setRetryPause(pause, unit);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setMaxAttempts(int maxAttempts) {
+    tableBuilder.setMaxAttempts(maxAttempts);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
+    tableBuilder.setStartLogErrorsCnt(startLogErrorsCnt);
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
+    Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",
+      writeBufferSize);
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  @Override
+  public AsyncBufferedMutator build() {
+    return new AsyncBufferedMutatorImpl(tableBuilder.build(), writeBufferSize);
+  }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
new file mode 100644
index 00000000000..01180176f80
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBufferedMutatorImpl.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The implementation of {@link AsyncBufferedMutator}. Simply wrap an {@link AsyncTableBase}.
+ */
+@InterfaceAudience.Private
+class AsyncBufferedMutatorImpl implements AsyncBufferedMutator {
+
+  private final AsyncTableBase table;
+
+  private final long writeBufferSize;
+
+  private List<Mutation> mutations = new ArrayList<>();
+
+  private List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+  private long bufferedSize;
+
+  private boolean closed;
+
+  AsyncBufferedMutatorImpl(AsyncTableBase table, long writeBufferSize) {
+    this.table = table;
+    this.writeBufferSize = writeBufferSize;
+  }
+
+  @Override
+  public TableName getName() {
+    return table.getName();
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return table.getConfiguration();
+  }
+
+  private void internalFlush() {
+    List<Mutation> toSend = this.mutations;
+    if (toSend.isEmpty()) {
+      return;
+    }
+    List<CompletableFuture<Void>> toComplete = this.futures;
+    assert toSend.size() == toComplete.size();
+    this.mutations = new ArrayList<>();
+    this.futures = new ArrayList<>();
+    bufferedSize = 0L;
+    Iterator<CompletableFuture<Void>> toCompleteIter = toComplete.iterator();
+    for (CompletableFuture<?> future : table.batch(toSend)) {
+      // pair each caller future with its batch future here, before the callback can run on
+      // another thread, so completions are matched to the right mutation
+      CompletableFuture<Void> toCompleteFuture = toCompleteIter.next();
+      future.whenComplete((r, e) -> {
+        if (e != null) {
+          toCompleteFuture.completeExceptionally(e);
+        } else {
+          toCompleteFuture.complete(null);
+        }
+      });
+    }
+  }
+
+  @Override
+  public CompletableFuture<Void> mutate(Mutation mutation) {
+    CompletableFuture<Void> future = new CompletableFuture<>();
+    long heapSize = mutation.heapSize();
+    synchronized (this) {
+      if (closed) {
+        future.completeExceptionally(new IOException("Already closed"));
+        return future;
+      }
+      mutations.add(mutation);
+      futures.add(future);
+      bufferedSize += heapSize;
+      if (bufferedSize >= writeBufferSize) {
+        internalFlush();
+      }
+    }
+    return future;
+  }
+
+  @Override
+  public List<CompletableFuture<Void>> mutate(List<? extends Mutation> mutations) {
+    List<CompletableFuture<Void>> futures =
+      Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutations.size())
+        .collect(Collectors.toList());
+    long heapSize = mutations.stream().mapToLong(m -> m.heapSize()).sum();
+    synchronized (this) {
+      if (closed) {
+        IOException ioe = new IOException("Already closed");
+        futures.forEach(f -> f.completeExceptionally(ioe));
+        return futures;
+      }
+      this.mutations.addAll(mutations);
+      this.futures.addAll(futures);
+      bufferedSize += heapSize;
+      if (bufferedSize >= writeBufferSize) {
+        internalFlush();
+      }
+    }
+    return futures;
+  }
+
+  @Override
+  public synchronized void flush() {
+    internalFlush();
+  }
+
+  @Override
+  public synchronized void close() {
+    internalFlush();
+    closed = true;
+  }
+
+  @Override
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 24907baba14..8d26368099b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -136,4 +136,43 @@ public interface AsyncConnection extends Closeable {
    * @param pool the thread pool to use for executing callback
    */
   AsyncAdminBuilder getAdminBuilder(ExecutorService pool);
+
+  /**
+   * Retrieve an {@link AsyncBufferedMutator} for performing client-side buffering of writes.
+   * <p>
+   * The returned instance will use default configs. Use
+   * {@link #getBufferedMutatorBuilder(TableName)} if you want to customize some configs.
+   * @param tableName the name of the table
+   * @return an {@link AsyncBufferedMutator} for the supplied tableName.
+   */
+  default AsyncBufferedMutator getBufferedMutator(TableName tableName) {
+    return getBufferedMutatorBuilder(tableName).build();
+  }
+
+  /**
+   * Returns an {@link AsyncBufferedMutatorBuilder} for creating {@link AsyncBufferedMutator}.
+   * @param tableName the name of the table
+   */
+  AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName);
+
+  /**
+   * Retrieve an {@link AsyncBufferedMutator} for performing client-side buffering of writes.
+   * <p>
+   * The returned instance will use default configs. Use
+   * {@link #getBufferedMutatorBuilder(TableName, ExecutorService)} if you want to customize some
+   * configs.
+   * @param tableName the name of the table
+   * @param pool the thread pool to use for executing callback
+   * @return an {@link AsyncBufferedMutator} for the supplied tableName.
+   */
+  default AsyncBufferedMutator getBufferedMutator(TableName tableName, ExecutorService pool) {
+    return getBufferedMutatorBuilder(tableName, pool).build();
+  }
+
+  /**
+   * Returns an {@link AsyncBufferedMutatorBuilder} for creating {@link AsyncBufferedMutator}.
+   * @param tableName the name of the table
+   * @param pool the thread pool to use for executing callback
+   */
+  AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool);
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 83caea276a2..a15ff8d3cbd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -39,6 +39,8 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT;
 import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_KEY;
 
 import java.util.concurrent.TimeUnit;
 
@@ -87,6 +89,8 @@ class AsyncConnectionConfiguration {
 
   private final long scannerMaxResultSize;
 
+  private final long writeBufferSize;
+
   @SuppressWarnings("deprecation")
   AsyncConnectionConfiguration(Configuration conf) {
     this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -112,6 +116,7 @@ class AsyncConnectionConfiguration {
     this.metaScannerCaching = conf.getInt(HBASE_META_SCANNER_CACHING, DEFAULT_HBASE_META_SCANNER_CACHING);
     this.scannerMaxResultSize = conf.getLong(HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
       DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+    this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
   }
 
   long getMetaOperationTimeoutNs() {
@@ -161,4 +166,8 @@ class AsyncConnectionConfiguration {
   long getScannerMaxResultSize() {
     return scannerMaxResultSize;
   }
+
+  long getWriteBufferSize() {
+    return writeBufferSize;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index d8f051fbbe7..800ce15d4d6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -297,4 +297,15 @@ class AsyncConnectionImpl implements AsyncConnection {
       }
     };
   }
+
+  @Override
+  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
+    return new AsyncBufferedMutatorBuilderImpl(connConf, getRawTableBuilder(tableName));
+  }
+
+  @Override
+  public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
+      ExecutorService pool) {
+    return new AsyncBufferedMutatorBuilderImpl(connConf, getTableBuilder(tableName, pool));
+  }
 }
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
new file mode 100644
index 00000000000..dca66d58dad
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncBufferMutator.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncBufferMutator {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int COUNT = 100;
+
+  private static byte[] VALUE = new byte[1024];
+
+  private static AsyncConnection CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, CF);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    ThreadLocalRandom.current().nextBytes(VALUE);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws InterruptedException {
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    try (AsyncBufferedMutator mutator =
+        CONN.getBufferedMutatorBuilder(TABLE_NAME).setWriteBufferSize(16 * 1024).build()) {
+      List<CompletableFuture<Void>> fs = mutator.mutate(IntStream.range(0, COUNT / 2)
+          .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
+          .collect(Collectors.toList()));
+      // exceeded the write buffer size, a flush will be called directly
+      fs.forEach(f -> f.join());
+      IntStream.range(COUNT / 2, COUNT).forEach(i -> {
+        futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE)));
+      });
+      // the first future should have been sent out.
+      futures.get(0).join();
+      Thread.sleep(2000);
+      // the last one should still be in write buffer
+      assertFalse(futures.get(futures.size() - 1).isDone());
+    }
+    // mutator.close will call mutator.flush automatically so all tasks should have been done.
+    futures.forEach(f -> f.join());
+    RawAsyncTable table = CONN.getRawTable(TABLE_NAME);
+    IntStream.range(0, COUNT).mapToObj(i -> new Get(Bytes.toBytes(i))).map(g -> table.get(g).join())
+        .forEach(r -> {
+          assertArrayEquals(VALUE, r.getValue(CF, CQ));
+        });
+  }
+
+  @Test
+  public void testClosedMutate() throws InterruptedException {
+    AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
+    mutator.close();
+    Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
+    try {
+      mutator.mutate(put).get();
+      fail("Close check failed");
+    } catch (ExecutionException e) {
+      assertThat(e.getCause(), instanceOf(IOException.class));
+      assertTrue(e.getCause().getMessage().startsWith("Already closed"));
+    }
+    for (CompletableFuture<Void> f : mutator.mutate(Arrays.asList(put))) {
+      try {
+        f.get();
+        fail("Close check failed");
+      } catch (ExecutionException e) {
+        assertThat(e.getCause(), instanceOf(IOException.class));
+        assertTrue(e.getCause().getMessage().startsWith("Already closed"));
+      }
+    }
+  }
+}
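Reviewer note: a minimal usage sketch of the API added by this patch, not part of the patch itself. The table name, column family/qualifier and buffer size are illustrative assumptions; the target table is assumed to already exist.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncBufferedMutatorExample {

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    TableName tableName = TableName.valueOf("example"); // assumed to exist with family 'cf'
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get();
        AsyncBufferedMutator mutator = conn.getBufferedMutatorBuilder(tableName)
            .setWriteBufferSize(4 * 1024 * 1024) // overrides hbase.client.write.buffer
            .build()) {
      List<CompletableFuture<Void>> futures = new ArrayList<>();
      for (int i = 0; i < 100; i++) {
        Put put = new Put(Bytes.toBytes("row-" + i))
            .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes(i));
        // each mutate call returns a future that completes once the mutation has been flushed
        futures.add(mutator.mutate(put));
      }
      // send whatever is still sitting in the write buffer
      mutator.flush();
      // wait until every buffered mutation has been persisted
      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
  }
}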