diff --git a/distributed-lock/grails-app/services/com/bertram/lock/LockService.groovy b/distributed-lock/grails-app/services/com/bertram/lock/LockService.groovy index 4707bf3..7790844 100644 --- a/distributed-lock/grails-app/services/com/bertram/lock/LockService.groovy +++ b/distributed-lock/grails-app/services/com/bertram/lock/LockService.groovy @@ -3,6 +3,7 @@ package com.bertram.lock import com.bertram.lock.conf.LockServiceConfigurer import com.bertram.lock.provider.LockProvider import grails.async.Promises +import java.util.concurrent.atomic.AtomicBoolean /** * Grails service that wraps a specific locking provider @@ -15,51 +16,87 @@ class LockService implements GroovyInterceptable { private LockProvider providerDelegate = null - def withLockByDomain(Object domainInstance, Closure clos) { - return withLockByDomain(domainInstance, null, clos) - } + def withLockByDomain(Object domainInstance, Closure clos) { + return withLockByDomain(domainInstance, null, clos) + } + + def withLockByDomain(Object domainInstance, Map params, Closure clos) { + def acquired + try { + acquired = acquireLockByDomain(domainInstance, params) + if (acquired) + return clos.call() + else + log.warn("Unable to obtain lock for: ${domainInstance} - ${params}") + } + finally { + if (acquired) + releaseLockByDomain(domainInstance, params) + } + } + def withLock(String name, Closure clos) { + return withLock(name, null, clos) + } + + /** + * Method handles the lock negotiation for its user while executing some runtime implementation provided by a closure + * @param name + * @param params + * @param clos + * @return + */ + def withLock(String name, Map params, Closure clos) { + def acquired, active, monitor, promise + try { + acquired = acquireLock(name, params) + if (acquired) { + if(params.autoRenew) { + def ttl = params?.ttl == null ? providerDelegate.expireTimeout : params.ttl + def renewInterval = Math.max(1000L, (Long) (ttl / 2L)) + active = new AtomicBoolean(true) + monitor = new Object() + promise = Promises.task { + while (active.get()) { + synchronized (monitor) { + monitor.wait(renewInterval) + } + if (active.get()) { + try { + renewLock(name, params + [lock: acquired, ttl: ttl]) + } catch (Exception ex) { + log.warn("Unable to renew lock ${name}: ${ex.message}") + } + } + } + true + } + } + return clos.call() + } else if ((params?.raiseError != null ? params.raiseError : providerDelegate.raiseError)) { + throw new RuntimeException("Unable to acquire lock for ${name} - ${params}") + } else { + log.warn("Unable to obtain lock for: ${name} - ${params}") + } + } + finally { + if (acquired) { // Only release if we succesfully acquired a lock + if(params.autoRenew) { + active.set(false) - def withLockByDomain(Object domainInstance, Map params, Closure clos) { - def acquired - try { - acquired = acquireLockByDomain(domainInstance, params) - if (acquired) - return clos.call() - else - log.warn("Unable to obtain lock for: ${domainInstance} - ${params}") - } - finally { - if (acquired) - releaseLockByDomain(domainInstance, params) - } - } - def withLock(String name, Closure clos) { - return withLock(name, null, clos) - } + try { + synchronized (monitor) { + monitor.notifyAll() + } + } catch (Exception e) {} - /** - * Method handles the lock negotiation for its user while executing some runtime implementation provided by a closure - * @param name - * @param params - * @param clos - * @return - */ - def withLock(String name, Map params, Closure clos) { - def acquired - try { - acquired = acquireLock(name, params) - if (acquired) { - return clos.call() - } - else { - log.warn("Unable to obtain lock for: ${name} - ${params}") - } - } - finally { - if (acquired) // Only release if we succesfully acquired a lock - releaseLock(name, params) - } - } + try { + promise.get() + } catch (Exception e) {} + } + releaseLock(name, params) + } + } + } /** * acquire lock with no extra arguments * @param name @@ -134,7 +171,7 @@ class LockService implements GroovyInterceptable { */ def asyncReleaseLock(String name, Map params) { Promises.task { - return providerDelegate.releaseLock(name, params) + return providerDelegate.releaseLock(name, params) } } @@ -185,11 +222,11 @@ class LockService implements GroovyInterceptable { } /** - * Method is used to see if a lock has already been acquired or not - * @param name - * @param params - * @return - */ + * Method is used to see if a lock has already been acquired or not + * @param name + * @param params + * @return + */ String checkLock(String name, Map params) { return providerDelegate.checkLock(name, params) } @@ -259,9 +296,9 @@ class LockService implements GroovyInterceptable { def metaMethod = this.metaClass.getMetaMethod(name, args) - if (!metaMethod) - throw new MissingMethodException(name, LockService.class, args) + if (!metaMethod) + throw new MissingMethodException(name, LockService.class, args) return metaMethod.invoke(this, args) } -} +} \ No newline at end of file