SOLR-3715: remove sync around tlog serialization

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1371379 13f79535-47bb-0310-9956-ffa450edef68
Yonik Seeley 2012-08-09 18:53:19 +00:00
parent 858f4ed040
commit 932901bb01
5 changed files with 288 additions and 82 deletions

solr/CHANGES.txt

@@ -31,6 +31,13 @@ Upgrading from Solr 4.0.0-BETA
 In order to better support distributed search mode, the TermVectorComponent's response format has been changed so that if the schema defines a uniqueKeyField, then that field value is used as the "key" for each document in its response section, instead of the internal lucene doc id.  Users w/o a uniqueKeyField will continue to see the same response format.  See SOLR-3229 for more details.
 
+Optimizations
+----------------------
+
+* SOLR-3715: improve concurrency of the transaction log by removing
+  synchronization around log record serialization.  (yonik)
+
 Bug Fixes
 ----------------------
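
[Editor's note] The optimization is easiest to see as a pattern: each writer thread serializes its record into a private in-memory buffer with no lock held, then takes the lock only for the short, strictly sequential append to the shared log file. A minimal sketch of that idea, with illustrative names (AppendOnlyLog is not a Solr class):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Illustrative sketch of the SOLR-3715 idea: encode outside the lock,
// append under it.
class AppendOnlyLog {
  private final OutputStream sink;  // the shared log file
  private long position;            // bytes appended so far

  AppendOnlyLog(OutputStream sink) { this.sink = sink; }

  // Called concurrently by many writer threads.
  long write(String record) throws IOException {
    // The expensive part, serialization, runs outside any synchronization.
    ByteArrayOutputStream buf = new ByteArrayOutputStream();
    buf.write(record.getBytes(StandardCharsets.UTF_8));

    // Only the append itself is serialized across threads.
    synchronized (this) {
      long pos = position;          // start offset of this record in the log
      buf.writeTo(sink);
      position += buf.size();
      return pos;
    }
  }
}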

TransactionLog.java

@@ -34,9 +34,11 @@ import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -149,7 +151,8 @@ public class TransactionLog {
     long start = raf.length();
     channel = raf.getChannel();
     os = Channels.newOutputStream(channel);
-    fos = FastOutputStream.wrap(os);
+    fos = new FastOutputStream(os, new byte[65536], 0);
+    // fos = FastOutputStream.wrap(os);
 
     if (openExisting) {
       if (start > 0) {
@@ -300,93 +303,119 @@ public class TransactionLog {
     numRecords++;
   }
 
+  private void checkWriteHeader(LogCodec codec, SolrInputDocument optional) throws IOException {
+    // Unsynchronized access.  We can get away with an unsynchronized access here
+    // since we will never get a false non-zero when the position is in fact 0.
+    // rollback() is the only function that can reset to zero, and it blocks updates.
+    if (fos.size() != 0) return;
+
+    synchronized (this) {
+      if (fos.size() != 0) return;  // check again while synchronized
+      if (optional != null) {
+        addGlobalStrings(optional.getFieldNames());
+      }
+      writeLogHeader(codec);
+    }
+  }
+
+  int lastAddSize;
+
   public long write(AddUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
-    long pos = 0;
-    synchronized (this) {
-      try {
-        pos = fos.size();   // if we had flushed, this should be equal to channel.position()
-        SolrInputDocument sdoc = cmd.getSolrInputDocument();
+    SolrInputDocument sdoc = cmd.getSolrInputDocument();
 
-        if (pos == 0) { // TODO: needs to be changed if we start writing a header first
-          addGlobalStrings(sdoc.getFieldNames());
-          writeLogHeader(codec);
-          pos = fos.size();
-        }
+    try {
+      checkWriteHeader(codec, sdoc);
+
+      // adaptive buffer sizing
+      int bufSize = lastAddSize;    // unsynchronized access of lastAddSize should be fine
+      bufSize = Math.min(1024*1024, bufSize+(bufSize>>3)+256);
+
+      MemOutputStream out = new MemOutputStream(new byte[bufSize]);
+      codec.init(out);
+      codec.writeTag(JavaBinCodec.ARR, 3);
+      codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
+      codec.writeLong(cmd.getVersion());
+      codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+      lastAddSize = (int)out.size();
+
+      synchronized (this) {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        assert pos != 0;
 
         /***
         System.out.println("###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
         if (pos != fos.size()) {
          throw new RuntimeException("ERROR" + "###writing at " + pos + " fos.size()=" + fos.size() + " raf.length()=" + raf.length());
        }
        ***/
 
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.ADD | flags);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        codec.writeSolrInputDocument(cmd.getSolrInputDocument());
+        out.writeAll(fos);
         endRecord(pos);
+        // fos.flushBuffer();  // flush later
         return pos;
+      }
-      } catch (IOException e) {
-        // TODO: reset our file pointer back to "pos", the start of this record.
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
-      }
-    }
+    } catch (IOException e) {
+      // TODO: reset our file pointer back to "pos", the start of this record.
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error logging add", e);
+    }
   }
 
   public long writeDelete(DeleteUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
-    synchronized (this) {
-      try {
-        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
-        if (pos == 0) {
-          writeLogHeader(codec);
-          pos = fos.size();
-        }
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        BytesRef br = cmd.getIndexedId();
-        codec.writeByteArray(br.bytes, br.offset, br.length);
+
+    try {
+      checkWriteHeader(codec, null);
+
+      BytesRef br = cmd.getIndexedId();
+
+      MemOutputStream out = new MemOutputStream(new byte[20 + br.length]);
+      codec.init(out);
+      codec.writeTag(JavaBinCodec.ARR, 3);
+      codec.writeInt(UpdateLog.DELETE | flags);  // should just take one byte
+      codec.writeLong(cmd.getVersion());
+      codec.writeByteArray(br.bytes, br.offset, br.length);
+
+      synchronized (this) {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        assert pos != 0;
+        out.writeAll(fos);
         endRecord(pos);
+        // fos.flushBuffer();  // flush later
         return pos;
+      }
-      } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      }
-    }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
   }
 
   public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
     LogCodec codec = new LogCodec();
-    synchronized (this) {
-      try {
-        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
-        if (pos == 0) {
-          writeLogHeader(codec);
-          pos = fos.size();
-        }
-        codec.init(fos);
-        codec.writeTag(JavaBinCodec.ARR, 3);
-        codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
-        codec.writeLong(cmd.getVersion());
-        codec.writeStr(cmd.query);
+
+    try {
+      checkWriteHeader(codec, null);
+
+      MemOutputStream out = new MemOutputStream(new byte[20 + (cmd.query.length())]);
+      codec.init(out);
+      codec.writeTag(JavaBinCodec.ARR, 3);
+      codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags);  // should just take one byte
+      codec.writeLong(cmd.getVersion());
+      codec.writeStr(cmd.query);
+
+      synchronized (this) {
+        long pos = fos.size();   // if we had flushed, this should be equal to channel.position()
+        out.writeAll(fos);
         endRecord(pos);
+        // fos.flushBuffer();  // flush later
         return pos;
+      }
-      } catch (IOException e) {
-        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
-      }
-    }
+    } catch (IOException e) {
+      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, e);
+    }
   }
@@ -745,3 +774,32 @@ class ChannelFastInputStream extends FastInputStream {
   }
 }
 
+class MemOutputStream extends FastOutputStream {
+  public List<byte[]> buffers = new LinkedList<byte[]>();
+
+  public MemOutputStream(byte[] tempBuffer) {
+    super(null, tempBuffer, 0);
+  }
+
+  @Override
+  public void flush(byte[] arr, int offset, int len) throws IOException {
+    if (arr == buf && offset==0 && len==buf.length) {
+      buffers.add(buf);  // steal the buffer
+      buf = new byte[8192];
+    } else if (len > 0) {
+      byte[] newBuf = new byte[len];
+      System.arraycopy(arr, offset, newBuf, 0, len);
+      buffers.add(newBuf);
+    }
+  }
+
+  public void writeAll(FastOutputStream fos) throws IOException {
+    for (byte[] buffer : buffers) {
+      fos.write(buffer);
+    }
+    if (pos > 0) {
+      fos.write(buf, 0, pos);
+    }
+  }
+}
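
[Editor's note] MemOutputStream is what keeps the unsynchronized serialization cheap: when the codec fills the internal buffer, the overridden flush(byte[],int,int) "steals" the whole array into a chunk list rather than copying it (partial runs are copied), and writeAll() later replays the chunks into the file-backed stream while the lock is held. A rough demo of that handoff, assuming the demo class lives in org.apache.solr.update so the package-private MemOutputStream is visible:

package org.apache.solr.update;  // MemOutputStream is package-private here

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.solr.common.util.FastOutputStream;

public class MemOutputStreamDemo {
  public static void main(String[] args) throws IOException {
    // Tiny initial buffer so the first fill gets stolen as a whole chunk.
    MemOutputStream mem = new MemOutputStream(new byte[16]);
    for (int i = 0; i < 100; i++) {
      mem.writeInt(i);             // DataOutput methods come from FastOutputStream
    }

    ByteArrayOutputStream file = new ByteArrayOutputStream();  // stand-in for the log
    FastOutputStream fos = FastOutputStream.wrap(file);
    mem.writeAll(fos);             // replay stolen chunks plus the buffered tail
    fos.flushBuffer();

    // Expect one stolen 16-byte chunk (replaced by an 8K buffer) and 400 bytes total.
    System.out.println("chunks=" + mem.buffers.size() + " bytes=" + file.size());
  }
}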

FastWriter.java

@@ -52,7 +52,7 @@ public class FastWriter extends Writer {
   public void write(char c) throws IOException {
     if (pos >= buf.length) {
-      sink.write(buf,0,pos);
+      flush(buf,0,pos);
       pos=0;
     }
     buf[pos++] = c;
@@ -61,7 +61,7 @@ public class FastWriter extends Writer {
   @Override
   public FastWriter append(char c) throws IOException {
     if (pos >= buf.length) {
-      sink.write(buf,0,pos);
+      flush(buf,0,pos);
       pos=0;
     }
     buf[pos++] = c;
@@ -77,14 +77,14 @@ public class FastWriter extends Writer {
     } else if (len<BUFSIZE) {
       // if the data to write is small enough, buffer it.
       System.arraycopy(cbuf, off, buf, pos, space);
-      sink.write(buf, 0, buf.length);
+      flush(buf, 0, buf.length);
       pos = len-space;
       System.arraycopy(cbuf, off+space, buf, 0, pos);
     } else {
-      sink.write(buf,0,pos);  // flush
+      flush(buf,0,pos);  // flush
       pos=0;
       // don't buffer, just write to sink
-      sink.write(cbuf, off, len);
+      flush(cbuf, off, len);
     }
   }
@@ -97,32 +97,40 @@ public class FastWriter extends Writer {
     } else if (len<BUFSIZE) {
       // if the data to write is small enough, buffer it.
       str.getChars(off, off+space, buf, pos);
-      sink.write(buf, 0, buf.length);
+      flush(buf, 0, buf.length);
       str.getChars(off+space, off+len, buf, 0);
       pos = len-space;
     } else {
-      sink.write(buf,0,pos);  // flush
+      flush(buf,0,pos);  // flush
       pos=0;
       // don't buffer, just write to sink
-      sink.write(str, off, len);
+      flush(str, off, len);
     }
   }
 
   @Override
   public void flush() throws IOException {
-    sink.write(buf,0,pos);
+    flush(buf, 0, pos);
     pos=0;
-    sink.flush();
+    if (sink != null) sink.flush();
   }
 
+  public void flush(char[] buf, int offset, int len) throws IOException {
+    sink.write(buf, offset, len);
+  }
+
+  public void flush(String str, int offset, int len) throws IOException {
+    sink.write(str, offset, len);
+  }
+
   @Override
   public void close() throws IOException {
     flush();
-    sink.close();
+    if (sink != null) sink.close();
   }
 
   public void flushBuffer() throws IOException {
-    sink.write(buf, 0, pos);
+    flush(buf, 0, pos);
     pos=0;
   }
 }
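
[Editor's note] The FastWriter changes mirror the byte-side refactor: every write to the sink now funnels through the overridable flush(char[]/String, int, int) hooks, and flush()/close() tolerate a null sink, which makes a memory-only subclass possible on the char side too. A hypothetical sketch; it assumes FastWriter exposes a (Writer sink, char[] tempBuffer, int start) constructor analogous to FastOutputStream's, which this diff does not show:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical char-side analogue of MemOutputStream; place it next to
// FastWriter (its package is not shown in this diff). The null sink is
// exactly why flush()/close() now check for null.
class MemWriter extends FastWriter {
  final List<char[]> chunks = new ArrayList<char[]>();

  MemWriter(char[] tempBuffer) {
    super(null, tempBuffer, 0);  // assumed constructor, mirroring FastOutputStream
  }

  @Override
  public void flush(char[] arr, int offset, int len) throws IOException {
    char[] copy = new char[len];                 // keep the chars in memory
    System.arraycopy(arr, offset, copy, 0, len);
    chunks.add(copy);
  }

  @Override
  public void flush(String str, int offset, int len) throws IOException {
    flush(str.toCharArray(), offset, len);
  }
}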

TestSolrJ.java

@@ -18,24 +18,149 @@
 package org.apache.solr.search;
 
 import org.apache.lucene.util.OpenBitSet;
 import org.apache.solr.SolrTestCaseJ4;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrServer;
 import org.apache.solr.client.solrj.impl.HttpSolrServer;
 import org.apache.solr.common.SolrException;
+import org.apache.solr.common.SolrInputDocument;
 import org.apache.solr.request.SolrQueryRequest;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
 public class TestSolrJ extends SolrTestCaseJ4 {
 
-  public void testSolrJ() {
+  public void testSolrJ() throws Exception {
+    // docs, producers, connections, sleep_time
+    // main(new String[] {"1000000","4", "1", "0"});
+    // doCommitPerf();
   }
 
+  public static SolrServer server;
+  public static String idField = "id";
+  public static Exception ex;
+
+  public static void main(String[] args) throws Exception {
+    // String addr = "http://odin.local:80/solr";
+    // String addr = "http://odin.local:8983/solr";
+    String addr = "http://localhost:8983/solr";
+
+    int i = 0;
+    final int nDocs = Integer.parseInt(args[i++]);
+    final int nProducers = Integer.parseInt(args[i++]);
+    final int nConnections = Integer.parseInt(args[i++]);
+    final int maxSleep = Integer.parseInt(args[i++]);
+
+    ConcurrentUpdateSolrServer sserver = null;
+    // server = sserver = new ConcurrentUpdateSolrServer(addr,32,8);
+    server = sserver = new ConcurrentUpdateSolrServer(addr,64,nConnections);
+
+    server.deleteByQuery("*:*");
+    server.commit();
+
+    long start = System.currentTimeMillis();
+
+    final int docsPerThread = nDocs / nProducers;
+
+    Thread[] threads = new Thread[nProducers];
+
+    for (int threadNum = 0; threadNum<nProducers; threadNum++) {
+      final int base = threadNum * docsPerThread;
+
+      threads[threadNum] = new Thread("add-thread"+i) {
+        public void run(){
+          try {
+            indexDocs(base, docsPerThread, maxSleep);
+          } catch (Exception e) {
+            System.out.println("###############################CAUGHT EXCEPTION");
+            e.printStackTrace();
+            ex = e;
+          }
+        }
+      };
+      threads[threadNum].start();
+    }
+
+    // optional: wait for commit?
+
+    for (int threadNum = 0; threadNum<nProducers; threadNum++) {
+      threads[threadNum].join();
+    }
+
+    if (sserver != null) {
+      sserver.blockUntilFinished();
+    }
+
+    long end = System.currentTimeMillis();
+
+    System.out.println("time="+(end-start) + " throughput="+(nDocs*1000/(end-start)) + " Exception="+ex);
+
+    // should server threads be marked as daemon?
+    // need a server.close()!!!
+  }
+
+  public static SolrInputDocument getDocument(int docnum) {
+    SolrInputDocument doc = new SolrInputDocument();
+    doc.setField(idField, docnum );
+    doc.setField("cat", Integer.toString(docnum&0x0f) );
+    doc.setField("name", "my name is " + Integer.toString(docnum&0xff) );
+    doc.setField("foo_t", "now is the time for all good men to come to the aid of their country" );
+    doc.setField("foo_i", Integer.toString(docnum&0x0f) );
+    doc.setField("foo_s", Integer.toString(docnum&0xff) );
+    doc.setField("foo_b", Boolean.toString( (docnum&0x01) == 1) );
+    doc.setField("parent_s", Integer.toString(docnum-1) );
+    doc.setField("price", Integer.toString(docnum >> 4));
+
+    int golden = (int)2654435761L;
+    int h = docnum * golden;
+    int n = (h & 0xff) + 1;
+    List lst = new ArrayList(n);
+    for (int i=0; i<n; i++) {
+      h = (h+i) * golden;
+      lst.add(h & 0xfff);
+    }
+
+    doc.setField("num_is", lst);
+    return doc;
+  }
+
+  public static void indexDocs(int base, int count, int maxSleep) throws IOException, SolrServerException {
+    Random r = new Random();
+    for (int i=base; i<count+base; i++) {
+      if ((i & 0xfffff) == 0) {
+        System.out.print("\n% " + new Date()+ "\t" + i + "\t");
+        System.out.flush();
+      }
+      if ((i & 0xffff) == 0) {
+        System.out.print(".");
+        System.out.flush();
+      }
+
+      SolrInputDocument doc = getDocument(i);
+      server.add(doc);
+
+      if (maxSleep > 0) {
+        int sleep = r.nextInt(maxSleep);
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
   public void doCommitPerf() throws Exception {
     HttpSolrServer client = new HttpSolrServer("http://localhost:8983/solr");
@@ -55,4 +180,7 @@ public class TestSolrJ extends SolrTestCaseJ4 {
     System.out.println("TIME: " + (end-start));
   }
 
 }
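
[Editor's note] TestSolrJ is a manual stress driver rather than an automated test: testSolrJ() does nothing unless one of its commented-out calls is re-enabled, and main() reads four positional arguments as docs, producers, connections, and max sleep in ms. One possible invocation against a local Solr (it assumes a server already running on the hardcoded localhost:8983 address and the SolrJ test classpath):

// Hypothetical launcher: index 1M docs from 4 producer threads over 2
// connections with no sleep between adds.
public class RunBench {
  public static void main(String[] args) throws Exception {
    // args: nDocs, nProducers, nConnections, maxSleep(ms)
    org.apache.solr.search.TestSolrJ.main(new String[] {"1000000", "4", "2", "0"});
  }
}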

FastOutputStream.java

@@ -23,10 +23,10 @@ import java.io.*;
  *  Internal Solr use only, subject to change.
  */
 public class FastOutputStream extends OutputStream implements DataOutput {
-  private final OutputStream out;
-  private final byte[] buf;
-  private long written;  // how many bytes written to the underlying stream
-  private int pos;
+  protected final OutputStream out;
+  protected byte[] buf;
+  protected long written;  // how many bytes written to the underlying stream
+  protected int pos;
 
   public FastOutputStream(OutputStream w) {
     // use default BUFSIZE of BufferedOutputStream so if we wrap that
@@ -57,7 +57,7 @@ public class FastOutputStream extends OutputStream implements DataOutput {
   public void write(byte b) throws IOException {
     if (pos >= buf.length) {
-      out.write(buf);
+      flush(buf, 0, buf.length);
       written += pos;
       pos=0;
     }
@@ -73,18 +73,18 @@ public class FastOutputStream extends OutputStream implements DataOutput {
     } else if (len<buf.length) {
       // if the data to write is small enough, buffer it.
       System.arraycopy(arr, off, buf, pos, space);
-      out.write(buf);
+      flush(buf, 0, buf.length);
       written += buf.length;
       pos = len-space;
       System.arraycopy(arr, off+space, buf, 0, pos);
     } else {
       if (pos>0) {
-        out.write(buf,0,pos);  // flush
+        flush(buf,0,pos);  // flush
         written += pos;
         pos=0;
       }
       // don't buffer, just write to sink
-      out.write(arr, off, len);
+      flush(arr, off, len);
       written += len;
     }
   }
@@ -168,13 +168,13 @@ public class FastOutputStream extends OutputStream implements DataOutput {
   @Override
   public void flush() throws IOException {
     flushBuffer();
-    out.flush();
+    if (out != null) out.flush();
   }
 
   @Override
   public void close() throws IOException {
     flushBuffer();
-    out.close();
+    if (out != null) out.close();
   }
 
   /** Only flushes the buffer of the FastOutputStream, not that of the
@@ -182,12 +182,17 @@ public class FastOutputStream extends OutputStream implements DataOutput {
    *  underlying stream.
    */
   public void flushBuffer() throws IOException {
     if (pos > 0) {
-      out.write(buf, 0, pos);
+      flush(buf, 0, pos);
       written += pos;
       pos=0;
     }
   }
 
+  /** All writes to the sink will go through this method */
+  public void flush(byte[] buf, int offset, int len) throws IOException {
+    out.write(buf, offset, len);
+  }
+
   public long size() {
     return written + pos;
   }
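
[Editor's note] Routing every sink write through flush(byte[], int, int) turns FastOutputStream into an extension point: a subclass can observe or redirect the byte stream without touching the buffering logic, and the new null-checks in flush()/close() make a sink-less subclass legal (MemOutputStream above is exactly such a subclass). A hypothetical example that checksums whatever would reach the sink:

import java.io.IOException;
import java.util.zip.CRC32;

import org.apache.solr.common.util.FastOutputStream;

// Hypothetical subclass: checksum everything that would reach the sink,
// relying on the flush(byte[],int,int) hook added in this commit.
class ChecksummingStream extends FastOutputStream {
  final CRC32 crc = new CRC32();

  ChecksummingStream() {
    super(null, new byte[8192], 0);  // no sink; bytes stop at the hook
  }

  @Override
  public void flush(byte[] arr, int offset, int len) throws IOException {
    crc.update(arr, offset, len);    // observe instead of writing
  }

  public static void main(String[] args) throws IOException {
    ChecksummingStream cs = new ChecksummingStream();
    cs.writeLong(42L);
    cs.flushBuffer();                // push the buffered tail through the hook
    System.out.println(Long.toHexString(cs.crc.getValue()));
  }
}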