- Switched to using camelCase in the XML element names to make things consistent.

- Replaced the asyncReplication property in the ReplicationService with a minimumReplicas property. When it is set to 0, async replication is in effect.
- Also removed the use of a map to track replication requests, since at most one sync request is issued at a time (see the sketch below).
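
A minimal sketch of the latch-based flow described above, assuming hypothetical class and method names (SyncReplicationSketch, onSlaveAck) rather than the real ReplicationMaster API: with minimumReplicas set to 0 every journal update is replicated asynchronously; otherwise the writer parks on a single CountDownLatch until enough slaves have acknowledged the update.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the latch-based sync replication described above.
// Class and method names are illustrative, not the actual ReplicationMaster API.
public class SyncReplicationSketch {
    private final int minimumReplicas;           // 0 == always replicate asynchronously
    private final Object requestMutex = new Object();
    private CountDownLatch requestLatch;         // at most one outstanding sync request

    public SyncReplicationSketch(int minimumReplicas) {
        this.minimumReplicas = minimumReplicas;
    }

    /** Called for every journal update that may need to be replicated. */
    public void replicate(boolean sync) throws InterruptedException {
        if (minimumReplicas == 0) {
            sync = false;                        // async replication is in effect
        }
        CountDownLatch latch = null;
        if (sync) {
            latch = new CountDownLatch(minimumReplicas);
            synchronized (requestMutex) {
                requestLatch = latch;            // replaces the old per-location map
            }
        }
        // ... push the journal update frame to the connected slaves here ...
        if (sync) {
            // Wait until minimumReplicas slaves have acknowledged the update.
            latch.await(500, TimeUnit.MILLISECONDS);
        }
    }

    /** Called when a slave acknowledges the outstanding journal location. */
    public void onSlaveAck() {
        CountDownLatch latch;
        synchronized (requestMutex) {
            latch = requestLatch;
        }
        if (latch != null) {
            latch.countDown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SyncReplicationSketch master = new SyncReplicationSketch(1);
        // Simulate a slave acking shortly after the update is sent.
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignore) { }
            master.onSlaveAck();
        }).start();
        master.replicate(true);                  // blocks until the simulated ack arrives (or times out)
        System.out.println("update acknowledged (or timed out after 500 ms)");
    }
}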



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@720234 13f79535-47bb-0310-9956-ffa450edef68
Hiram R. Chirino 2008-11-24 17:44:12 +00:00
parent a85ca4a186
commit 2996cf31e8
7 changed files with 349 additions and 349 deletions

View File

@@ -26,7 +26,7 @@ import org.apache.activemq.broker.BrokerService;
* he will create the actual BrokerService
*
* @author chirino
* @org.apache.xbean.XBean element="kahadb-replication-broker"
* @org.apache.xbean.XBean element="kahadbReplicationBroker"
*/
public class ReplicationBrokerService extends BrokerService {

View File

@@ -23,9 +23,10 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -51,58 +52,61 @@ import org.apache.kahadb.replication.pb.PBType;
import org.apache.kahadb.store.KahaDBStore;
import org.apache.kahadb.util.ByteSequence;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
public class ReplicationMaster implements Service, ClusterListener, ReplicationTarget {
private static final Log LOG = LogFactory.getLog(ReplicationService.class);
private final ReplicationService replicationService;
private Object serverMutex = new Object() {};
private TransportServer server;
private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
private final AtomicInteger nextSnapshotId = new AtomicInteger();
private final Map<Location, CountDownLatch> requestMap = new LinkedHashMap<Location, CountDownLatch>();
private Object serverMutex = new Object() {
};
private TransportServer server;
public ReplicationMaster(ReplicationService replicationService) {
this.replicationService = replicationService;
}
private ArrayList<ReplicationSession> sessions = new ArrayList<ReplicationSession>();
private final AtomicInteger nextSnapshotId = new AtomicInteger();
private final Object requestMutex = new Object(){};
private Location requestLocation;
private CountDownLatch requestLatch;
private int minimumReplicas;
public ReplicationMaster(ReplicationService replicationService) {
this.replicationService = replicationService;
minimumReplicas = replicationService.getMinimumReplicas();
}
public void start() throws Exception {
synchronized (serverMutex) {
server = TransportFactory.bind(new URI(replicationService.getUri()));
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(Transport transport) {
try {
synchronized (serverMutex) {
ReplicationSession session = new ReplicationSession(transport);
session.start();
addSession(session);
}
} catch (Exception e) {
LOG.info("Could not accept replication connection from slave at " + transport.getRemoteAddress() + ", due to: " + e, e);
}
}
public void onAcceptError(Exception e) {
LOG.info("Could not accept replication connection: " + e, e);
}
});
server.start();
}
replicationService.getStore().getJournal().setReplicationTarget(this);
}
boolean isStarted() {
synchronized (serverMutex) {
return server!=null;
return server != null;
}
}
public void stop() throws Exception {
replicationService.getStore().getJournal().setReplicationTarget(null);
synchronized (serverMutex) {
@@ -111,24 +115,24 @@ public class ReplicationMaster implements Service, ClusterListener, ReplicationT
server = null;
}
}
ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
sessionsSnapshot = this.sessions;
}
for (ReplicationSession session: sessionsSnapshot) {
for (ReplicationSession session : sessionsSnapshot) {
session.stop();
}
}
protected void addSession(ReplicationSession session) {
synchronized (sessions) {
sessions = new ArrayList<ReplicationSession>(sessions);
sessions.add(session);
}
}
protected void removeSession(ReplicationSession session) {
synchronized (sessions) {
sessions = new ArrayList<ReplicationSession>(sessions);
@@ -136,352 +140,348 @@ public class ReplicationMaster implements Service, ClusterListener, ReplicationT
}
}
public void onClusterChange(ClusterState config) {
// For now, we don't really care about changes in the slave config..
}
/**
* This is called by the Journal so that we can replicate the update to the
* slaves.
*/
public void replicate(Location location, ByteSequence sequence, boolean sync) {
ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
// Hurrah for copy on write..
sessionsSnapshot = this.sessions;
}
// We may be configured to always do async replication..
if ( replicationService.isAsyncReplication() ) {
sync=false;
}
CountDownLatch latch=null;
if( sync ) {
latch = new CountDownLatch(1);
synchronized (requestMap) {
requestMap.put(location, latch);
// We may be able to always async replicate...
if (minimumReplicas==0) {
sync = false;
}
CountDownLatch latch = null;
if (sync) {
latch = new CountDownLatch(minimumReplicas);
synchronized (requestMutex) {
requestLatch = latch;
requestLocation = location;
}
}
ReplicationFrame frame=null;
for (ReplicationSession session : sessionsSnapshot) {
if( session.subscribedToJournalUpdates.get() ) {
// Lazy create the frame since we may have not avilable sessions to send to.
if( frame == null ) {
frame = new ReplicationFrame();
}
ReplicationFrame frame = null;
for (ReplicationSession session : sessionsSnapshot) {
if (session.subscribedToJournalUpdates.get()) {
// Lazy create the frame since we may have not avilable sessions
// to send to.
if (frame == null) {
frame = new ReplicationFrame();
frame.setHeader(new PBHeader().setType(PBType.JOURNAL_UPDATE));
PBJournalUpdate payload = new PBJournalUpdate();
payload.setLocation(ReplicationSupport.convert(location));
payload.setData(new org.apache.activemq.protobuf.Buffer(sequence.getData(), sequence.getOffset(), sequence.getLength()));
payload.setSendAck(sync);
frame.setPayload(payload);
}
}
// TODO: use async send threads so that the frames can be pushed out in parallel.
try {
session.setLastUpdateLocation(location);
session.transport.oneway(frame);
} catch (IOException e) {
session.onException(e);
}
}
}
if (sync) {
try {
int timeout = 500;
int counter=0;
while( true ) {
if( latch.await(timeout, TimeUnit.MILLISECONDS) ) {
synchronized (requestMap) {
requestMap.remove(location);
}
int counter = 0;
while (true) {
if (latch.await(timeout, TimeUnit.MILLISECONDS)) {
return;
}
if( !isStarted() ) {
if (!isStarted()) {
return;
}
counter++;
if( (counter%10)==0 ) {
LOG.warn("KahaDB is waiting for slave to come online. "+(timeout*counter/1000.f)+" seconds have elapsed.");
if ((counter % 10) == 0) {
LOG.warn("KahaDB is waiting for slave to come online. " + (timeout * counter / 1000.f) + " seconds have elapsed.");
}
}
}
} catch (InterruptedException ignore) {
}
}
}
}
private void ackAllFromTo(Location lastAck, Location newAck) {
if ( replicationService.isAsyncReplication() ) {
Location l;
java.util.concurrent.CountDownLatch latch;
synchronized (requestMutex) {
latch = requestLatch;
l = requestLocation;
}
if( l == null ) {
return;
}
ArrayList<Entry<Location, CountDownLatch>> entries;
synchronized (requestMap) {
entries = new ArrayList<Entry<Location, CountDownLatch>>(requestMap.entrySet());
}
boolean inRange=false;
for (Entry<Location, CountDownLatch> entry : entries) {
Location l = entry.getKey();
if( !inRange ) {
if( lastAck==null || lastAck.compareTo(l) < 0 ) {
inRange=true;
}
if (lastAck == null || lastAck.compareTo(l) < 0) {
if (newAck != null && l.compareTo(newAck) <= 0) {
latch.countDown();
return;
}
if( inRange ) {
entry.getValue().countDown();
if( newAck!=null && l.compareTo(newAck) <= 0 ) {
return;
}
}
}
}
}
class ReplicationSession implements Service, TransportListener {
private final Transport transport;
private final AtomicBoolean subscribedToJournalUpdates = new AtomicBoolean();
private boolean stopped;
private File snapshotFile;
private HashSet<Integer> journalReplicatedFiles;
private Location lastAckLocation;
private Location lastUpdateLocation;
private boolean online;
public ReplicationSession(Transport transport) {
this.transport = transport;
}
synchronized public void setLastUpdateLocation(Location lastUpdateLocation) {
this.lastUpdateLocation = lastUpdateLocation;
}
public void start() throws Exception {
transport.setTransportListener(this);
transport.start();
}
synchronized public void stop() throws Exception {
if ( !stopped ) {
stopped=true;
deleteReplicationData();
transport.stop();
}
}
if (!stopped) {
stopped = true;
deleteReplicationData();
transport.stop();
}
}
synchronized private void onJournalUpdateAck(ReplicationFrame frame, PBJournalLocation location) {
Location ack = ReplicationSupport.convert(location);
if( online ) {
if (online) {
ackAllFromTo(lastAckLocation, ack);
}
lastAckLocation=ack;
}
synchronized private void onSlaveOnline(ReplicationFrame frame) {
}
lastAckLocation = ack;
}
synchronized private void onSlaveOnline(ReplicationFrame frame) {
deleteReplicationData();
online = true;
if( lastAckLocation!=null ) {
online = true;
if (lastAckLocation != null) {
ackAllFromTo(null, lastAckLocation);
}
}
public void onCommand(Object command) {
try {
ReplicationFrame frame = (ReplicationFrame) command;
switch (frame.getHeader().getType()) {
case SLAVE_INIT:
onSlaveInit(frame, (PBSlaveInit) frame.getPayload());
break;
case SLAVE_ONLINE:
onSlaveOnline(frame);
break;
case FILE_TRANSFER:
onFileTransfer(frame, (PBFileInfo) frame.getPayload());
break;
case JOURNAL_UPDATE_ACK:
onJournalUpdateAck(frame, (PBJournalLocation) frame.getPayload());
break;
}
} catch (Exception e) {
LOG.warn("Slave request failed: "+e, e);
failed(e);
}
}
try {
ReplicationFrame frame = (ReplicationFrame)command;
switch (frame.getHeader().getType()) {
case SLAVE_INIT:
onSlaveInit(frame, (PBSlaveInit)frame.getPayload());
break;
case SLAVE_ONLINE:
onSlaveOnline(frame);
break;
case FILE_TRANSFER:
onFileTransfer(frame, (PBFileInfo)frame.getPayload());
break;
case JOURNAL_UPDATE_ACK:
onJournalUpdateAck(frame, (PBJournalLocation)frame.getPayload());
break;
}
} catch (Exception e) {
LOG.warn("Slave request failed: " + e, e);
failed(e);
}
}
public void onException(IOException error) {
failed(error);
}
public void failed(Exception error) {
try {
stop();
} catch (Exception ignore) {
}
}
public void transportInterupted() {
}
public void transportResumed() {
}
private void deleteReplicationData() {
if( snapshotFile!=null ) {
snapshotFile.delete();
snapshotFile=null;
}
if( journalReplicatedFiles!=null ) {
journalReplicatedFiles=null;
updateJournalReplicatedFiles();
}
}
public void transportInterupted() {
}
private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
public void transportResumed() {
}
// Start sending journal updates to the slave.
subscribedToJournalUpdates.set(true);
private void deleteReplicationData() {
if (snapshotFile != null) {
snapshotFile.delete();
snapshotFile = null;
}
if (journalReplicatedFiles != null) {
journalReplicatedFiles = null;
updateJournalReplicatedFiles();
}
}
// We could look at the slave state sent in the slaveInit and decide
// that a full sync is not needed..
// but for now we will do a full sync every time.
ReplicationFrame rc = new ReplicationFrame();
final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
rc.setPayload(rcPayload);
// Setup a map of all the files that the slave has
final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
slaveFiles.put(info.getName(), info);
}
final KahaDBStore store = replicationService.getStore();
store.checkpoint(new Callback() {
public void execute() throws Exception {
// This call back is executed once the checkpoint is
// completed and all data has been synced to disk,
// but while a lock is still held on the store so
// that no updates are done while we are in this
// method.
KahaDBStore store = replicationService.getStore();
if( lastAckLocation==null ) {
lastAckLocation = store.getLastUpdatePosition();
}
int snapshotId = nextSnapshotId.incrementAndGet();
File file = store.getPageFile().getFile();
File dir = replicationService.getTempReplicationDir();
dir.mkdirs();
snapshotFile = new File(dir, "snapshot-" + snapshotId);
journalReplicatedFiles = new HashSet<Integer>();
// Store the list files associated with the snapshot.
ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
for (DataFile df : journalFiles.values()) {
// Look at what the slave has so that only the missing bits are transfered.
String name = "journal-" + df.getDataFileId();
PBFileInfo slaveInfo = slaveFiles.remove(name);
// Use the checksum info to see if the slave has the file already.. Checksums are less acurrate for
// small amounts of data.. so ignore small files.
if( slaveInfo!=null && slaveInfo.getEnd()> 1024*512 ) {
// If the slave's file checksum matches what we have..
if( ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd())==slaveInfo.getChecksum() ) {
// is Our file longer? then we need to continue transferring the rest of the file.
if( df.getLength() > slaveInfo.getEnd() ) {
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
journalReplicatedFiles.add(df.getDataFileId());
continue;
} else {
// No need to replicate this file.
continue;
}
}
}
// If we got here then it means we need to transfer the whole file.
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
journalReplicatedFiles.add(df.getDataFileId());
}
private void onSlaveInit(ReplicationFrame frame, PBSlaveInit slaveInit) throws Exception {
PBFileInfo info = new PBFileInfo();
info.setName("database");
info.setSnapshotId(snapshotId);
info.setStart(0);
info.setEnd(file.length());
info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
snapshotInfos.add(info);
rcPayload.setCopyFilesList(snapshotInfos);
ArrayList<String> deleteFiles = new ArrayList<String>();
slaveFiles.remove("database");
for (PBFileInfo unused : slaveFiles.values()) {
deleteFiles.add(unused.getName());
}
rcPayload.setDeleteFilesList(deleteFiles);
updateJournalReplicatedFiles();
}
// Start sending journal updates to the slave.
subscribedToJournalUpdates.set(true);
});
transport.oneway(rc);
}
private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
File file = replicationService.getReplicationFile(fileInfo.getName());
long payloadSize = fileInfo.getEnd()-fileInfo.getStart();
if( file.length() < fileInfo.getStart()+payloadSize ) {
throw new IOException("Requested replication file dose not have enough data.");
}
ReplicationFrame rc = new ReplicationFrame();
rc.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
FileInputStream is = new FileInputStream(file);
rc.setPayload(is);
try {
is.skip(fileInfo.getStart());
transport.oneway(rc);
} finally {
try {
is.close();
} catch (Throwable e) {
}
}
}
// We could look at the slave state sent in the slaveInit and decide
// that a full sync is not needed..
// but for now we will do a full sync every time.
ReplicationFrame rc = new ReplicationFrame();
final PBSlaveInitResponse rcPayload = new PBSlaveInitResponse();
rc.setHeader(new PBHeader().setType(PBType.SLAVE_INIT_RESPONSE));
rc.setPayload(rcPayload);
}
// Setup a map of all the files that the slave has
final HashMap<String, PBFileInfo> slaveFiles = new HashMap<String, PBFileInfo>();
for (PBFileInfo info : slaveInit.getCurrentFilesList()) {
slaveFiles.put(info.getName(), info);
}
/**
* Looks at all the journal files being currently replicated and informs the KahaDB so that
* it does not delete them while the replication is occuring.
*/
private void updateJournalReplicatedFiles() {
HashSet<Integer> files = replicationService.getStore().getJournalFilesBeingReplicated();
files.clear();
final KahaDBStore store = replicationService.getStore();
store.checkpoint(new Callback() {
public void execute() throws Exception {
// This call back is executed once the checkpoint is
// completed and all data has been synced to disk,
// but while a lock is still held on the store so
// that no updates are done while we are in this
// method.
KahaDBStore store = replicationService.getStore();
if (lastAckLocation == null) {
lastAckLocation = store.getLastUpdatePosition();
}
int snapshotId = nextSnapshotId.incrementAndGet();
File file = store.getPageFile().getFile();
File dir = replicationService.getTempReplicationDir();
dir.mkdirs();
snapshotFile = new File(dir, "snapshot-" + snapshotId);
journalReplicatedFiles = new HashSet<Integer>();
// Store the list files associated with the snapshot.
ArrayList<PBFileInfo> snapshotInfos = new ArrayList<PBFileInfo>();
Map<Integer, DataFile> journalFiles = store.getJournal().getFileMap();
for (DataFile df : journalFiles.values()) {
// Look at what the slave has so that only the missing
// bits are transfered.
String name = "journal-" + df.getDataFileId();
PBFileInfo slaveInfo = slaveFiles.remove(name);
// Use the checksum info to see if the slave has the
// file already.. Checksums are less acurrate for
// small amounts of data.. so ignore small files.
if (slaveInfo != null && slaveInfo.getEnd() > 1024 * 512) {
// If the slave's file checksum matches what we
// have..
if (ReplicationSupport.checksum(df.getFile(), 0, slaveInfo.getEnd()) == slaveInfo.getChecksum()) {
// is Our file longer? then we need to continue
// transferring the rest of the file.
if (df.getLength() > slaveInfo.getEnd()) {
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), slaveInfo.getEnd(), df.getLength()));
journalReplicatedFiles.add(df.getDataFileId());
continue;
} else {
// No need to replicate this file.
continue;
}
}
}
// If we got here then it means we need to transfer the
// whole file.
snapshotInfos.add(ReplicationSupport.createInfo(name, df.getFile(), 0, df.getLength()));
journalReplicatedFiles.add(df.getDataFileId());
}
PBFileInfo info = new PBFileInfo();
info.setName("database");
info.setSnapshotId(snapshotId);
info.setStart(0);
info.setEnd(file.length());
info.setChecksum(ReplicationSupport.copyAndChecksum(file, snapshotFile));
snapshotInfos.add(info);
rcPayload.setCopyFilesList(snapshotInfos);
ArrayList<String> deleteFiles = new ArrayList<String>();
slaveFiles.remove("database");
for (PBFileInfo unused : slaveFiles.values()) {
deleteFiles.add(unused.getName());
}
rcPayload.setDeleteFilesList(deleteFiles);
updateJournalReplicatedFiles();
}
});
transport.oneway(rc);
}
private void onFileTransfer(ReplicationFrame frame, PBFileInfo fileInfo) throws IOException {
File file = replicationService.getReplicationFile(fileInfo.getName());
long payloadSize = fileInfo.getEnd() - fileInfo.getStart();
if (file.length() < fileInfo.getStart() + payloadSize) {
throw new IOException("Requested replication file dose not have enough data.");
}
ReplicationFrame rc = new ReplicationFrame();
rc.setHeader(new PBHeader().setType(PBType.FILE_TRANSFER_RESPONSE).setPayloadSize(payloadSize));
FileInputStream is = new FileInputStream(file);
rc.setPayload(is);
try {
is.skip(fileInfo.getStart());
transport.oneway(rc);
} finally {
try {
is.close();
} catch (Throwable e) {
}
}
}
}
/**
* Looks at all the journal files being currently replicated and informs the
* KahaDB so that it does not delete them while the replication is occuring.
*/
private void updateJournalReplicatedFiles() {
HashSet<Integer> files = replicationService.getStore().getJournalFilesBeingReplicated();
files.clear();
ArrayList<ReplicationSession> sessionsSnapshot;
synchronized (this.sessions) {
// Hurrah for copy on write..
sessionsSnapshot = this.sessions;
}
for (ReplicationSession session : sessionsSnapshot) {
if( session.journalReplicatedFiles !=null ) {
files.addAll(session.journalReplicatedFiles);
}
}
}
for (ReplicationSession session : sessionsSnapshot) {
if (session.journalReplicatedFiles != null) {
files.addAll(session.journalReplicatedFiles);
}
}
}
}

View File

@@ -34,7 +34,7 @@ import org.apache.kahadb.store.KahaDBStore;
* slave or master facets of the broker.
*
* @author chirino
* @org.apache.xbean.XBean element="kahadb-replication"
* @org.apache.xbean.XBean element="kahadbReplication"
*/
public class ReplicationService implements Service, ClusterListener {
@@ -47,7 +47,7 @@ public class ReplicationService implements Service, ClusterListener {
private File tempReplicationDir;
private String uri;
private ClusterStateManager cluster;
private boolean asyncReplication=false;
private int minimumReplicas=1;
private KahaDBStore store;
@@ -279,12 +279,12 @@ public class ReplicationService implements Service, ClusterListener {
this.cluster = cluster;
}
public void setAsyncReplication(boolean asyncReplication) {
this.asyncReplication = asyncReplication;
public int getMinimumReplicas() {
return minimumReplicas;
}
public boolean isAsyncReplication() {
return asyncReplication;
public void setMinimumReplicas(int minimumReplicas) {
this.minimumReplicas = minimumReplicas;
}

View File

@@ -48,7 +48,7 @@ import org.apache.zookeeper.data.Stat;
/**
*
* @author chirino
* @org.apache.xbean.XBean element="zookeeper-cluster"
* @org.apache.xbean.XBean element="zookeeperCluster"
*/
public class ZooKeeperClusterStateManager implements ClusterStateManager, Watcher {
private static final Log LOG = LogFactory.getLog(ZooKeeperClusterStateManager.class);

View File

@@ -49,7 +49,7 @@ public class ReplicationTest extends TestCase {
StaticClusterStateManager cluster = new StaticClusterStateManager();
ReplicationService rs1 = new ReplicationService();
rs1.setAsyncReplication(true);
rs1.setMinimumReplicas(0);
rs1.setUri(BROKER1_REPLICATION_ID);
rs1.setCluster(cluster);
rs1.setDirectory(new File("target/replication-test/broker1"));
@@ -57,7 +57,7 @@ public class ReplicationTest extends TestCase {
rs1.start();
ReplicationService rs2 = new ReplicationService();
rs2.setAsyncReplication(true);
rs2.setMinimumReplicas(0);
rs2.setUri(BROKER2_REPLICATION_ID);
rs2.setCluster(cluster);
rs2.setDirectory(new File("target/replication-test/broker2"));

View File

@@ -27,21 +27,21 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb">
<kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
<replicationService>
<kahadb-replication
<kahadbReplication
directory="target/kaha-data/broker1"
brokerURI="xbean:broker1/ha-broker.xml"
uri="kdbr://localhost:6001"
asyncReplication="true">
minimumReplicas="0">
<cluster>
<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
<zookeeperCluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
</cluster>
</kahadb-replication>
</kahadbReplication>
</replicationService>
</kahadb-replication-broker>
</kahadbReplicationBroker>
</beans>
<!-- END SNIPPET: example -->

View File

@@ -27,21 +27,21 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<kahadb-replication-broker xmlns="http://activemq.apache.org/schema/kahadb">
<kahadbReplicationBroker xmlns="http://activemq.apache.org/schema/kahadb">
<replicationService>
<kahadb-replication
<kahadbReplication
directory="target/kaha-data-broker2"
brokerURI="xbean:broker2/ha-broker.xml"
uri="kdbr://localhost:6002"
asyncReplication="true">
minimumReplicas="0">
<cluster>
<zookeeper-cluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
<zookeeperCluster uri="zk://localhost:2181/activemq/ha-cluster/mygroup" userid="activemq" password=""/>
</cluster>
</kahadb-replication>
</kahadbReplication>
</replicationService>
</kahadb-replication-broker>
</kahadbReplicationBroker>
</beans>
<!-- END SNIPPET: example -->