SOLR-6874: There is a race around SocketProxy binding to it's port the way we setup JettySolrRunner and SocketProxy.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1649154 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy Potter 2015-01-03 00:20:46 +00:00
parent c2d9d1c602
commit c96668e207
4 changed files with 94 additions and 82 deletions

View File

@ -381,6 +381,9 @@ Bug Fixes
* SOLR-6779: fix /browse for schemaless example (ehatcher)
* SOLR-6874: There is a race around SocketProxy binding to it's port the way we setup
JettySolrRunner and SocketProxy. (Mark Miller, Timothy Potter)
Optimizations
----------------------

View File

@ -56,8 +56,6 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
private static final transient Logger log =
LoggerFactory.getLogger(ReplicationFactorTest.class);
private Map<URI,SocketProxy> proxies = new HashMap<URI,SocketProxy>();
public ReplicationFactorTest() {
super();
sliceCount = 3;
@ -104,24 +102,7 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
String shardList, String solrConfigOverride, String schemaOverride)
throws Exception {
JettySolrRunner jetty = new JettySolrRunner(solrHome.getPath(), context,
0, solrConfigOverride, schemaOverride, false,
getExtraServlets(), sslConfig, getExtraRequestFilters());
jetty.setShards(shardList);
jetty.setDataDir(getDataDir(dataDir));
// setup to proxy Http requests to this server unless it is the control
// server
int proxyPort = getNextAvailablePort();
jetty.setProxyPort(proxyPort);
jetty.start();
// create a socket proxy for the jetty server ...
SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
proxies.put(proxy.getUrl(), proxy);
return jetty;
return createProxiedJetty(solrHome, dataDir, shardList, solrConfigOverride, schemaOverride);
}
protected int getNextAvailablePort() throws Exception {
@ -321,20 +302,6 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
ensureAllReplicasAreActive(testCollectionName, shardId, numShards, replicationFactor, 30);
}
protected SocketProxy getProxyForReplica(Replica replica) throws Exception {
String replicaBaseUrl = replica.getStr(ZkStateReader.BASE_URL_PROP);
assertNotNull(replicaBaseUrl);
URL baseUrl = new URL(replicaBaseUrl);
SocketProxy proxy = proxies.get(baseUrl.toURI());
if (proxy == null && !baseUrl.toExternalForm().endsWith("/")) {
baseUrl = new URL(baseUrl.toExternalForm() + "/");
proxy = proxies.get(baseUrl.toURI());
}
assertNotNull("No proxy found for " + baseUrl + "!", proxy);
return proxy;
}
protected int sendDoc(int docId, int minRf) throws Exception {
UpdateRequest up = new UpdateRequest();
up.setParam(UpdateRequest.MIN_REPFACT, String.valueOf(minRf));

View File

@ -536,16 +536,11 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
jetty.setShards(shardList);
jetty.setDataDir(getDataDir(dataDir));
// setup to proxy Http requests to this server unless it is the control
// server
int proxyPort = getNextAvailablePort();
jetty.setProxyPort(proxyPort);
SocketProxy proxy = new SocketProxy(0, sslConfig == null ? false : sslConfig.isSSLMode());
jetty.setProxyPort(proxy.getListenPort());
jetty.start();
// create a socket proxy for the jetty server ...
SocketProxy proxy = new SocketProxy(proxyPort, jetty.getBaseUrl().toURI());
proxy.open(jetty.getBaseUrl().toURI());
proxies.put(proxy.getUrl(), proxy);
return jetty;
}

View File

