This commit is contained in:
Martyn Taylor 2017-09-13 11:44:38 +01:00
commit dab80e7bcf
1 changed files with 49 additions and 21 deletions

View File

@ -33,11 +33,13 @@ public class FileLockNodeManager extends NodeManager {
private static final Logger logger = Logger.getLogger(FileLockNodeManager.class); private static final Logger logger = Logger.getLogger(FileLockNodeManager.class);
private static final int LIVE_LOCK_POS = 1; private static final long STATE_LOCK_POS = 0;
private static final int BACKUP_LOCK_POS = 2; private static final long LIVE_LOCK_POS = 1;
private static final int LOCK_LENGTH = 1; private static final long BACKUP_LOCK_POS = 2;
private static final long LOCK_LENGTH = 1;
private static final byte LIVE = 'L'; private static final byte LIVE = 'L';
@ -113,6 +115,7 @@ public class FileLockNodeManager extends NodeManager {
@Override @Override
public void awaitLiveNode() throws Exception { public void awaitLiveNode() throws Exception {
logger.debug("awaiting live node...");
do { do {
byte state = getState(); byte state = getState();
while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) { while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
@ -229,28 +232,51 @@ public class FileLockNodeManager extends NodeManager {
* @param status * @param status
* @throws IOException * @throws IOException
*/ */
private void writeFileLockStatus(byte status) throws IOException { private void writeFileLockStatus(byte status) throws Exception {
if (replicatedBackup && channel == null) if (replicatedBackup && channel == null)
return; return;
logger.debug("writing status: " + status);
ByteBuffer bb = ByteBuffer.allocateDirect(1); ByteBuffer bb = ByteBuffer.allocateDirect(1);
bb.put(status); bb.put(status);
bb.position(0); bb.position(0);
if (!channel.isOpen()) { if (!channel.isOpen()) {
setUpServerLockFile(); setUpServerLockFile();
} }
FileLock lock = null;
try {
lock = lock(STATE_LOCK_POS);
channel.write(bb, 0); channel.write(bb, 0);
channel.force(true); channel.force(true);
} finally {
if (lock != null) {
lock.release();
}
}
} }
private byte getState() throws Exception { private byte getState() throws Exception {
byte result;
logger.debug("getting state...");
ByteBuffer bb = ByteBuffer.allocateDirect(1); ByteBuffer bb = ByteBuffer.allocateDirect(1);
int read; int read;
FileLock lock = null;
try {
lock = lock(STATE_LOCK_POS);
read = channel.read(bb, 0); read = channel.read(bb, 0);
if (read <= 0) { if (read <= 0) {
return FileLockNodeManager.NOT_STARTED; result = FileLockNodeManager.NOT_STARTED;
} else { } else {
return bb.get(0); result = bb.get(0);
} }
} finally {
if (lock != null) {
lock.release();
}
}
logger.debug("state: " + result);
return result;
} }
@Override @Override
@ -267,25 +293,27 @@ public class FileLockNodeManager extends NodeManager {
return getNodeId(); return getNodeId();
} }
protected FileLock tryLock(final int lockPos) throws Exception { protected FileLock tryLock(final long lockPos) throws IOException {
try { try {
return channel.tryLock(lockPos, LOCK_LENGTH, false); logger.debug("trying to lock position: " + lockPos);
FileLock lock = channel.tryLock(lockPos, LOCK_LENGTH, false);
if (lock != null) {
logger.debug("locked position: " + lockPos);
} else {
logger.debug("failed to lock position: " + lockPos);
}
return lock;
} catch (java.nio.channels.OverlappingFileLockException ex) { } catch (java.nio.channels.OverlappingFileLockException ex) {
// This just means that another object on the same JVM is holding the lock // This just means that another object on the same JVM is holding the lock
return null; return null;
} }
} }
protected FileLock lock(final int liveLockPos) throws Exception { protected FileLock lock(final long lockPosition) throws Exception {
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
while (!interrupted) { while (!interrupted) {
FileLock lock = null; FileLock lock = tryLock(lockPosition);
try {
lock = channel.tryLock(liveLockPos, 1, false);
} catch (java.nio.channels.OverlappingFileLockException ex) {
// This just means that another object on the same JVM is holding the lock
}
if (lock == null) { if (lock == null) {
try { try {
@ -306,7 +334,7 @@ public class FileLockNodeManager extends NodeManager {
// need to investigate further and review // need to investigate further and review
FileLock lock; FileLock lock;
do { do {
lock = channel.tryLock(liveLockPos, 1, false); lock = tryLock(lockPosition);
if (lock == null) { if (lock == null) {
try { try {
Thread.sleep(500); Thread.sleep(500);