HBASE-13926 Close the scanner only after Call#setResponse.

This commit is contained in:
anoopsjohn 2015-06-21 20:04:13 +05:30
parent 04c25e0f35
commit e4d8fab104
17 changed files with 277 additions and 27 deletions

View File

@ -293,6 +293,11 @@ public class HalfStoreFileReader extends StoreFile.Reader {
public void close() { public void close() {
this.delegate.close(); this.delegate.close();
} }
@Override
public void shipped() throws IOException {
this.delegate.shipped();
}
}; };
} }

View File

@ -1083,6 +1083,11 @@ public class HFileReaderImpl implements HFile.Reader, Configurable {
public void close() { public void close() {
// HBASE-12295 will add code here. // HBASE-12295 will add code here.
} }
@Override
public void shipped() throws IOException {
// HBASE-12295 will add code here.
}
} }
public Path getPath() { public Path getPath() {

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.Shipper;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
/** /**
@ -37,7 +38,7 @@ import org.apache.hadoop.hbase.Cell;
* getValue. * getValue.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface HFileScanner { public interface HFileScanner extends Shipper {
/** /**
* SeekTo or just before the passed <code>cell</code>. Examine the return * SeekTo or just before the passed <code>cell</code>. Examine the return
* code to figure whether we found the cell or not. * code to figure whether we found the cell or not.

View File

@ -63,4 +63,12 @@ public interface RpcCallContext extends Delayable {
* @return the client version info, or null if the information is not present * @return the client version info, or null if the information is not present
*/ */
VersionInfo getClientVersionInfo(); VersionInfo getClientVersionInfo();
/**
* Sets a callback which has to be executed at the end of this RPC call. Such a callback is an
* optional one for any Rpc call.
*
* @param callback
*/
void setCallBack(RpcCallback callback);
} }

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* Denotes a callback action that has to be executed at the end of an Rpc Call.
*
* @see RpcCallContext#setCallBack(RpcCallback)
*/
@InterfaceAudience.Private
public interface RpcCallback {
/**
* Called at the end of an Rpc Call {@link RpcCallContext}
*/
void run() throws IOException;
}

View File

@ -306,6 +306,7 @@ public class RpcServer implements RpcServerInterface {
private User user; private User user;
private InetAddress remoteAddress; private InetAddress remoteAddress;
private RpcCallback callback;
Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header, Call(int id, final BlockingService service, final MethodDescriptor md, RequestHeader header,
Message param, CellScanner cellScanner, Connection connection, Responder responder, Message param, CellScanner cellScanner, Connection connection, Responder responder,
@ -431,6 +432,16 @@ public class RpcServer implements RpcServerInterface {
LOG.warn("Exception while creating response " + e); LOG.warn("Exception while creating response " + e);
} }
this.response = bc; this.response = bc;
// Once a response message is created and set to this.response, this Call can be treated as
// done. The Responder thread will do the n/w write of this message back to client.
if (this.callback != null) {
try {
this.callback.run();
} catch (Exception e) {
// Don't allow any exception here to kill this handler thread.
LOG.warn("Exception while running the Rpc Callback.", e);
}
}
} }
private BufferChain wrapWithSasl(BufferChain bc) private BufferChain wrapWithSasl(BufferChain bc)
@ -553,6 +564,11 @@ public class RpcServer implements RpcServerInterface {
public VersionInfo getClientVersionInfo() { public VersionInfo getClientVersionInfo() {
return connection.getVersionInfo(); return connection.getVersionInfo();
} }
@Override
public void setCallBack(RpcCallback callback) {
this.callback = callback;
}
} }
/** Listens on the socket. Creates jobs for the handler threads*/ /** Listens on the socket. Creates jobs for the handler threads*/

View File

