LUCENE-5612: Add ant target to test NativeFSLockFactory, refactor the lock verifier.

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1588570 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Uwe Schindler 2014-04-18 21:32:49 +00:00
parent 8e3727bb05
commit 4df4a1c902
5 changed files with 244 additions and 120 deletions

View File

@ -146,4 +146,73 @@
<target name="regenerate" depends="createLevAutomata,createPackedIntSources"/>
<macrodef name="startLockStressTestClient">
<attribute name="clientId"/>
<sequential>
<local name="lockverifyserver.port"/>
<groovy><![CDATA[
String port;
while ((port = System.getProperty("lockverifyserver.port")) == null) {
Thread.sleep(10L);
}
properties["lockverifyserver.port"] = port;
]]></groovy>
<java taskname="LockStressTest@{clientId}" fork="true" classpathref="test-lock.classpath" classname="org.apache.lucene.store.LockStressTest" failOnError="true">
<arg value="@{clientId}"/>
<arg value="${lockverifyserver.host}"/>
<arg value="${lockverifyserver.port}"/>
<arg value="${lock.factory.impl}"/>
<arg value="${lock.factory.dir}"/>
<arg value="${lockverify.delay}"/>
<arg value="${lockverify.count}"/>
</java>
</sequential>
</macrodef>
<target name="test-lock-factory" depends="resolve-groovy,compile-core">
<property name="lockverifyserver.host" value="127.0.0.1"/>
<property name="lock.factory.impl" value="org.apache.lucene.store.NativeFSLockFactory"/>
<property name="lock.factory.dir" location="${build.dir}/lockfactorytest"/>
<property name="lockverify.delay" value="1"/>
<groovy taskname="LockVerifySetup"><![CDATA[
System.clearProperty("lockverifyserver.port"); // make sure it is undefined
if (!properties["lockverify.count"]) {
int count = Boolean.parseBoolean(properties["tests.nightly"]) ?
20000 : 2000;
count *= Integer.parseInt(properties["tests.multiplier"]);
properties["lockverify.count"] = count;
}
task.log("Configuration properties:");
["lock.factory.impl", "lockverify.delay", "lockverify.count"].each {
k -> task.log(" " + k + "=" + properties[k]);
}
]]></groovy>
<path id="test-lock.classpath">
<path refid="classpath"/>
<pathelement location="${build.dir}/classes/java"/>
</path>
<mkdir dir="${lock.factory.dir}"/>
<parallel threadCount="3" failonany="false">
<sequential>
<!-- the server runs in-process, so we can wait for the sysproperty -->
<java taskname="LockVerifyServer" fork="false" classpathref="test-lock.classpath" classname="org.apache.lucene.store.LockVerifyServer" failOnError="true">
<arg value="${lockverifyserver.host}"/>
<arg value="2"/>
</java>
</sequential>
<sequential>
<startLockStressTestClient clientId="1"/>
</sequential>
<sequential>
<startLockStressTestClient clientId="2"/>
</sequential>
</parallel>
</target>
<!-- once we fixed LUCENE-5612, reenable this:
<target name="test" depends="common.test, test-lock-factory"/>
-->
</project>

View File

