fix(flow): resubmit prioritized runnables
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful

This commit is contained in:
Johannes Frohnmeyer 2024-06-21 08:03:38 +02:00
parent b7718fae96
commit 7e12f31062
Signed by: Johannes
GPG Key ID: E76429612C2929F4
3 changed files with 56 additions and 7 deletions

View File

@ -1,6 +1,7 @@
package io.gitlab.jfronny.commons.flow.unsafe;
import io.gitlab.jfronny.commons.flow.PrioritizedRunnable;
import io.gitlab.jfronny.commons.flow.RemovableExecutor;
import io.gitlab.jfronny.commons.throwable.Try;
import io.gitlab.jfronny.commons.unsafe.Unsafe;
import io.gitlab.jfronny.commons.unsafe.reflect.LambdaFactory;
@ -20,6 +21,7 @@ public class UnsafeScheduledVirtualThreadBuilder {
private static final Function<Executor, Thread.Builder.OfVirtual> NEW_VIRTUAL_THREAD_BUILDER;
private static final Function<ForkJoinPool, ForkJoinWorkerThread> NEW_CARRIER_THREAD;
private static final long VIRTUAL_THREAD_CONTINUATION;
private static final long VIRTUAL_THREAD_SCHEDULER;
static {
// java.lang.VirtualThread.startVirtualThread();
@ -44,8 +46,11 @@ public class UnsafeScheduledVirtualThreadBuilder {
});
NEW_VIRTUAL_THREAD_BUILDER = Try.orThrow(() -> Reflect.constructor("java.lang.ThreadBuilders$VirtualThreadBuilder", Executor.class));
NEW_CARRIER_THREAD = Try.orThrow(() -> Reflect.constructor("jdk.internal.misc.CarrierThread", ForkJoinPool.class));
Field field = Try.orThrow(() -> Reflect.getDeclaredField(Class.forName("java.lang.VirtualThread"), "runContinuation"));
VIRTUAL_THREAD_CONTINUATION = Try.orThrow(() -> Unsafe.objectFieldOffset(field));
Class<?> vtClass = Try.orThrow(() -> Class.forName("java.lang.VirtualThread"));
Field cntField = Try.orThrow(() -> Reflect.getDeclaredField(vtClass, "runContinuation"));
VIRTUAL_THREAD_CONTINUATION = Try.orThrow(() -> Unsafe.objectFieldOffset(cntField));
Field shdField = Try.orThrow(() -> Reflect.getDeclaredField(vtClass, "scheduler"));
VIRTUAL_THREAD_SCHEDULER = Try.orThrow(() -> Unsafe.objectFieldOffset(shdField));
}
@FunctionalInterface
@ -69,11 +74,18 @@ public class UnsafeScheduledVirtualThreadBuilder {
}
public static void setPriority(Thread thread, int priority) {
Unsafe.putObject(
thread,
VIRTUAL_THREAD_CONTINUATION,
PrioritizedRunnable.of(Unsafe.getObject(thread, VIRTUAL_THREAD_CONTINUATION), priority)
);
Runnable threadContinuation = Unsafe.getObject(thread, VIRTUAL_THREAD_CONTINUATION);
PrioritizedRunnable prioritizedThreadContinuation = PrioritizedRunnable.of(threadContinuation);
if (prioritizedThreadContinuation.getPriority() == priority) return;
PrioritizedRunnable newThreadContinuation = PrioritizedRunnable.of(prioritizedThreadContinuation, priority);
Unsafe.putObject(thread, VIRTUAL_THREAD_CONTINUATION, newThreadContinuation);
Executor scheduler = Unsafe.getObject(thread, VIRTUAL_THREAD_SCHEDULER);
RemovableExecutor removableScheduler = RemovableExecutor.of(scheduler);
if (removableScheduler != null) {
if (removableScheduler.remove(threadContinuation) || removableScheduler.remove(prioritizedThreadContinuation)) {
removableScheduler.execute(newThreadContinuation);
}
}
}
public static int getPriority(Thread thread) {

View File

@ -0,0 +1,19 @@
package io.gitlab.jfronny.commons.flow;
import io.gitlab.jfronny.commons.flow.impl.RemovableThreadPoolExecutorWrapper;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
public interface RemovableExecutor extends Executor {
static @Nullable RemovableExecutor of(Executor executor) {
return switch (executor) {
case RemovableExecutor xt -> xt;
case ThreadPoolExecutor tp -> new RemovableThreadPoolExecutorWrapper(tp);
case null, default -> null;
};
}
boolean remove(Runnable runnable);
}

View File

@ -0,0 +1,18 @@
package io.gitlab.jfronny.commons.flow.impl;
import io.gitlab.jfronny.commons.flow.RemovableExecutor;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.ThreadPoolExecutor;
public record RemovableThreadPoolExecutorWrapper(ThreadPoolExecutor tpr) implements RemovableExecutor {
@Override
public boolean remove(Runnable runnable) {
return tpr.remove(runnable);
}
@Override
public void execute(@NotNull Runnable command) {
tpr.execute(command);
}
}