From e4d8fab104d6c4edc8721ee14238fcc59bd203d0 Mon Sep 17 00:00:00 2001 From: anoopsjohn Date: Sun, 21 Jun 2015 20:04:13 +0530 Subject: [PATCH] HBASE-13926 Close the scanner only after Call#setResponse. --- .../hadoop/hbase/io/HalfStoreFileReader.java | 5 + .../hbase/io/hfile/HFileReaderImpl.java | 5 + .../hadoop/hbase/io/hfile/HFileScanner.java | 3 +- .../hadoop/hbase/ipc/RpcCallContext.java | 8 ++ .../apache/hadoop/hbase/ipc/RpcCallback.java | 36 ++++++ .../apache/hadoop/hbase/ipc/RpcServer.java | 16 +++ .../hadoop/hbase/regionserver/HRegion.java | 19 ++- .../hbase/regionserver/KeyValueHeap.java | 16 +++ .../hbase/regionserver/KeyValueScanner.java | 2 +- .../hadoop/hbase/regionserver/Leases.java | 7 +- .../regionserver/NonLazyKeyValueScanner.java | 5 + .../hbase/regionserver/RSRpcServices.java | 117 ++++++++++++++---- .../hbase/regionserver/RegionScanner.java | 2 +- .../hadoop/hbase/regionserver/Shipper.java | 37 ++++++ .../hbase/regionserver/StoreFileScanner.java | 5 + .../hbase/regionserver/StoreScanner.java | 16 +++ .../coprocessor/TestCoprocessorInterface.java | 5 + 17 files changed, 277 insertions(+), 27 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java index 7a4a3337504..5f17c3b5401 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java @@ -293,6 +293,11 @@ public class HalfStoreFileReader extends StoreFile.Reader { public void close() { this.delegate.close(); } + + @Override + public void shipped() throws IOException { + this.delegate.shipped(); + } }; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java index c6655c11a34..4bd85a48cef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java @@ -1083,6 +1083,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable { public void close() { // HBASE-12295 will add code here. } + + @Override + public void shipped() throws IOException { + // HBASE-12295 will add code here. + } } public Path getPath() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java index 4d9990e9e59..6120e7194f2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.regionserver.Shipper; import org.apache.hadoop.hbase.Cell; /** @@ -37,7 +38,7 @@ import org.apache.hadoop.hbase.Cell; * getValue. */ @InterfaceAudience.Private -public interface HFileScanner { +public interface HFileScanner extends Shipper { /** * SeekTo or just before the passed cell. Examine the return * code to figure whether we found the cell or not. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java index bb63e018b52..268b34fca08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java @@ -63,4 +63,12 @@ public interface RpcCallContext extends Delayable { * @return the client version info, or null if the information is not present */ VersionInfo getClientVersionInfo(); + + /** + * Sets a callback which has to be executed at the end of this RPC call. Such a callback is an + * optional one for any Rpc call. + * + * @param callback + */ + void setCallBack(RpcCallback callback); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java new file mode 100644 index 00000000000..90b7a8778de --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Denotes a callback action that has to be executed at the end of an Rpc Call. + * + * @see RpcCallContext#setCallBack(RpcCallback) + */ +@InterfaceAudience.Private +public interface RpcCallback { + + /** + * Called at the end of an Rpc Call {@link RpcCallContext} + */ + void run() throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index 35577689e25..cafd25de8a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -306,6 +306,7 @@ public class RpcServer implements RpcServerInterface { private User user; private InetAddress remoteAddress; + private RpcCallback callback; Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Message param, CellScanner cellScanner, Connection connection, Responder responder, @@ -431,6 +432,16 @@ public class RpcServer implements RpcServerInterface { LOG.warn("Exception while creating response " + e); } this.response = bc; + // Once a response message is created and set to this.response, this Call can be treated as + // done. The Responder thread will do the n/w write of this message back to client. + if (this.callback != null) { + try { + this.callback.run(); + } catch (Exception e) { + // Don't allow any exception here to kill this handler thread. + LOG.warn("Exception while running the Rpc Callback.", e); + } + } } private BufferChain wrapWithSasl(BufferChain bc) @@ -553,6 +564,11 @@ public class RpcServer implements RpcServerInterface { public VersionInfo getClientVersionInfo() { return connection.getVersionInfo(); } + + @Override + public void setCallBack(RpcCallback callback) { + this.callback = callback; + } } /** Listens on the socket. Creates jobs for the handler threads*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a3e862dc631..7a69e3203b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -5225,7 +5225,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families). */ - class RegionScannerImpl implements RegionScanner { + class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback { // Package local for testability KeyValueHeap storeHeap = null; /** Heap of key-values that are not essential for the provided filters and are thus read @@ -5830,6 +5830,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } throw new UnsupportedOperationException("not able to abort RS after: " + msg); } + + @Override + public void shipped() throws IOException { + if (storeHeap != null) { + storeHeap.shipped(); + } + if (joinedHeap != null) { + joinedHeap.shipped(); + } + } + + @Override + public void run() throws IOException { + // This is the RPC callback method executed. We do the close in of the scanner in this + // callback + this.close(); + } } // Utility methods diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index a12e7c351eb..2b9d0f5f21a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -406,4 +406,20 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner // here we return the next index key from the top scanner return current == null ? null : current.getNextIndexedKey(); } + + @Override + public void shipped() throws IOException { + for (KeyValueScanner scanner : this.scannersForDelayedClose) { + scanner.close(); // There wont be further fetch of Cells from these scanners. Just close. + } + this.scannersForDelayedClose.clear(); + if (this.current != null) { + this.current.shipped(); + } + if (this.heap != null) { + for (KeyValueScanner scanner : this.heap) { + scanner.shipped(); + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index 76a9d0fb544..9a62b8f9a74 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Scan; * Scanner that returns the next KeyValue. */ @InterfaceAudience.Private -public interface KeyValueScanner { +public interface KeyValueScanner extends Shipper { /** * Look at the next Cell in this scanner, but do not iterate scanner. * @return the next Cell diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java index 83b9fb12dea..1373e27b1d0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java @@ -155,11 +155,14 @@ public class Leases extends HasThread { * @param leaseName name of the lease * @param leaseTimeoutPeriod length of the lease in milliseconds * @param listener listener that will process lease expirations + * @return The lease created. * @throws LeaseStillHeldException */ - public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener) + public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener) throws LeaseStillHeldException { - addLease(new Lease(leaseName, leaseTimeoutPeriod, listener)); + Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener); + addLease(lease); + return lease; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index 957f4174392..9a9036b0108 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -71,4 +71,9 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner { public Cell getNextIndexedKey() { return null; } + + @Override + public void shipped() throws IOException { + // do nothing + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 5fd285a71c6..6dc9f4e0712 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.RpcCallContext; +import org.apache.hadoop.hbase.ipc.RpcCallback; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface; @@ -154,6 +155,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.quotas.OperationQuota; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.regionserver.Leases.Lease; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.regionserver.Region.Operation; @@ -239,16 +241,64 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final long minimumScanTimeLimitDelta; /** - * Holder class which holds the RegionScanner and nextCallSeq together. + * An Rpc callback for closing a RegionScanner. + */ + private static class RegionScannerCloseCallBack implements RpcCallback { + + private final RegionScanner scanner; + + public RegionScannerCloseCallBack(RegionScanner scanner){ + this.scanner = scanner; + } + + @Override + public void run() throws IOException { + this.scanner.close(); + } + } + + /** + * An Rpc callback for doing shipped() call on a RegionScanner. + */ + private class RegionScannerShippedCallBack implements RpcCallback { + + private final String scannerName; + private final RegionScanner scanner; + private final Lease lease; + + public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) { + this.scannerName = scannerName; + this.scanner = scanner; + this.lease = lease; + } + + @Override + public void run() throws IOException { + this.scanner.shipped(); + // We're done. On way out re-add the above removed lease. The lease was temp removed for this + // Rpc call and we are at end of the call now. Time to add it back. + if (scanners.containsKey(scannerName)) { + if (lease != null) regionServer.leases.addLease(lease); + } + } + } + + /** + * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together. */ private static class RegionScannerHolder { private AtomicLong nextCallSeq = new AtomicLong(0); private RegionScanner s; private Region r; + final RpcCallback closeCallBack; + final RpcCallback shippedCallback; - public RegionScannerHolder(RegionScanner s, Region r) { + public RegionScannerHolder(RegionScanner s, Region r, RpcCallback closeCallBack, + RpcCallback shippedCallback) { this.s = s; this.r = r; + this.closeCallBack = closeCallBack; + this.shippedCallback = shippedCallback; } private long getNextCallSeq() { @@ -364,6 +414,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return context != null && context.isClientCellBlockSupport(); } + private boolean isClientCellBlockSupport(RpcCallContext context) { + return context != null && context.isClientCellBlockSupport(); + } + private void addResult(final MutateResponse.Builder builder, final Result result, final PayloadCarryingRpcController rpcc) { if (result == null) return; @@ -377,10 +431,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } private void addResults(final ScanResponse.Builder builder, final List results, - final RpcController controller, boolean isDefaultRegion) { + final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) { builder.setStale(!isDefaultRegion); if (results == null || results.isEmpty()) return; - if (isClientCellBlockSupport()) { + if (clientCellBlockSupported) { for (Result res : results) { builder.addCellsPerResult(res.size()); builder.addPartialFlagPerResult(res.isPartial()); @@ -923,17 +977,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler, return 0L; } - long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException { - long scannerId = this.scannerIdGen.incrementAndGet(); - String scannerName = String.valueOf(scannerId); - - RegionScannerHolder existing = - scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r)); - assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; - - regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, + RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r) + throws LeaseStillHeldException { + Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod, new ScannerListener(scannerName)); - return scannerId; + RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease); + RpcCallback closeCallback; + if (s instanceof RpcCallback) { + closeCallback = (RpcCallback) s; + } else { + closeCallback = new RegionScannerCloseCallBack(s); + } + RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback); + RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); + assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!"; + return rsh; } /** @@ -2229,11 +2287,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (region.getCoprocessorHost() != null) { scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); } - scannerId = addScanner(scanner, region); + scannerId = this.scannerIdGen.incrementAndGet(); scannerName = String.valueOf(scannerId); + rsh = addScanner(scannerName, scanner, region); ttl = this.scannerLeaseTimeoutPeriod; } + RpcCallContext context = RpcServer.getCurrentCall(); + quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); @@ -2451,7 +2512,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, moreResults = false; results = null; } else { - addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())); + addResults(builder, results, controller, + RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()), + isClientCellBlockSupport(context)); } } catch (IOException e) { // if we have an exception on scanner next and we are using the callSeq @@ -2462,11 +2525,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } throw e; } finally { - // We're done. On way out re-add the above removed lease. + if (context != null) { + context.setCallBack(rsh.shippedCallback); + } // Adding resets expiration time on lease. if (scanners.containsKey(scannerName)) { - if (lease != null) regionServer.leases.addLease(lease); ttl = this.scannerLeaseTimeoutPeriod; + // When context != null, adding back the lease will be done in callback set above. + if (context == null) { + if (lease != null) regionServer.leases.addLease(lease); + } } } } @@ -2481,9 +2549,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } rsh = scanners.remove(scannerName); if (rsh != null) { - scanner = rsh.s; - scanner.close(); - regionServer.leases.cancelLease(scannerName); + if (context != null) { + context.setCallBack(rsh.closeCallBack); + } else { + rsh.s.close(); + } + try { + regionServer.leases.cancelLease(scannerName); + } catch (LeaseException le) { + // No problem, ignore + } if (region != null && region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(scanner); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index 1bc6546eff3..9e7ff0ff1ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceStability.Evolving -public interface RegionScanner extends InternalScanner { +public interface RegionScanner extends InternalScanner, Shipper { /** * @return The RegionInfo for this scanner. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java new file mode 100644 index 00000000000..fb66f51f62e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This interface denotes a scanner as one which can ship cells. Scan operation do many RPC requests + * to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch + * {@link #shipped()} will get called. + */ +@InterfaceAudience.Private +public interface Shipper { + + /** + * Called after a batch of rows scanned and set to be returned to client. Any in between cleanup + * can be done here. + */ + void shipped() throws IOException; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index e7a5af43cc6..2f64607b9c3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -488,4 +488,9 @@ public class StoreFileScanner implements KeyValueScanner { public Cell getNextIndexedKey() { return hfs.getNextIndexedKey(); } + + @Override + public void shipped() throws IOException { + this.hfs.shipped(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index d63ccca88bb..d60087b03ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -901,5 +901,21 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner public Cell getNextIndexedKey() { return this.heap.getNextIndexedKey(); } + + @Override + public void shipped() throws IOException { + lock.lock(); + try { + for (KeyValueHeap h : this.heapsForDelayedClose) { + h.close();// There wont be further fetch of Cells from these scanners. Just close. + } + this.heapsForDelayedClose.clear(); + if (this.heap != null) { + this.heap.shipped(); + } + } finally { + lock.unlock(); + } + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 4f5b84dc962..b2ef1bdd6e8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -144,6 +144,11 @@ public class TestCoprocessorInterface { public int getBatch() { return delegate.getBatch(); } + + @Override + public void shipped() throws IOException { + this.delegate.shipped(); + } } public static class CoprocessorImpl extends BaseRegionObserver {