@ -5225,7 +5225,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** /**
* RegionScannerImpl is used to combine scanners from multiple Stores (aka column families). * RegionScannerImpl is used to combine scanners from multiple Stores (aka column families).
*/ */
class RegionScannerImpl implements RegionScanner { class RegionScannerImpl implements RegionScanner, org.apache.hadoop.hbase.ipc.RpcCallback {
// Package local for testability // Package local for testability
KeyValueHeap storeHeap = null; KeyValueHeap storeHeap = null;
/** Heap of key-values that are not essential for the provided filters and are thus read /** Heap of key-values that are not essential for the provided filters and are thus read
@ -5830,6 +5830,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
throw new UnsupportedOperationException("not able to abort RS after: " + msg); throw new UnsupportedOperationException("not able to abort RS after: " + msg);
} }
@Override
public void shipped() throws IOException {
if (storeHeap != null) {
storeHeap.shipped();
}
if (joinedHeap != null) {
joinedHeap.shipped();
}
}
@Override
public void run() throws IOException {
// This is the RPC callback method executed. We do the close in of the scanner in this
// callback
this.close();
}
} }
// Utility methods // Utility methods

View File

@ -406,4 +406,20 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
// here we return the next index key from the top scanner // here we return the next index key from the top scanner
return current == null ? null : current.getNextIndexedKey(); return current == null ? null : current.getNextIndexedKey();
} }
@Override
public void shipped() throws IOException {
for (KeyValueScanner scanner : this.scannersForDelayedClose) {
scanner.close(); // There wont be further fetch of Cells from these scanners. Just close.
}
this.scannersForDelayedClose.clear();
if (this.current != null) {
this.current.shipped();
}
if (this.heap != null) {
for (KeyValueScanner scanner : this.heap) {
scanner.shipped();
}
}
}
} }

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.client.Scan;
* Scanner that returns the next KeyValue. * Scanner that returns the next KeyValue.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public interface KeyValueScanner { public interface KeyValueScanner extends Shipper {
/** /**
* Look at the next Cell in this scanner, but do not iterate scanner. * Look at the next Cell in this scanner, but do not iterate scanner.
* @return the next Cell * @return the next Cell

View File

@ -155,11 +155,14 @@ public class Leases extends HasThread {
* @param leaseName name of the lease * @param leaseName name of the lease
* @param leaseTimeoutPeriod length of the lease in milliseconds * @param leaseTimeoutPeriod length of the lease in milliseconds
* @param listener listener that will process lease expirations * @param listener listener that will process lease expirations
* @return The lease created.
* @throws LeaseStillHeldException * @throws LeaseStillHeldException
*/ */
public void createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener) public Lease createLease(String leaseName, int leaseTimeoutPeriod, final LeaseListener listener)
throws LeaseStillHeldException { throws LeaseStillHeldException {
addLease(new Lease(leaseName, leaseTimeoutPeriod, listener)); Lease lease = new Lease(leaseName, leaseTimeoutPeriod, listener);
addLease(lease);
return lease;
} }
/** /**

View File

@ -71,4 +71,9 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
public Cell getNextIndexedKey() { public Cell getNextIndexedKey() {
return null; return null;
} }
@Override
public void shipped() throws IOException {
// do nothing
}
} }

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction; import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.QosPriority; import org.apache.hadoop.hbase.ipc.QosPriority;
import org.apache.hadoop.hbase.ipc.RpcCallContext; import org.apache.hadoop.hbase.ipc.RpcCallContext;
import org.apache.hadoop.hbase.ipc.RpcCallback;
import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.RpcServerInterface;
@ -154,6 +155,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.quotas.OperationQuota; import org.apache.hadoop.hbase.quotas.OperationQuota;
import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
import org.apache.hadoop.hbase.regionserver.Leases.Lease;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.Region.Operation;
@ -239,16 +241,64 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
private final long minimumScanTimeLimitDelta; private final long minimumScanTimeLimitDelta;
/** /**
* Holder class which holds the RegionScanner and nextCallSeq together. * An Rpc callback for closing a RegionScanner.
*/
private static class RegionScannerCloseCallBack implements RpcCallback {
private final RegionScanner scanner;
public RegionScannerCloseCallBack(RegionScanner scanner){
this.scanner = scanner;
}
@Override
public void run() throws IOException {
this.scanner.close();
}
}
/**
* An Rpc callback for doing shipped() call on a RegionScanner.
*/
private class RegionScannerShippedCallBack implements RpcCallback {
private final String scannerName;
private final RegionScanner scanner;
private final Lease lease;
public RegionScannerShippedCallBack(String scannerName, RegionScanner scanner, Lease lease) {
this.scannerName = scannerName;
this.scanner = scanner;
this.lease = lease;
}
@Override
public void run() throws IOException {
this.scanner.shipped();
// We're done. On way out re-add the above removed lease. The lease was temp removed for this
// Rpc call and we are at end of the call now. Time to add it back.
if (scanners.containsKey(scannerName)) {
if (lease != null) regionServer.leases.addLease(lease);
}
}
}
/**
* Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
*/ */
private static class RegionScannerHolder { private static class RegionScannerHolder {
private AtomicLong nextCallSeq = new AtomicLong(0); private AtomicLong nextCallSeq = new AtomicLong(0);
private RegionScanner s; private RegionScanner s;
private Region r; private Region r;
final RpcCallback closeCallBack;
final RpcCallback shippedCallback;
public RegionScannerHolder(RegionScanner s, Region r) { public RegionScannerHolder(RegionScanner s, Region r, RpcCallback closeCallBack,
RpcCallback shippedCallback) {
this.s = s; this.s = s;
this.r = r; this.r = r;
this.closeCallBack = closeCallBack;
this.shippedCallback = shippedCallback;
} }
private long getNextCallSeq() { private long getNextCallSeq() {
@ -364,6 +414,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return context != null && context.isClientCellBlockSupport(); return context != null && context.isClientCellBlockSupport();
} }
private boolean isClientCellBlockSupport(RpcCallContext context) {
return context != null && context.isClientCellBlockSupport();
}
private void addResult(final MutateResponse.Builder builder, private void addResult(final MutateResponse.Builder builder,
final Result result, final PayloadCarryingRpcController rpcc) { final Result result, final PayloadCarryingRpcController rpcc) {
if (result == null) return; if (result == null) return;
@ -377,10 +431,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
private void addResults(final ScanResponse.Builder builder, final List<Result> results, private void addResults(final ScanResponse.Builder builder, final List<Result> results,
final RpcController controller, boolean isDefaultRegion) { final RpcController controller, boolean isDefaultRegion, boolean clientCellBlockSupported) {
builder.setStale(!isDefaultRegion); builder.setStale(!isDefaultRegion);
if (results == null || results.isEmpty()) return; if (results == null || results.isEmpty()) return;
if (isClientCellBlockSupport()) { if (clientCellBlockSupported) {
for (Result res : results) { for (Result res : results) {
builder.addCellsPerResult(res.size()); builder.addCellsPerResult(res.size());
builder.addPartialFlagPerResult(res.isPartial()); builder.addPartialFlagPerResult(res.isPartial());
@ -923,17 +977,21 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
return 0L; return 0L;
} }
long addScanner(RegionScanner s, Region r) throws LeaseStillHeldException { RegionScannerHolder addScanner(String scannerName, RegionScanner s, Region r)
long scannerId = this.scannerIdGen.incrementAndGet(); throws LeaseStillHeldException {
String scannerName = String.valueOf(scannerId); Lease lease = regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
RegionScannerHolder existing =
scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
regionServer.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
new ScannerListener(scannerName)); new ScannerListener(scannerName));
return scannerId; RpcCallback shippedCallback = new RegionScannerShippedCallBack(scannerName, s, lease);
RpcCallback closeCallback;
if (s instanceof RpcCallback) {
closeCallback = (RpcCallback) s;
} else {
closeCallback = new RegionScannerCloseCallBack(s);
}
RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback);
RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
assert existing == null : "scannerId must be unique within regionserver's whole lifecycle!";
return rsh;
} }
/** /**
@ -2229,11 +2287,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {
scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner); scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
} }
scannerId = addScanner(scanner, region); scannerId = this.scannerIdGen.incrementAndGet();
scannerName = String.valueOf(scannerId); scannerName = String.valueOf(scannerId);
rsh = addScanner(scannerName, scanner, region);
ttl = this.scannerLeaseTimeoutPeriod; ttl = this.scannerLeaseTimeoutPeriod;
} }
RpcCallContext context = RpcServer.getCurrentCall();
quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable()); long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
@ -2451,7 +2512,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
moreResults = false; moreResults = false;
results = null; results = null;
} else { } else {
addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())); addResults(builder, results, controller,
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
isClientCellBlockSupport(context));
} }
} catch (IOException e) { } catch (IOException e) {
// if we have an exception on scanner next and we are using the callSeq // if we have an exception on scanner next and we are using the callSeq
@ -2462,11 +2525,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
throw e; throw e;
} finally { } finally {
// We're done. On way out re-add the above removed lease. if (context != null) {
context.setCallBack(rsh.shippedCallback);
}
// Adding resets expiration time on lease. // Adding resets expiration time on lease.
if (scanners.containsKey(scannerName)) { if (scanners.containsKey(scannerName)) {
if (lease != null) regionServer.leases.addLease(lease);
ttl = this.scannerLeaseTimeoutPeriod; ttl = this.scannerLeaseTimeoutPeriod;
// When context != null, adding back the lease will be done in callback set above.
if (context == null) {
if (lease != null) regionServer.leases.addLease(lease);
}
} }
} }
} }
@ -2481,9 +2549,16 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
rsh = scanners.remove(scannerName); rsh = scanners.remove(scannerName);
if (rsh != null) { if (rsh != null) {
scanner = rsh.s; if (context != null) {
scanner.close(); context.setCallBack(rsh.closeCallBack);
regionServer.leases.cancelLease(scannerName); } else {
rsh.s.close();
}
try {
regionServer.leases.cancelLease(scannerName);
} catch (LeaseException le) {
// No problem, ignore
}
if (region != null && region.getCoprocessorHost() != null) { if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner); region.getCoprocessorHost().postScannerClose(scanner);
} }

View File

@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
*/ */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving @InterfaceStability.Evolving
public interface RegionScanner extends InternalScanner { public interface RegionScanner extends InternalScanner, Shipper {
/** /**
* @return The RegionInfo for this scanner. * @return The RegionInfo for this scanner.
*/ */

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* This interface denotes a scanner as one which can ship cells. Scan operation do many RPC requests
* to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch
* {@link #shipped()} will get called.
*/
@InterfaceAudience.Private
public interface Shipper {
/**
* Called after a batch of rows scanned and set to be returned to client. Any in between cleanup
* can be done here.
*/
void shipped() throws IOException;
}

View File

@ -488,4 +488,9 @@ public class StoreFileScanner implements KeyValueScanner {
public Cell getNextIndexedKey() { public Cell getNextIndexedKey() {
return hfs.getNextIndexedKey(); return hfs.getNextIndexedKey();
} }
@Override
public void shipped() throws IOException {
this.hfs.shipped();
}
} }

View File

@ -901,5 +901,21 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
public Cell getNextIndexedKey() { public Cell getNextIndexedKey() {
return this.heap.getNextIndexedKey(); return this.heap.getNextIndexedKey();
} }
@Override
public void shipped() throws IOException {
lock.lock();
try {
for (KeyValueHeap h : this.heapsForDelayedClose) {
h.close();// There wont be further fetch of Cells from these scanners. Just close.
}
this.heapsForDelayedClose.clear();
if (this.heap != null) {
this.heap.shipped();
}
} finally {
lock.unlock();
}
}
} }

View File

@ -144,6 +144,11 @@ public class TestCoprocessorInterface {
public int getBatch() { public int getBatch() {
return delegate.getBatch(); return delegate.getBatch();
} }
@Override
public void shipped() throws IOException {
this.delegate.shipped();
}
} }
public static class CoprocessorImpl extends BaseRegionObserver { public static class CoprocessorImpl extends BaseRegionObserver {