@ -59,7 +59,7 @@ public class SocketProxy {
public List<Bridge> connections = new LinkedList<Bridge>();
private int listenPort = 0;
private final int listenPort;
private int receiveBufferSize = -1;
@ -67,16 +67,32 @@ public class SocketProxy {
private int acceptBacklog = 50;
public SocketProxy() throws Exception {}
private boolean usesSSL;
public SocketProxy(URI uri) throws Exception {
this(0, uri);
public SocketProxy() throws Exception {
this(0, false);
}
public SocketProxy(int port, URI uri) throws Exception {
listenPort = port;
public SocketProxy( boolean useSSL) throws Exception {
this(0, useSSL);
}
public SocketProxy(int port, boolean useSSL) throws Exception {
int listenPort = port;
this.usesSSL = useSSL;
serverSocket = createServerSocket(useSSL);
serverSocket.setReuseAddress(true);
if (receiveBufferSize > 0) {
serverSocket.setReceiveBufferSize(receiveBufferSize);
}
serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
this.listenPort = serverSocket.getLocalPort();
}
public void open(URI uri) throws Exception {
target = uri;
open();
proxyUrl = urlFromSocket(target, serverSocket);
doOpen();
}
public String toString() {
@ -91,18 +107,8 @@ public class SocketProxy {
target = tcpBrokerUri;
}
public void open() throws Exception {
serverSocket = createServerSocket(target);
serverSocket.setReuseAddress(true);
if (receiveBufferSize > 0) {
serverSocket.setReceiveBufferSize(receiveBufferSize);
}
if (proxyUrl == null) {
serverSocket.bind(new InetSocketAddress(listenPort), acceptBacklog);
proxyUrl = urlFromSocket(target, serverSocket);
} else {
serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
}
private void doOpen() throws Exception {
acceptor = new Acceptor(serverSocket, target);
if (pauseAtStart) {
acceptor.pause();
@ -112,19 +118,19 @@ public class SocketProxy {
closed = new CountDownLatch(1);
}
private boolean isSsl(URI target) {
return "ssl".equals(target.getScheme());
public int getListenPort() {
return listenPort;
}
private ServerSocket createServerSocket(URI target) throws Exception {
if (isSsl(target)) {
private ServerSocket createServerSocket(boolean useSSL) throws Exception {
if (useSSL) {
return SSLServerSocketFactory.getDefault().createServerSocket();
}
return new ServerSocket();
}
private Socket createSocket(URI target) throws Exception {
if (isSsl(target)) {
private Socket createSocket(boolean useSSL) throws Exception {
if (useSSL) {
return SSLSocketFactory.getDefault().createSocket();
}
return new Socket();
@ -175,7 +181,16 @@ public class SocketProxy {
public void reopen() {
log.info("Re-opening connectivity to "+getUrl());
try {
open();
if (proxyUrl == null) {
throw new IllegalStateException("Can not call open before open(URI uri).");
}
serverSocket = createServerSocket(usesSSL);
serverSocket.setReuseAddress(true);
if (receiveBufferSize > 0) {
serverSocket.setReceiveBufferSize(receiveBufferSize);
}
serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
doOpen();
} catch (Exception e) {
log.debug("exception on reopen url:" + getUrl(), e);
}
@ -257,7 +272,7 @@ public class SocketProxy {
public Bridge(Socket socket, URI target) throws Exception {
receiveSocket = socket;
sendSocket = createSocket(target);
sendSocket = createSocket(usesSSL);
if (receiveBufferSize > 0) {
sendSocket.setReceiveBufferSize(receiveBufferSize);
}
@ -291,9 +306,9 @@ public class SocketProxy {
}
private void linkWithThreads(Socket source, Socket dest) {
requestThread = new Pump(source, dest);
requestThread = new Pump("Request", source, dest);
requestThread.start();
responseThread = new Pump(dest, source);
responseThread = new Pump("Response", dest, source);
responseThread.start();
}
@ -303,8 +318,8 @@ public class SocketProxy {
private Socket destination;
private AtomicReference<CountDownLatch> pause = new AtomicReference<CountDownLatch>();
public Pump(Socket source, Socket dest) {
super("SocketProxy-DataTransfer-" + source.getPort() + ":"
public Pump(String kind, Socket source, Socket dest) {
super("SocketProxy-"+kind+"-" + source.getPort() + ":"
+ dest.getPort());
src = source;
destination = dest;
@ -321,16 +336,33 @@ public class SocketProxy {
public void run() {
byte[] buf = new byte[1024];
try {
InputStream in = src.getInputStream();
OutputStream out = destination.getOutputStream();
src.setSoTimeout(10 * 1000);
} catch (SocketException e) {
log.error("Failed to set socket timeout on "+src+" due to: "+e);
throw new RuntimeException(e);
}
InputStream in = null;
OutputStream out = null;
try {
in = src.getInputStream();
out = destination.getOutputStream();
while (true) {
int len = in.read(buf);
int len = -1;
try {
len = in.read(buf);
} catch (SocketTimeoutException ste) {
log.warn(ste+" when reading from "+src);
}
if (len == -1) {
log.debug("read eof from:" + src);
break;
}
pause.get().await();
if (len > 0)
out.write(buf, 0, len);
}
} catch (Exception e) {
@ -342,6 +374,21 @@ public class SocketProxy {
close();
}
} catch (Exception ignore) {}
} finally {
if (in != null) {
try {
in.close();
} catch (Exception exc) {
log.debug(exc+" when closing InputStream on socket: "+src);
}
}
if (out != null) {
try {
out.close();
} catch (Exception exc) {
log.debug(exc+" when closing OutputStream on socket: "+destination);
}
}
}
}
}