If a replicated leveldb slave's connection gets slow, lets merge together journal write events to avoid them queuing up on the master side.

This commit is contained in:
Hiram Chirino 2013-11-05 10:49:40 -05:00
parent d872994260
commit 4367ec1b82
6 changed files with 187 additions and 49 deletions

View File

@ -28,6 +28,7 @@ import java.io.{IOException, File}
import java.net.{SocketAddress, InetSocketAddress, URI}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.reflect.BeanProperty
import org.fusesource.hawtbuf.{Buffer, AsciiBuffer}
class PositionSync(val position:Long, count:Int) extends CountDownLatch(count)
@ -132,7 +133,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def start_protocol_server = {
transport_server = new TcpTransportServer(new URI(bind))
transport_server.setBlockingExecutor(blocking_executor)
transport_server.setDispatchQueue(createQueue("replication server"))
transport_server.setDispatchQueue(createQueue("master: "+node_id))
transport_server.setTransportServerListener(new TransportServerListener(){
def onAccept(transport: Transport) {
transport.setDispatchQueue(createQueue("connection from "+transport.getRemoteAddress))
@ -266,7 +267,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
sendError("Invalid length")
}
sendOk(null)
send(FileTransferFrame(file, req.offset, req.length))
send(new FileTransferFrame(file, req.offset, req.length))
}
}
@ -282,6 +283,7 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def start(session:Session) = {
debug("SlaveState:start")
socketAddress = session.transport.getRemoteAddress
session.queue.setLabel(transport_server.getDispatchQueue.getLabel+" -> "+slave_id)
val resp = this.synchronized {
if( this.session!=null ) {
@ -311,16 +313,69 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
}
def replicate_wal(frame1:ReplicationFrame, frame2:FileTransferFrame=null ) = {
def queue(func: (Session)=>Unit) = {
val h = this.synchronized {
session
}
if( h !=null ) {
h.queue {
h.send(frame1)
if( frame2!=null ) {
h.send(frame2)
}
func(session)
}
}
}
def replicate(value:LogDelete):Unit = {
val frame = new ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value))
queue { session =>
session.send(frame)
}
}
var unflushed_replication_frame:DeferredReplicationFrame = null
class DeferredReplicationFrame(file:File, val position:Long, _offset:Long, initialLength:Long) extends ReplicationFrame(WAL_ACTION, null) {
val fileTransferFrame = new FileTransferFrame(file, _offset, initialLength)
var encoded:Buffer = null
def offset = fileTransferFrame.offset
def length = fileTransferFrame.length
override def body: Buffer = {
if( encoded==null ) {
val value = new LogWrite
value.file = position;
value.offset = offset;
value.length = fileTransferFrame.length
value.date = date
encoded = JsonCodec.encode(value)
}
encoded
}
}
def replicate(file:File, position:Long, offset:Long, length:Long):Unit = {
queue { session =>
// Check to see if we can merge the replication event /w the previous event..
if( unflushed_replication_frame == null ||
unflushed_replication_frame.position!=position ||
(unflushed_replication_frame.offset+unflushed_replication_frame.length)!=offset ) {
// We could not merge the replication event /w the previous event..
val frame = new DeferredReplicationFrame(file, position, offset, length)
unflushed_replication_frame = frame
session.send(frame, ()=>{
trace("%s: Sent WAL update: (file:%s, offset: %d, length: %d) to %s", directory, file, frame.offset, frame.length, slave_id)
if( unflushed_replication_frame eq frame ) {
unflushed_replication_frame = null
}
})
session.send(frame.fileTransferFrame)
} else {
// We were able to merge.. yay!
assert(unflushed_replication_frame.encoded == null)
unflushed_replication_frame.fileTransferFrame.length += length
}
}
}
@ -392,18 +447,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def replicate_wal(file:File, position:Long, offset:Long, length:Long):Unit = {
if( length > 0 ) {
val value = new LogWrite
value.file = position;
value.offset = offset;
value.length = length
value.date = date
wal_date = value.date;
value.sync = (syncToMask & SYNC_TO_REMOTE_DISK)!=0
trace("%s: Sending WAL update: (file:%d, offset: %d, length: %d)", directory, value.file, value.offset, value.length)
val frame1 = ReplicationFrame(WAL_ACTION, JsonCodec.encode(value))
val frame2 = FileTransferFrame(file, offset, length)
for( slave <- slaves.values() ) {
slave.replicate_wal(frame1, frame2)
slave.replicate(file, position, offset, length)
}
}
}
@ -411,9 +456,8 @@ class MasterLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def replicate_log_delete(log:Long):Unit = {
val value = new LogDelete
value.log = log
val frame = ReplicationFrame(LOG_DELETE_ACTION, JsonCodec.encode(value))
for( slave <- slaves.values() ) {
slave.replicate_wal(frame)
slave.replicate(value)
}
}

View File

@ -25,8 +25,10 @@ import java.io.{OutputStream, File}
import org.fusesource.hawtdispatch.transport.ProtocolCodec.BufferState
import java.util
case class ReplicationFrame(action:AsciiBuffer, body:Buffer)
case class FileTransferFrame(file:File, offset:Long, length:Long)
class ReplicationFrame(val action:AsciiBuffer, _body:Buffer) {
def body = _body
}
class FileTransferFrame(val file:File, val offset:Long, var length:Long)
class ReplicationProtocolCodec extends AbstractProtocolCodec {
import ReplicationSupport._
@ -86,7 +88,7 @@ class ReplicationProtocolCodec extends AbstractProtocolCodec {
if( data!=null ) {
data.moveTail(-1);
nextDecodeAction = readHeader
ReplicationFrame(action, data)
new ReplicationFrame(action, data)
} else {
null
}

View File

@ -64,6 +64,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
override def doStart() = {
queue.setLabel("slave: "+node_id)
client.init()
if (purgeOnStatup) {
purgeOnStatup = false
@ -97,10 +98,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
def start_slave_connections = {
val transport = new TcpTransport()
transport.setBlockingExecutor(blocking_executor)
transport.setDispatchQueue(queue)
transport.connecting(new URI(connect), null)
val transport: TcpTransport = create_transport
status = "Attaching to master: "+connect
info(status)
@ -120,6 +118,14 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
wal_session.start
}
def create_transport: TcpTransport = {
val transport = new TcpTransport()
transport.setBlockingExecutor(blocking_executor)
transport.setDispatchQueue(queue)
transport.connecting(new URI(connect), null)
transport
}
def stop_connections(cb:Task) = {
var then = ^{
unstash(directory)
@ -156,7 +162,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
val ack = new WalAck()
ack.position = wal_append_position
// info("Sending ack: "+wal_append_position)
wal_session.send(ACK_ACTION, ack)
wal_session.send_replication_frame(ACK_ACTION, ack)
if( replay_from != ack.position ) {
val old_replay_from = replay_from
replay_from = ack.position
@ -240,7 +246,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
}
def disconnect(cb:Task) = queue {
send(DISCONNECT_ACTION, null)
send_replication_frame(DISCONNECT_ACTION, null)
transport.flush()
transport.stop(cb)
}
@ -268,7 +274,7 @@ class SlaveLevelDBStore extends LevelDBStore with ReplicatedLevelDBStoreTrait {
def request(action:AsciiBuffer, body:AnyRef)(cb:(ReplicationFrame)=>Unit) = {
response_callbacks.addLast(cb)
send(action, body)
send_replication_frame(action, body)
}
def response_handler: (AnyRef)=>Unit = (command)=> {

View File

@ -28,7 +28,7 @@ import org.fusesource.hawtbuf.AsciiBuffer
*/
abstract class TransportHandler(val transport: Transport) extends TransportListener {
var outbound = new util.LinkedList[AnyRef]()
var outbound = new util.LinkedList[(AnyRef, ()=>Unit)]()
val codec = new ReplicationProtocolCodec
transport.setProtocolCodec(codec)
@ -45,23 +45,26 @@ abstract class TransportHandler(val transport: Transport) extends TransportListe
def drain:Unit = {
while( !outbound.isEmpty ) {
val value = outbound.peekFirst()
val (value, on_send) = outbound.peekFirst()
if( transport.offer(value) ) {
outbound.removeFirst()
if( on_send!=null ) {
on_send()
}
} else {
return
}
}
}
def send(value:AnyRef):Unit = {
def send(value:AnyRef):Unit = send(value, null)
def send(value:AnyRef, on_send: ()=>Unit):Unit = {
transport.getDispatchQueue.assertExecuting()
outbound.add(value)
outbound.add((value, on_send))
drain
}
def send(action:AsciiBuffer, body:AnyRef):Unit = send(ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body)))
def sendError(error:String) = send(ERROR_ACTION, error)
def sendOk(body:AnyRef) = send(OK_ACTION, body)
def send_replication_frame(action:AsciiBuffer, body:AnyRef):Unit = send(new ReplicationFrame(action, if(body==null) null else JsonCodec.encode(body)))
def sendError(error:String) = send_replication_frame(ERROR_ACTION, error)
def sendOk(body:AnyRef) = send_replication_frame(OK_ACTION, body)
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.leveldb.replicated.MasterLevelDBStore;
import org.apache.activemq.leveldb.replicated.SlaveLevelDBStore;
import org.apache.activemq.leveldb.util.FileSupport;
import org.apache.activemq.store.MessageStore;
import org.fusesource.hawtdispatch.transport.TcpTransport;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,6 +35,7 @@ import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.addMessage;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.createPlayload;
import static org.apache.activemq.leveldb.test.ReplicationTestSupport.getMessages;
import static org.junit.Assert.*;
@ -170,13 +172,87 @@ public class ReplicatedLevelDBStoreTest {
}
}
@Test(timeout = 1000*60*60)
public void testSlowSlave() throws Exception {
File node1Dir = new File("target/activemq-data/leveldb-node1");
File node2Dir = new File("target/activemq-data/leveldb-node2");
File node3Dir = new File("target/activemq-data/leveldb-node3");
FileSupport.toRichFile(node1Dir).recursiveDelete();
FileSupport.toRichFile(node2Dir).recursiveDelete();
FileSupport.toRichFile(node3Dir).recursiveDelete();
node2Dir.mkdirs();
node3Dir.mkdirs();
FileSupport.toRichFile(new File(node2Dir, "nodeid.txt")).writeText("node2", "UTF-8");
FileSupport.toRichFile(new File(node3Dir, "nodeid.txt")).writeText("node3", "UTF-8");
ArrayList<String> expected_list = new ArrayList<String>();
MasterLevelDBStore node1 = createMaster(node1Dir);
CountDownFuture masterStart = asyncStart(node1);
// Lets create a 1 slow slave...
SlaveLevelDBStore node2 = new SlaveLevelDBStore() {
boolean hitOnce = false;
@Override
public TcpTransport create_transport() {
if( hitOnce ) {
return super.create_transport();
}
hitOnce = true;
TcpTransport transport = super.create_transport();
transport.setMaxReadRate(64*1024);
return transport;
}
};
configureSlave(node2, node1, node2Dir);
SlaveLevelDBStore node3 = createSlave(node1, node3Dir);
asyncStart(node2);
asyncStart(node3);
masterStart.await();
LOG.info("Adding messages...");
String playload = createPlayload(64 * 1024);
MessageStore ms = node1.createQueueMessageStore(new ActiveMQQueue("TEST"));
final int TOTAL = 10;
for (int i = 0; i < TOTAL; i++) {
if (i == 8) {
// Stop the fast slave so that we wait for the slow slave to
// catch up..
node3.stop();
}
String msgid = "m:" + ":" + i;
addMessage(ms, msgid, playload);
expected_list.add(msgid);
}
LOG.info("Checking node1 state");
assertEquals(expected_list, getMessages(ms));
LOG.info("Stopping node1: " + node1.node_id());
node1.stop();
LOG.info("Stopping slave: " + node2.node_id());
node2.stop();
}
private SlaveLevelDBStore createSlave(MasterLevelDBStore master, File directory) {
SlaveLevelDBStore slave1 = new SlaveLevelDBStore();
slave1.setDirectory(directory);
slave1.setConnect("tcp://127.0.0.1:" + master.getPort());
slave1.setSecurityToken("foo");
slave1.setLogSize(1023 * 200);
return slave1;
SlaveLevelDBStore slave = new SlaveLevelDBStore();
configureSlave(slave, master, directory);
return slave;
}
private SlaveLevelDBStore configureSlave(SlaveLevelDBStore slave, MasterLevelDBStore master, File directory) {
slave.setDirectory(directory);
slave.setConnect("tcp://127.0.0.1:" + master.getPort());
slave.setSecurityToken("foo");
slave.setLogSize(1023 * 200);
return slave;
}
private MasterLevelDBStore createMaster(File directory) {

View File

@ -32,18 +32,25 @@ import java.util.ArrayList;
public class ReplicationTestSupport {
static long id_counter = 0L;
static String payload = "";
{
for (int i = 0; i < 1024; i++) {
static String payload = createPlayload(1024);
public static String createPlayload(int size) {
String payload = "";
for (int i = 0; i < size; i++) {
payload += "x";
}
return payload;
}
static public ActiveMQTextMessage addMessage(MessageStore ms, String body) throws JMSException, IOException {
static public ActiveMQTextMessage addMessage(MessageStore ms, String id) throws JMSException, IOException {
return addMessage(ms, id, payload);
}
static public ActiveMQTextMessage addMessage(MessageStore ms, String id, String payload) throws JMSException, IOException {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setPersistent(true);
message.setResponseRequired(true);
message.setStringProperty("id", body);
message.setStringProperty("id", id);
message.setText(payload);
id_counter += 1;
MessageId messageId = new MessageId("ID:localhost-56913-1254499826208-0:0:1:1:" + id_counter);