diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
index 7a4a3337504..5f17c3b5401 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java
@@ -293,6 +293,11 @@ public class HalfStoreFileReader extends StoreFile.Reader {
public void close() {
this.delegate.close();
}
+
+ @Override
+ public void shipped() throws IOException {
+ this.delegate.shipped();
+ }
};
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
index c6655c11a34..4bd85a48cef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java
@@ -1083,6 +1083,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
public void close() {
// HBASE-12295 will add code here.
}
+
+ @Override
+ public void shipped() throws IOException {
+ // HBASE-12295 will add code here.
+ }
}
public Path getPath() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
index 4d9990e9e59..6120e7194f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.Cell;
/**
@@ -37,7 +38,7 @@ import org.apache.hadoop.hbase.Cell;
* getValue.
*/
@InterfaceAudience.Private
-public interface HFileScanner {
+public interface HFileScanner extends Shipper {
/**
* SeekTo or just before the passed cell
. Examine the return
* code to figure whether we found the cell or not.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
index bb63e018b52..268b34fca08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallContext.java
@@ -63,4 +63,12 @@ public interface RpcCallContext extends Delayable {
* @return the client version info, or null if the information is not present
*/
VersionInfo getClientVersionInfo();
+
+ /**
+ * Sets a callback which has to be executed at the end of this RPC call. Such a callback is an
+ * optional one for any Rpc call.
+ *
+ * @param callback
+ */
+ void setCallBack(RpcCallback callback);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java
new file mode 100644
index 00000000000..90b7a8778de
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcCallback.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Denotes a callback action that has to be executed at the end of an Rpc Call.
+ *
+ * @see RpcCallContext#setCallBack(RpcCallback)
+ */
+@InterfaceAudience.Private
+public interface RpcCallback {
+
+ /**
+ * Called at the end of an Rpc Call {@link RpcCallContext}
+ */
+ void run() throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 35577689e25..cafd25de8a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -306,6 +306,7 @@ public class RpcServer implements RpcServerInterface {
private User user;
private InetAddress remoteAddress;
+ private RpcCallback callback;
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder,
@@ -431,6 +432,16 @@ public class RpcServer implements RpcServerInterface {
LOG.warn("Exception while creating response " + e);
}
this.response = bc;
+ // Once a response message is created and set to this.response, this Call can be treated as
+ // done. The Responder thread will do the n/w write of this message back to client.
+ if (this.callback != null) {
+ try {
+ this.callback.run();
+ } catch (Exception e) {
+ // Don't allow any exception here to kill this handler thread.
+ LOG.warn("Exception while running the Rpc Callback.", e);
+ }
+ }
}
private BufferChain wrapWithSasl(BufferChain bc)
@@ -553,6 +564,11 @@ public class RpcServer implements RpcServerInterface {
public VersionInfo getClientVersionInfo() {
return connection.getVersionInfo();
}
+
+ @Override
+ public void setCallBack(RpcCallback callback) {
+ this.callback = callback;
+ }
}
/** Listens on the socket. Creates jobs for the handler threads*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a3e862dc631..7a69e3203b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5225,7 +5225,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
*/
- class RegionScannerImpl implements RegionScanner {
+ class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {
// Package local for testability
KeyValueHeap storeHeap = null;
/** Heap of key-values that are not essential for the provided filters and are thus read
@@ -5830,6 +5830,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
throw new UnsupportedOperationException("not able to abort RS after: " + msg);
}
+
+ @Override
+ public void shipped() throws IOException {
+ if (storeHeap != null) {
+ storeHeap.shipped();
+ }
+ if (joinedHeap != null) {
+ joinedHeap.shipped();
+ }
+ }
+
+ @Override
+ public void run() throws IOException {
+ // This is the RPC callback method executed. We do the close in of the scanner in this
+ // callback
+ this.close();
+ }
}
// Utility methods
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index a12e7c351eb..2b9d0f5f21a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -406,4 +406,20 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
// here we return the next index key from the top scanner
return current == null ? null : current.getNextIndexedKey();
}
+
+ @Override
+ public void shipped() throws IOException {
+ for (KeyValueScanner scanner : this.scannersForDelayedClose) {
+ scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
+ }
+ this.scannersForDelayedClose.clear();
+ if (this.current != null) {
+ this.current.shipped();
+ }
+ if (this.heap != null) {
+ for (KeyValueScanner scanner : this.heap) {
+ scanner.shipped();
+ }
+ }
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
index 76a9d0fb544..9a62b8f9a74 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Scan;
* Scanner that returns the next KeyValue.
*/
@InterfaceAudience.Private
-public interface KeyValueScanner {
+public interface KeyValueScanner extends Shipper {
/**
* Look at the next Cell in this scanner, but do not iterate scanner.
* @return the next Cell
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
index 83b9fb12dea..1373e27b1d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java
@@ -155,11 +155,14 @@ public class Leases extends HasThread {
* @param leaseName name of the lease
* @param leaseTimeoutPeriod length of the lease in milliseconds
* @param listener listener that will process lease expirations
+ * @return The lease created.
* @throws LeaseStillHeldException
*/
- public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
+ public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
throws LeaseStillHeldException {
- addLease(new Lease(leaseName, leaseTimeoutPeriod, listener));
+ Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
+ addLease(lease);
+ return lease;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
index 957f4174392..9a9036b0108 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
@@ -71,4 +71,9 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
public Cell getNextIndexedKey() {
return null;
}
+
+ @Override
+ public void shipped() throws IOException {
+ // do nothing
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5fd285a71c6..6dc9f4e0712 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@@ -154,6 +155,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
@@ -239,16 +241,64 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final long minimumScanTimeLimitDelta;
/**
- * Holder class which holds the RegionScanner and nextCallSeq together.
+ * An Rpc callback for closing a RegionScanner.
+ */
+ private static class RegionScannerCloseCallBack implements RpcCallback {
+
+ private final RegionScanner scanner;
+
+ public RegionScannerCloseCallBack(RegionScanner scanner){
+ this.scanner = scanner;
+ }
+
+ @Override
+ public void run() throws IOException {
+ this.scanner.close();
+ }
+ }
+
+ /**
+ * An Rpc callback for doing shipped() call on a RegionScanner.
+ */
+ private class RegionScannerShippedCallBack implements RpcCallback {
+
+ private final String scannerName;
+ private final RegionScanner scanner;
+ private final Lease lease;
+
+ public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) {
+ this.scannerName = scannerName;
+ this.scanner = scanner;
+ this.lease = lease;
+ }
+
+ @Override
+ public void run() throws IOException {
+ this.scanner.shipped();
+ // We're done. On way out re-add the above removed lease. The lease was temp removed for this
+ // Rpc call and we are at end of the call now. Time to add it back.
+ if (scanners.containsKey(scannerName)) {
+ if (lease != null) regionServer.leases.addLease(lease);
+ }
+ }
+ }
+
+ /**
+ * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/
private static class RegionScannerHolder {
private AtomicLong nextCallSeq = new AtomicLong(0);
private RegionScanner s;
private Region r;
+ final RpcCallback closeCallBack;
+ final RpcCallback shippedCallback;
- public RegionScannerHolder(RegionScanner s, Region r) {
+ public RegionScannerHolder(RegionScanner s, Region r, RpcCallback closeCallBack,
+ RpcCallback shippedCallback) {
this.s = s;
this.r = r;
+ this.closeCallBack = closeCallBack;
+ this.shippedCallback = shippedCallback;
}
private long getNextCallSeq() {
@@ -364,6 +414,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return context != null && context.isClientCellBlockSupport();
}
+ private boolean isClientCellBlockSupport(RpcCallContext context) {
+ return context != null && context.isClientCellBlockSupport();
+ }
+
private void addResult(final MutateResponse.Builder builder,
final Result result, final PayloadCarryingRpcController rpcc) {
if (result == null) return;
@@ -377,10 +431,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
private void addResults(final ScanResponse.Builder builder, final List results,
- final RpcController controller, boolean isDefaultRegion) {
+ final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
builder.setStale(!isDefaultRegion);
if (results == null || results.isEmpty()) return;
- if (isClientCellBlockSupport()) {
+ if (clientCellBlockSupported) {
for (Result res : results) {
builder.addCellsPerResult(res.size());
builder.addPartialFlagPerResult(res.isPartial());
@@ -923,17 +977,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return 0L;
}
- long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException {
- long scannerId = this.scannerIdGen.incrementAndGet();
- String scannerName = String.valueOf(scannerId);
-
- RegionScannerHolder existing =
- scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
- assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
-
- regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
+ RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
+ throws LeaseStillHeldException {
+ Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName));
- return scannerId;
+ RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
+ RpcCallback closeCallback;
+ if (s instanceof RpcCallback) {
+ closeCallback = (RpcCallback) s;
+ } else {
+ closeCallback = new RegionScannerCloseCallBack(s);
+ }
+ RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback);
+ RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
+ assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
+ return rsh;
}
/**
@@ -2229,11 +2287,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
}
- scannerId = addScanner(scanner, region);
+ scannerId = this.scannerIdGen.incrementAndGet();
scannerName = String.valueOf(scannerId);
+ rsh = addScanner(scannerName, scanner, region);
ttl = this.scannerLeaseTimeoutPeriod;
}
+ RpcCallContext context = RpcServer.getCurrentCall();
+
quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
@@ -2451,7 +2512,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
moreResults = false;
results = null;
} else {
- addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
+ addResults(builder, results, controller,
+ RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
+ isClientCellBlockSupport(context));
}
} catch (IOException e) {
// if we have an exception on scanner next and we are using the callSeq
@@ -2462,11 +2525,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
throw e;
} finally {
- // We're done. On way out re-add the above removed lease.
+ if (context != null) {
+ context.setCallBack(rsh.shippedCallback);
+ }
// Adding resets expiration time on lease.
if (scanners.containsKey(scannerName)) {
- if (lease != null) regionServer.leases.addLease(lease);
ttl = this.scannerLeaseTimeoutPeriod;
+ // When context != null, adding back the lease will be done in callback set above.
+ if (context == null) {
+ if (lease != null) regionServer.leases.addLease(lease);
+ }
}
}
}
@@ -2481,9 +2549,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
rsh = scanners.remove(scannerName);
if (rsh != null) {
- scanner = rsh.s;
- scanner.close();
- regionServer.leases.cancelLease(scannerName);
+ if (context != null) {
+ context.setCallBack(rsh.closeCallBack);
+ } else {
+ rsh.s.close();
+ }
+ try {
+ regionServer.leases.cancelLease(scannerName);
+ } catch (LeaseException le) {
+ // No problem, ignore
+ }
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 1bc6546eff3..9e7ff0ff1ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
-public interface RegionScanner extends InternalScanner {
+public interface RegionScanner extends InternalScanner, Shipper {
/**
* @return The RegionInfo for this scanner.
*/
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java
new file mode 100644
index 00000000000..fb66f51f62e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This interface denotes a scanner as one which can ship cells. Scan operation do many RPC requests
+ * to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch
+ * {@link #shipped()} will get called.
+ */
+@InterfaceAudience.Private
+public interface Shipper {
+
+ /**
+ * Called after a batch of rows scanned and set to be returned to client. Any in between cleanup
+ * can be done here.
+ */
+ void shipped() throws IOException;
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index e7a5af43cc6..2f64607b9c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -488,4 +488,9 @@ public class StoreFileScanner implements KeyValueScanner {
public Cell getNextIndexedKey() {
return hfs.getNextIndexedKey();
}
+
+ @Override
+ public void shipped() throws IOException {
+ this.hfs.shipped();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index d63ccca88bb..d60087b03ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -901,5 +901,21 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
public Cell getNextIndexedKey() {
return this.heap.getNextIndexedKey();
}
+
+ @Override
+ public void shipped() throws IOException {
+ lock.lock();
+ try {
+ for (KeyValueHeap h : this.heapsForDelayedClose) {
+ h.close();// There wont be further fetch of Cells from these scanners. Just close.
+ }
+ this.heapsForDelayedClose.clear();
+ if (this.heap != null) {
+ this.heap.shipped();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 4f5b84dc962..b2ef1bdd6e8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -144,6 +144,11 @@ public class TestCoprocessorInterface {
public int getBatch() {
return delegate.getBatch();
}
+
+ @Override
+ public void shipped() throws IOException {
+ this.delegate.shipped();
+ }
}
public static class CoprocessorImpl extends BaseRegionObserver {