perf: reprioritize instead of creating new thread in MdsPipeline
Some checks failed
ci/woodpecker/push/woodpecker Pipeline failed
ci/woodpecker/push/docs Pipeline was successful

This commit is contained in:
Johannes Frohnmeyer 2024-07-10 10:04:26 +02:00
parent d2c1efe63a
commit 6d063c0d7e
Signed by: Johannes
GPG Key ID: E76429612C2929F4
3 changed files with 48 additions and 26 deletions

View File

@ -36,16 +36,22 @@ public class MdsPipeline {
}
private void work(MdsThreadFactory factory, Tuple<Path, Mod> tuple, Queue<Tuple<ScanStage, ThrowingConsumer<Tuple<Path, Mod>, IOException>>> tasks, Consumer<Throwable> fail) {
if (tasks.isEmpty()) return;
Tuple<ScanStage, ThrowingConsumer<Tuple<Path, Mod>, IOException>> task = tasks.poll();
factory.newThread(MdsThreadFactory.prioritize(
task.right()
.compose(() -> tuple)
.andThen(() -> {
work(factory, tuple, tasks, fail);
}).addHandler(fail),
task.left()
)).start();
Thread we = Thread.currentThread();
while (!tasks.isEmpty()) {
Tuple<ScanStage, ThrowingConsumer<Tuple<Path, Mod>, IOException>> task = tasks.poll();
factory.reprioritize(we, task.left());
try {
Thread.sleep(0);
} catch (InterruptedException e) {
we.interrupt();
}
try {
task.right().accept(tuple);
} catch (IOException e) {
fail.accept(e);
return;
}
}
}
public List<CompletableFuture<Void>> run(MdsThreadFactory factory, Set<Path> paths, ThrowingFunction<Path, Mod, IOException> seed) {
@ -71,7 +77,7 @@ public class MdsPipeline {
Consumer<Throwable> fail = finished::completeExceptionally;
factory.newThread(MdsThreadFactory.prioritize(() -> {
work(factory, s, taskQueue, fail);
}, ScanStage.DISCOVER)).start();
}, taskQueue.peek().left())).start();
return finished;
}).toList();
}

View File

@ -24,7 +24,11 @@ public class MdsThreadFactory implements ThreadFactory {
}
public static Runnable prioritize(Runnable runnable, ScanStage stage) {
return PrioritizedRunnable.of(runnable, 5 - stage.ordinal());
return PrioritizedRunnable.of(runnable, toPriority(stage));
}
public static int toPriority(ScanStage stage) {
return 5 - stage.ordinal();
}
@Override
@ -34,21 +38,19 @@ public class MdsThreadFactory implements ThreadFactory {
public Thread newThread(@NotNull Runnable runnable, @Nullable String name) {
int priority = runnable instanceof PrioritizedRunnable pr ? pr.getPriority() : 0;
OwnedThread ownedRunnable = new OwnedThread(runnable, new Thread[1], priority, this);
OwnedThread ownedRunnable = new OwnedThread(runnable, new Thread[1], this);
Thread t = ScheduledVirtualThreadBuilder.ofVirtual(scheduler)
.name(this.namePrefix + this.threadNumber.getAndIncrement() + (name == null ? "": "-" + name))
.unstarted(ownedRunnable);
ownedRunnable.pfThread[0] = t;
ownedRunnables.add(ownedRunnable);
ScheduledVirtualThreadBuilder.setPriority(t, priority);
synchronized (focusClaim) {
ownedRunnables.add(ownedRunnable);
ScheduledVirtualThreadBuilder.setPriority(t, focusClaim.isEmpty() ? priority : priority + 5);
}
return t;
}
record OwnedThread(Runnable runnable, Thread[] pfThread, int basePriority, MdsThreadFactory pool) implements Runnable {
int priority(boolean focus) {
return focus ? basePriority + 5 : basePriority;
}
record OwnedThread(Runnable runnable, Thread[] pfThread, MdsThreadFactory pool) implements Runnable {
@Override
public void run() {
try {
@ -59,11 +61,21 @@ public class MdsThreadFactory implements ThreadFactory {
}
}
public static void reprioritize(Thread thread, int delta) {
ScheduledVirtualThreadBuilder.setPriority(thread, ScheduledVirtualThreadBuilder.getPriority(thread) + delta);
}
public void reprioritize(Thread thread, ScanStage stage) {
synchronized (focusClaim) {
ScheduledVirtualThreadBuilder.setPriority(thread, focusClaim.isEmpty() ? toPriority(stage) : toPriority(stage) + 5);
}
}
private void focus() {
ownedRunnables.forEach(r -> ScheduledVirtualThreadBuilder.setPriority(r.pfThread[0], r.priority(true)));
ownedRunnables.forEach(r -> reprioritize(r.pfThread[0], 5));
}
private void defocus() {
ownedRunnables.forEach(r -> ScheduledVirtualThreadBuilder.setPriority(r.pfThread[0], r.priority(false)));
ownedRunnables.forEach(r -> reprioritize(r.pfThread[0], -5));
}
}

View File

@ -5,9 +5,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class VoidClaimPool {
private final AtomicInteger content = new AtomicInteger(0);
private final Runnable onClaim;
private final Runnable onRelease;
private int content = 0;
public VoidClaimPool(Runnable onClaim, Runnable onRelease) {
this.onClaim = onClaim;
@ -19,21 +19,25 @@ public class VoidClaimPool {
}
public boolean isEmpty() {
return content.get() == 0;
return content == 0;
}
public class Claim implements Closeable {
private final AtomicBoolean active = new AtomicBoolean(true);
private Claim() {
if (content.getAndIncrement() == 0) onClaim.run();
synchronized (VoidClaimPool.this) {
if (content++ == 0) onClaim.run();
}
}
@Override
public void close() {
if (!active.getAndSet(false))
throw new UnsupportedOperationException("Cannot release claim that is already released");
if (content.decrementAndGet() == 0) onRelease.run();
synchronized (VoidClaimPool.this) {
if (--content == 0) onRelease.run();
}
}
}
}