Skip to content

Commit b99eb58

Browse files
committed
Add new scheduler, refactor scheduler system
Rename the old ScheduledTaskThreadPool class to EDFSchedulerThreadPool and remove its deprecation. Move important scheduling methods to the new abstract Scheduler class. Move the schedulable tick abstraction to its own class, SchedulableTick. Move DEADLINE_NOT_SET to TimeUtil, where it really should have been in the first place. Add a simple interface, OSNuma, to interact with the OS's NUMA and affinity APIs. It looks like Linux will be the only supported platform (provided libnuma is installed), as the Windows API for this is atrocious. This change makes use of JNA as a new dependency. Add a new work stealing scheduler which, unlike the EDF scheduler, supports NUMA aware scheduling, intermediate task execution, and dynamic thread adjustment.
1 parent 7dabeea commit b99eb58

File tree

16 files changed

+2135
-585
lines changed

16 files changed

+2135
-585
lines changed

.github/workflows/build.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212
if: ${{ github.event_name != 'pull_request' || github.repository != github.event.pull_request.head.repo.full_name }}
1313
runs-on: ubuntu-latest
1414
steps:
15-
- uses: actions/checkout@v5
15+
- uses: actions/checkout@v6
1616
- name: "jdk"
1717
uses: actions/setup-java@v5
1818
with:

build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ repositories {
1111
dependencies {
1212
implementation(libs.org.slf4j.slf4j.api)
1313
implementation(libs.it.unimi.dsi.fastutil)
14+
implementation(libs.net.java.dev.jna)
1415
}
1516

1617
tasks.jar {

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
[versions]
22
it-unimi-dsi-fastutil = "8.5.15"
33
org-slf4j-slf4j-api = "2.0.17"
4+
net-java-dev-jna = "5.17.0"
45

56
[libraries]
67
it-unimi-dsi-fastutil = { module = "it.unimi.dsi:fastutil", version.ref = "it-unimi-dsi-fastutil" }
78
org-slf4j-slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "org-slf4j-slf4j-api" }
9+
net-java-dev-jna = { module = "net.java.dev.jna:jna", version.ref = "net-java-dev-jna" }

gradle/wrapper/gradle-wrapper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
distributionBase=GRADLE_USER_HOME
22
distributionPath=wrapper/dists
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.0-bin.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-9.2.1-bin.zip
44
networkTimeout=10000
55
validateDistributionUrl=true
66
zipStoreBase=GRADLE_USER_HOME

src/main/java/ca/spottedleaf/concurrentutil/executor/thread/BalancedPrioritisedThreadPool.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
import ca.spottedleaf.concurrentutil.util.TimeUtil;
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
10+
import java.time.Duration;
1011
import java.util.ArrayList;
1112
import java.util.Arrays;
1213
import java.util.List;
14+
import java.util.concurrent.TimeUnit;
1315
import java.util.concurrent.atomic.AtomicLong;
1416
import java.util.function.Consumer;
1517

@@ -19,23 +21,22 @@ public final class BalancedPrioritisedThreadPool {
1921

2022
private static final Logger LOGGER = LoggerFactory.getLogger(BalancedPrioritisedThreadPool.class);
2123

22-
private final Consumer<Thread> threadModifier;
23-
2424
private final COWArrayList<OrderedStreamGroup> groups = new COWArrayList<>(OrderedStreamGroup.class);
2525
private final COWArrayList<WorkerThread> threads = new COWArrayList<>(WorkerThread.class);
2626
private final COWArrayList<WorkerThread> aliveThreads = new COWArrayList<>(WorkerThread.class);
2727

2828
private final long groupTimeSliceNS;
29+
private final Consumer<Thread> threadModifier;
2930

3031
private boolean shutdown;
3132

3233
public BalancedPrioritisedThreadPool(final long groupTimeSliceNS, final Consumer<Thread> threadModifier) {
34+
this.groupTimeSliceNS = groupTimeSliceNS;
3335
this.threadModifier = threadModifier;
3436

3537
if (threadModifier == null) {
3638
throw new NullPointerException("Thread factory may not be null");
3739
}
38-
this.groupTimeSliceNS = groupTimeSliceNS;
3940
}
4041

4142
private void wakeupIdleThread() {
@@ -103,20 +104,29 @@ public boolean joinInterruptable(final long msToWait) throws InterruptedExceptio
103104
}
104105

105106
private boolean join(final long msToWait, final boolean interruptable) throws InterruptedException {
106-
final long nsToWait = msToWait * (1000 * 1000);
107+
synchronized (this) {
108+
if (!this.shutdown) {
109+
throw new IllegalStateException("Attempting to join on non-shutdown pool");
110+
}
111+
}
112+
113+
final long nsToWait = TimeUnit.MILLISECONDS.toNanos(msToWait);
107114
final long start = System.nanoTime();
108115
final long deadline = start + nsToWait;
109116
boolean interrupted = false;
110117
try {
111118
for (final WorkerThread thread : this.aliveThreads.getArray()) {
112119
while (thread.isAlive()) {
113-
final long current = System.nanoTime();
114-
if (current - deadline >= 0L && msToWait > 0L) {
115-
return false;
116-
}
117-
118120
try {
119-
thread.join(msToWait <= 0L ? 0L : Math.max(1L, (deadline - current) / (1000 * 1000)));
121+
if (msToWait > 0L) {
122+
final long current = System.nanoTime();
123+
if (current - deadline >= 0L) {
124+
return false;
125+
}
126+
thread.join(Duration.ofNanos(deadline - current));
127+
} else {
128+
thread.join();
129+
}
120130
} catch (final InterruptedException ex) {
121131
if (interruptable) {
122132
throw ex;

src/main/java/ca/spottedleaf/concurrentutil/executor/thread/StreamOrderedThreadPool.java

Lines changed: 0 additions & 167 deletions
This file was deleted.

src/main/java/ca/spottedleaf/concurrentutil/list/COWArrayList.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,21 @@ public E[] getArray() {
1515
return this.array;
1616
}
1717

18+
public void clear() {
19+
synchronized (this) {
20+
this.array = Arrays.copyOf(this.array, 0);
21+
}
22+
}
23+
24+
public void set(final E[] array) {
25+
if (array.getClass() != this.array.getClass()) {
26+
throw new IllegalStateException();
27+
}
28+
synchronized (this) {
29+
this.array = array;
30+
}
31+
}
32+
1833
public boolean contains(final E test) {
1934
for (final E elem : this.array) {
2035
if (elem == test) {

0 commit comments

Comments
 (0)