From 79ac555477879139bcfad73a96a711ade4cbd250 Mon Sep 17 00:00:00 2001 From: ASU Date: Sun, 16 Mar 2025 18:10:09 +0200 Subject: [PATCH 1/2] Added FileLockForPath class --- .../esamtrade/bucketbase/FileLockForPath.java | 159 ++++++++++++++++++ .../bucketbase/FileLockForPathTest.java | 157 +++++++++++++++++ 2 files changed, 316 insertions(+) create mode 100644 java/src/main/java/com/esamtrade/bucketbase/FileLockForPath.java create mode 100644 java/src/test/java/com/esamtrade/bucketbase/FileLockForPathTest.java diff --git a/java/src/main/java/com/esamtrade/bucketbase/FileLockForPath.java b/java/src/main/java/com/esamtrade/bucketbase/FileLockForPath.java new file mode 100644 index 0000000..adfe674 --- /dev/null +++ b/java/src/main/java/com/esamtrade/bucketbase/FileLockForPath.java @@ -0,0 +1,159 @@ +package com.esamtrade.bucketbase; + +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; + +public class FileLockForPath implements Lock, AutoCloseable { + private static final ConcurrentHashMap LOCKED_PATHS = new ConcurrentHashMap<>(); + + private final Path lockFilePath; + private FileChannel channel; + private FileLock lock; + + /** + * Creates FileLock for a Path destination by creating a lock file with the same name extended with .lock + * + * @param path The path to lock + */ + public FileLockForPath(Path path) { + this.lockFilePath = Path.of(path.toString() + ".lock"); + } + + /** + * Acquires the lock + * + * @return true if the lock was acquired, false otherwise + * @throws IOException if an I/O error occurs + */ + private boolean acquire(long timeout, TimeUnit unit) throws IOException { + long timeOutMillis = unit.toMillis(timeout); + // Check if this JVM already holds the lock for this path + Thread currentOwner = LOCKED_PATHS.get(lockFilePath); + long startTime = System.currentTimeMillis(); + if (currentOwner == Thread.currentThread()) { + // Already locked by current thread + return true; + } else if (currentOwner != null) { + if (timeout == -1) { + // Already locked by another thread in this JVM, wait indefinitely + timeOutMillis = Long.MAX_VALUE; + } + // Already locked by another thread in this JVM, wait for timeout + + while (LOCKED_PATHS.containsKey(lockFilePath)) { + if (System.currentTimeMillis() - startTime > timeOutMillis) { + return false; + } + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + } + + // Create parent directories if they don't exist + Files.createDirectories(lockFilePath.getParent()); + + // Open or create the lock file + channel = FileChannel.open( + lockFilePath, + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.READ); + + // Try to acquire the lock + while (true) { + try { + lock = channel.tryLock(); + if (lock != null) { + LOCKED_PATHS.put(lockFilePath, Thread.currentThread()); + return true; + } + } catch (IOException e) { + // Failed to acquire lock + } + + // Check for timeout + if (System.currentTimeMillis() - startTime > timeOutMillis) { + return false; + } + + try { + Thread.sleep(100); // Wait a bit before retrying + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + } + + + @Override + public void close() throws IOException { + unlock(); + } + + @Override + public void lock() { + try { + acquire(-1, TimeUnit.MILLISECONDS); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void lockInterruptibly() throws InterruptedException { + throw new UnsupportedOperationException("Not implemented"); + } + + @Override + public boolean tryLock() { + try { + return acquire(0, TimeUnit.MILLISECONDS); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { + try { + return acquire(time, unit); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void unlock() { + LOCKED_PATHS.remove(lockFilePath); + try { + if (lock != null) { + lock.release(); + lock = null; + } + if (channel != null) { + channel.close(); + channel = null; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Condition newCondition() { + return null; + } +} \ No newline at end of file diff --git a/java/src/test/java/com/esamtrade/bucketbase/FileLockForPathTest.java b/java/src/test/java/com/esamtrade/bucketbase/FileLockForPathTest.java new file mode 100644 index 0000000..23c9eb7 --- /dev/null +++ b/java/src/test/java/com/esamtrade/bucketbase/FileLockForPathTest.java @@ -0,0 +1,157 @@ +package com.esamtrade.bucketbase; + +import org.junit.jupiter.api.Test; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +class FileLockForPathTest { + + @Test + void acquire() throws Exception { + // Create a temporary file for testing + Path tempFile = Files.createTempFile("locktest", ".tmp"); + + // Set up thread coordination objects + CountDownLatch thread1Acquired = new CountDownLatch(1); + CountDownLatch thread2Attempt = new CountDownLatch(1); + CountDownLatch thread1Released = new CountDownLatch(1); + + AtomicBoolean thread1Success = new AtomicBoolean(false); + AtomicBoolean thread2FirstAttemptSuccess = new AtomicBoolean(false); + AtomicBoolean thread2SecondAttemptSuccess = new AtomicBoolean(false); + + long DEFAULT_TIMEOUT = 5000L; + // Thread 1: First to acquire lock + Thread thread1 = new Thread(() -> { + try (FileLockForPath lock = new FileLockForPath(tempFile)) { + thread1Success.set(lock.tryLock(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS)); + thread1Acquired.countDown(); // Signal that thread1 has acquired the lock + + // Wait for thread2 to attempt acquiring the lock + thread2Attempt.await(); + + // Hold the lock for a moment + Thread.sleep(500); + + // Release the lock + lock.unlock(); + thread1Released.countDown(); // Signal that thread1 has released the lock + } catch (Exception e) { + fail("Thread 1 encountered exception: " + e); + } + }); + + // Thread 2: Try to acquire while thread1 holds lock, then try again after release + Thread thread2 = new Thread(() -> { + try { + // Wait for thread1 to acquire the lock + thread1Acquired.await(); + + // First attempt - should fail or timeout + try (FileLockForPath lock = new FileLockForPath(tempFile)) { + thread2FirstAttemptSuccess.set(lock.tryLock()); + } + thread2Attempt.countDown(); // Signal that thread2 has attempted to acquire + + // Wait for thread1 to release lock + thread1Released.await(); + + // Second attempt - should succeed + try (FileLockForPath lock = new FileLockForPath(tempFile)) { + thread2SecondAttemptSuccess.set(lock.tryLock()); + + // Hold the lock briefly + Thread.sleep(100); + } + } catch (Exception e) { + fail("Thread 2 encountered exception: " + e); + } + }); + + // Start the threads and wait for completion + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + + // Verify the test results + assertTrue(thread1Success.get(), "Thread 1 should acquire the lock successfully"); + assertFalse(thread2FirstAttemptSuccess.get(), "Thread 2 should fail to acquire the lock while Thread 1 holds it"); + assertTrue(thread2SecondAttemptSuccess.get(), "Thread 2 should acquire the lock after Thread 1 releases it"); + + // Clean up + Files.deleteIfExists(tempFile); + Files.deleteIfExists(Path.of(tempFile.toString() + ".lock")); + } + + @Test + void testFileLockWithTwoThreads() throws Exception { + // Create a temporary directory and test file + Path tempDir = Files.createTempDirectory("locktest"); + Path testFile = tempDir.resolve("testfile.txt"); + + // Create two instances for the same file (they use a lock file with ".lock" appended) + FileLockForPath lock1 = new FileLockForPath(testFile); + FileLockForPath lock2 = new FileLockForPath(testFile); + + AtomicBoolean thread2AcquiredAfterDelay = new AtomicBoolean(false); + CountDownLatch latch = new CountDownLatch(1); + + Thread thread1 = new Thread(() -> { + try { + // First thread acquires the lock (should succeed immediately) + lock1.lock(); + // Signal that lock1 is held so thread2 can start trying to acquire + latch.countDown(); + // Hold the lock for 500ms + Thread.sleep(500); + lock1.unlock(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + Thread thread2 = new Thread(() -> { + try { + // Wait until thread1 has acquired the lock + latch.await(); + long start = System.currentTimeMillis(); + // Retry loop: if an OverlappingFileLockException occurs (since locks in one JVM conflict), + // catch it, wait a bit and retry until the lock becomes available. + lock2.lock(); + long elapsed = System.currentTimeMillis() - start; + // We expect that thread2 didn't get the lock until after ~500ms, so allow a little slack + if (elapsed >= 400) { + thread2AcquiredAfterDelay.set(true); + } + lock2.unlock(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + thread1.start(); + thread2.start(); + thread1.join(); + thread2.join(); + + assertTrue(thread2AcquiredAfterDelay.get(), + "Thread2 should acquire the lock after thread1 releases it."); + + // Cleanup temporary directory and files + Files.walk(tempDir) + .sorted(Comparator.reverseOrder()) + .map(Path::toFile) + .forEach(File::delete); + } +} \ No newline at end of file From 37a48673298886477968dd32329d2e74c0006c9d Mon Sep 17 00:00:00 2001 From: ASU Date: Sun, 16 Mar 2025 23:37:12 +0200 Subject: [PATCH 2/2] Added constructor to S3BucketSDKv1 --- .../esamtrade/bucketbase/S3BucketSDKv1.java | 21 +++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java b/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java index f35507e..033ba29 100644 --- a/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java +++ b/java/src/main/java/com/esamtrade/bucketbase/S3BucketSDKv1.java @@ -36,12 +36,21 @@ public class S3BucketSDKv1 extends BaseBucket { protected String bucketName; public S3BucketSDKv1(String endpoint, String accessKey, String secretKey, String bucketName) { - BasicAWSCredentials awsCreds = new BasicAWSCredentials(accessKey, secretKey); - this.s3Client = AmazonS3ClientBuilder.standard() - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, "")) - .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) - .withPathStyleAccessEnabled(true) - .build(); + this( + AmazonS3ClientBuilder + .standard() + .withEndpointConfiguration( + new AwsClientBuilder.EndpointConfiguration(endpoint, "")) + .withCredentials( + new AWSStaticCredentialsProvider( + new BasicAWSCredentials(accessKey, secretKey))) + .withPathStyleAccessEnabled(true) + .build(), + bucketName); + } + + public S3BucketSDKv1(AmazonS3 s3Client, String bucketName) { + this.s3Client = s3Client; this.bucketName = bucketName; }