@ -72,7 +72,7 @@ public abstract class Lock implements Closeable {
* out of bounds
* @throws IOException if obtain() throws IOException
*/
public boolean obtain(long lockWaitTimeout) throws IOException {
public final boolean obtain(long lockWaitTimeout) throws IOException {
failureReason = null;
boolean locked = obtain();
if (lockWaitTimeout < 0 && lockWaitTimeout != LOCK_OBTAIN_WAIT_FOREVER)

View File

@ -19,6 +19,12 @@ package org.apache.lucene.store;
import java.io.IOException;
import java.io.File;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Random;
import org.apache.lucene.util.IOUtils;
/**
* Simple standalone tool that forever acquires & releases a
@ -33,49 +39,45 @@ public class LockStressTest {
public static void main(String[] args) throws Exception {
if (args.length != 6) {
System.out.println("\nUsage: java org.apache.lucene.store.LockStressTest myID verifierHostOrIP verifierPort lockFactoryClassName lockDirName sleepTime\n" +
if (args.length != 7) {
System.out.println("Usage: java org.apache.lucene.store.LockStressTest myID verifierHost verifierPort lockFactoryClassName lockDirName sleepTimeMS count\n" +
"\n" +
" myID = int from 0 .. 255 (should be unique for test process)\n" +
" verifierHostOrIP = host name or IP address where LockVerifyServer is running\n" +
" verifierHost = hostname that LockVerifyServer is listening on\n" +
" verifierPort = port that LockVerifyServer is listening on\n" +
" lockFactoryClassName = primary LockFactory class that we will use\n" +
" lockDirName = path to the lock directory (only set for Simple/NativeFSLockFactory\n" +
" sleepTimeMS = milliseconds to pause betweeen each lock obtain/release\n" +
" count = number of locking tries\n" +
"\n" +
"You should run multiple instances of this process, each with its own\n" +
"unique ID, and each pointing to the same lock directory, to verify\n" +
"that locking is working correctly.\n" +
"\n" +
"Make sure you are first running LockVerifyServer.\n" +
"\n");
"Make sure you are first running LockVerifyServer.");
System.exit(1);
}
final int myID = Integer.parseInt(args[0]);
int arg = 0;
final int myID = Integer.parseInt(args[arg++]);
if (myID < 0 || myID > 255) {
System.out.println("myID must be a unique int 0..255");
System.exit(1);
}
final String verifierHost = args[1];
final int verifierPort = Integer.parseInt(args[2]);
final String lockFactoryClassName = args[3];
final String lockDirName = args[4];
final int sleepTimeMS = Integer.parseInt(args[5]);
final String verifierHost = args[arg++];
final int verifierPort = Integer.parseInt(args[arg++]);
final String lockFactoryClassName = args[arg++];
final String lockDirName = args[arg++];
final int sleepTimeMS = Integer.parseInt(args[arg++]);
final int count = Integer.parseInt(args[arg++]);
LockFactory lockFactory;
try {
lockFactory = Class.forName(lockFactoryClassName).asSubclass(LockFactory.class).newInstance();
} catch (IllegalAccessException e) {
throw new IOException("IllegalAccessException when instantiating LockClass " + lockFactoryClassName);
} catch (InstantiationException e) {
throw new IOException("InstantiationException when instantiating LockClass " + lockFactoryClassName);
} catch (ClassCastException e) {
throw new IOException("unable to cast LockClass " + lockFactoryClassName + " instance to a LockFactory");
} catch (ClassNotFoundException e) {
throw new IOException("unable to find LockClass " + lockFactoryClassName);
} catch (IllegalAccessException | InstantiationException | ClassCastException | ClassNotFoundException e) {
throw new IOException("Cannot instantiate lock factory " + lockFactoryClassName);
}
File lockDir = new File(lockDirName);
@ -84,27 +86,49 @@ public class LockStressTest {
((FSLockFactory) lockFactory).setLockDir(lockDir);
}
final InetSocketAddress addr = new InetSocketAddress(verifierHost, verifierPort);
System.out.println("Connecting to server " + addr +
" and registering as client " + myID + "...");
Socket socket = new Socket();
socket.setReuseAddress(true);
socket.connect(addr, 500);
OutputStream os = socket.getOutputStream();
os.write(myID);
os.flush();
lockFactory.setLockPrefix("test");
final LockFactory verifyLF = new VerifyingLockFactory(lockFactory, socket);
final Lock l = verifyLF.makeLock("test.lock");
final Random rnd = new Random();
LockFactory verifyLF = new VerifyingLockFactory((byte) myID, lockFactory, verifierHost, verifierPort);
Lock l = verifyLF.makeLock("test.lock");
while(true) {
// wait for starting gun
if (socket.getInputStream().read() != 43) {
throw new IOException("Protocol violation");
}
for (int i = 0; i < count; i++) {
boolean obtained = false;
try {
obtained = l.obtain(10);
obtained = l.obtain(rnd.nextInt(100) + 10);
} catch (LockObtainFailedException e) {
System.out.print("x");
}
if (obtained) {
System.out.print("l");
Thread.sleep(sleepTimeMS);
l.close();
}
if (i % 500 == 0) {
System.out.println((i * 100. / count) + "% done.");
}
Thread.sleep(sleepTimeMS);
}
IOUtils.closeWhileHandlingException(socket);
System.out.println("Finished " + count + " tries.");
}
}

View File

@ -17,11 +17,15 @@ package org.apache.lucene.store;
* limitations under the License.
*/
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.lucene.util.IOUtils;
/**
* Simple standalone server that must be running when you
@ -35,62 +39,108 @@ import java.io.IOException;
public class LockVerifyServer {
private static String getTime(long startTime) {
return "[" + ((System.currentTimeMillis()-startTime)/1000) + "s] ";
}
public static void main(String[] args) throws Exception {
public static void main(String[] args) throws IOException {
if (args.length != 1) {
System.out.println("\nUsage: java org.apache.lucene.store.LockVerifyServer port\n");
if (args.length != 2) {
System.out.println("Usage: java org.apache.lucene.store.LockVerifyServer bindToIp clients\n");
System.exit(1);
}
final int port = Integer.parseInt(args[0]);
int arg = 0;
final String hostname = args[arg++];
final int maxClients = Integer.parseInt(args[arg++]);
ServerSocket s = new ServerSocket(port);
try (final ServerSocket s = new ServerSocket()) {
s.setReuseAddress(true);
System.out.println("\nReady on port " + port + "...");
s.setSoTimeout(30000); // initially 30 secs to give clients enough time to startup
s.bind(new InetSocketAddress(hostname, 0));
final InetSocketAddress localAddr = (InetSocketAddress) s.getLocalSocketAddress();
System.out.println("Listening on " + localAddr + "...");
int lockedID = 0;
long startTime = System.currentTimeMillis();
// we set the port as a sysprop, so the ANT task can read it. For that to work, this server must run in-process:
System.setProperty("lockverifyserver.port", Integer.toString(localAddr.getPort()));
final Object localLock = new Object();
final int[] lockedID = new int[1];
lockedID[0] = -1;
final CountDownLatch startingGun = new CountDownLatch(1);
final Thread[] threads = new Thread[maxClients];
for (int count = 0; count < maxClients; count++) {
final Socket cs = s.accept();
threads[count] = new Thread() {
@Override
public void run() {
try (InputStream in = cs.getInputStream(); OutputStream os = cs.getOutputStream()) {
final int id = in.read();
if (id < 0) {
throw new IOException("Client closed connection before communication started.");
}
startingGun.await();
os.write(43);
os.flush();
while(true) {
Socket cs = s.accept();
OutputStream out = cs.getOutputStream();
InputStream in = cs.getInputStream();
final int command = in.read();
if (command < 0) {
return; // closed
}
int id = in.read();
int command = in.read();
boolean err = false;
if (command == 1) {
synchronized(localLock) {
final int currentLock = lockedID[0];
if (currentLock == -2) {
return; // another thread got error, so we exit, too!
}
switch (command) {
case 1:
// Locked
if (lockedID != 0) {
err = true;
System.out.println(getTime(startTime) + " ERROR: id " + id + " got lock, but " + lockedID + " already holds the lock");
if (currentLock != -1) {
lockedID[0] = -2;
throw new IllegalStateException("id " + id + " got lock, but " + currentLock + " already holds the lock");
}
lockedID = id;
} else if (command == 0) {
if (lockedID != id) {
err = true;
System.out.println(getTime(startTime) + " ERROR: id " + id + " released the lock, but " + lockedID + " is the one holding the lock");
lockedID[0] = id;
break;
case 0:
// Unlocked
if (currentLock != id) {
lockedID[0] = -2;
throw new IllegalStateException("id " + id + " released the lock, but " + currentLock + " is the one holding the lock");
}
lockedID[0] = -1;
break;
default:
throw new RuntimeException("Unrecognized command: " + command);
}
os.write(command);
os.flush();
}
}
} catch (RuntimeException | Error e) {
throw e;
} catch (Exception ioe) {
throw new RuntimeException(ioe);
} finally {
IOUtils.closeWhileHandlingException(cs);
}
}
};
threads[count].start();
}
lockedID = 0;
} else
throw new RuntimeException("unrecognized command " + command);
System.out.print(".");
// start
System.out.println("All clients started, fire gun...");
startingGun.countDown();
if (err)
out.write(1);
else
out.write(0);
// wait for all threads to finish
for (Thread t : threads) {
t.join();
}
out.close();
in.close();
cs.close();
// cleanup sysprop
System.clearProperty("lockverifyserver.port");
System.out.println("Server terminated.");
}
}
}

View File

@ -19,7 +19,6 @@ package org.apache.lucene.store;
import java.net.Socket;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
/**
@ -38,47 +37,35 @@ import java.io.OutputStream;
public class VerifyingLockFactory extends LockFactory {
LockFactory lf;
byte id;
String host;
int port;
final LockFactory lf;
final Socket socket;
private class CheckedLock extends Lock {
private Lock lock;
private final Lock lock;
public CheckedLock(Lock lock) {
this.lock = lock;
}
private void verify(byte message) {
try {
Socket s = new Socket(host, port);
OutputStream out = s.getOutputStream();
out.write(id);
private void verify(byte message) throws IOException {
final OutputStream out = socket.getOutputStream();
out.write(message);
InputStream in = s.getInputStream();
int result = in.read();
in.close();
out.close();
s.close();
if (result != 0)
throw new RuntimeException("lock was double acquired");
} catch (Exception e) {
throw new RuntimeException(e);
out.flush();
final int ret = socket.getInputStream().read();
if (ret < 0) {
throw new IllegalStateException("Lock server died because of locking error.");
}
if (ret != message) {
throw new IOException("Protocol violation.");
}
@Override
public synchronized boolean obtain(long lockWaitTimeout) throws IOException {
boolean obtained = lock.obtain(lockWaitTimeout);
if (obtained)
verify((byte) 1);
return obtained;
}
@Override
public synchronized boolean obtain() throws IOException {
return lock.obtain();
boolean obtained = lock.obtain();
if (obtained)
verify((byte) 1);
return obtained;
}
@Override
@ -96,18 +83,12 @@ public class VerifyingLockFactory extends LockFactory {
}
/**
* @param id should be a unique id across all clients
* @param lf the LockFactory that we are testing
* @param host host or IP where {@link LockVerifyServer}
is running
* @param port the port {@link LockVerifyServer} is
listening on
* @param socket the socket connected to {@link LockVerifyServer}
*/
public VerifyingLockFactory(byte id, LockFactory lf, String host, int port) {
this.id = id;
public VerifyingLockFactory(LockFactory lf, Socket socket) {
this.lf = lf;
this.host = host;
this.port = port;
this.socket = socket;
}
@Override