Async utils
ci/woodpecker/push/woodpecker Pipeline was successful Details

This commit is contained in:
Johannes Frohnmeyer 2023-07-24 13:34:41 +02:00
parent ef507aad1b
commit 8586273904
Signed by: Johannes
GPG Key ID: E76429612C2929F4
4 changed files with 160 additions and 0 deletions

View File

@ -0,0 +1,9 @@
package io.gitlab.jfronny.commons;
import java.lang.annotation.*;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SamWithReceiver {
}

View File

@ -0,0 +1,63 @@
package io.gitlab.jfronny.commons.concurrent;
import io.gitlab.jfronny.commons.SamWithReceiver;
import io.gitlab.jfronny.commons.ref.R;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.function.Consumer;
public class AsyncRequest {
private final Action action;
private final Runnable finalize;
private final AsyncRequestState state = new AsyncRequestState();
private Future<Void> future = null;
public AsyncRequest(Action action, Runnable finalize) {
this.action = Objects.requireNonNull(action);
this.finalize = Objects.requireNonNull(finalize);
}
public AsyncRequest(Consumer<Context> runnable) {
this.action = (context, callback) -> new VoidFuture(ForkJoinPool.commonPool().submit(() -> {
runnable.accept(context);
callback.run();
}));
this.finalize = R::nop;
}
public void request() {
if (state.request().shouldStart()) start();
}
private void start() {
Future<Void>[] tasks = new Future[0];
future = tasks[0] = action.schedule(new Context() {
@Override
public boolean isCancelled() {
return tasks[0].isCancelled();
}
}, () -> {
if (!tasks[0].isCancelled()) {
finalize.run();
future = null;
}
if (state.emitFinished().shouldContinue()) start();
});
}
public void cancel() {
if (future != null) future.cancel(false);
state.cancel();
}
@SamWithReceiver
interface Action {
Future<Void> schedule(Context context, Runnable callback);
}
interface Context {
boolean isCancelled();
}
}

View File

@ -0,0 +1,54 @@
package io.gitlab.jfronny.commons.concurrent;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class AsyncRequestState {
private final Lock lock = new ReentrantLock();
private boolean isRunning = false;
private boolean isScheduled = false;
public ScanFinishedResponse emitFinished() {
lock.lock();
try {
if (isScheduled) {
isScheduled = false;
return new ScanFinishedResponse(true);
} else {
isRunning = false;
return new ScanFinishedResponse(false);
}
} finally {
lock.unlock();
}
}
public RequestScanResponse request() {
lock.lock();
try {
if (isRunning) {
isScheduled = true;
return new RequestScanResponse(false);
} else {
isRunning = true;
return new RequestScanResponse(true);
}
} finally {
lock.unlock();
}
}
public void cancel() {
lock.lock();
try {
isScheduled = false;
} finally {
lock.unlock();
}
}
public record ScanFinishedResponse(boolean shouldContinue) {}
public record RequestScanResponse(boolean shouldStart) {}
}

View File

@ -0,0 +1,34 @@
package io.gitlab.jfronny.commons.concurrent;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.*;
public record VoidFuture(Future<?> future) implements Future<Void> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return future.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return future.isCancelled();
}
@Override
public boolean isDone() {
return future.isDone();
}
@Override
public Void get() throws InterruptedException, ExecutionException {
future.get();
return null;
}
@Override
public Void get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
future.get(timeout, unit);
return null;
}
}