Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -1254,18 +1254,17 @@ final int queueSize() {
* @throws RejectedExecutionException if array could not be resized
*/
final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a, na;
int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;
if ((a = array) != null && (cap = a.length) > 0) { // else disabled
int k = (m = cap - 1) & s;
if ((room = m - (s - b)) >= 0) {
if ((room = (m = cap - 1) - (s - b)) >= 0) {
top = s + 1;
long pos = slotOffset(k);
long pos = slotOffset(m & s);
if (!internal)
U.putReference(a, pos, task); // inside lock
else
U.getAndSetReference(a, pos, task); // fully fenced
if (room == 0 && (na = growArray(a, cap, s)) != null)
k = ((a = na).length - 1) & s; // resize
if (room == 0)
growArray(a, cap, s);
}
if (!internal)
unlockPhase();
Expand All @@ -1274,7 +1273,7 @@ final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
if (pool != null &&
(room == 0 ||
U.getReferenceAcquire(a, slotOffset(m & (s - 1))) == null))
pool.signalWork(a, k); // may have appeared empty
pool.signalWork(this, s); // may have appeared empty
}
}

Expand All @@ -1283,9 +1282,8 @@ final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {
* @param a old array
* @param cap old array capacity
* @param s current top
* @return new array, or null on failure
*/
private ForkJoinTask<?>[] growArray(ForkJoinTask<?>[] a, int cap, int s) {
private void growArray(ForkJoinTask<?>[] a, int cap, int s) {
int newCap = (cap >= 1 << 16) ? cap << 1 : cap << 2;
ForkJoinTask<?>[] newArray = null;
if (a != null && a.length == cap && cap > 0 && newCap > 0) {
Expand All @@ -1305,7 +1303,6 @@ a, slotOffset(k & mask), null)) == null)
updateArray(newArray); // fully fenced
}
}
return newArray;
}

/**
Expand Down Expand Up @@ -1434,7 +1431,31 @@ else if (U.compareAndSetReference(a, k, t, null)) {
final void topLevelExec(ForkJoinTask<?> task, int fifo) {
while (task != null) {
task.doExec();
task = (fifo != 0) ? localPoll() : localPop();
task = null;
int p = top, cap; ForkJoinTask<?>[] a;
if ((a = array) == null || (cap = a.length) <= 0)
break; // currently impossible
if (fifo == 0) { // specialized localPop
int s = p - 1; long k;
if (U.getReference(
a, k = slotOffset((cap - 1) & s)) != null &&
(task = (ForkJoinTask<?>)
U.getAndSetReference(a, k, null)) != null)
top = s;
} else { // specialized localPoll
for (int b = base; p - b > 0; ) {
int nb = b + 1;
if ((task = (ForkJoinTask<?>)U.getAndSetReference(
a, slotOffset((cap - 1) & b), null)) != null) {
base = nb;
break;
}
if (nb == p)
break;
while (b == (b = U.getIntAcquire(this, BASE)))
Thread.onSpinWait();
}
}
}
}

Expand Down Expand Up @@ -1854,9 +1875,12 @@ final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {

/**
* Releases an idle worker, or creates one if not enough exist,
* giving up if array a is nonnull and task at a[k] already taken.
* giving up q is nonull and signalled slot already taken.
*
* @param q, if nonnull, the WorkQueue containing signalled task
* @param qbase q's base index for the task
*/
final void signalWork(ForkJoinTask<?>[] a, int k) {
final void signalWork(WorkQueue q, int qbase) {
int pc = parallelism;
for (long c = ctl;;) {
WorkQueue[] qs = queues;
Expand All @@ -1878,9 +1902,7 @@ else if ((v = w) == null)
break;
else
nc = (v.stackPred & LMASK) | (c & TC_MASK) | ac;
if (a != null && k < a.length && k >= 0 && a[k] == null)
break;
if (c == (c = ctl) && c == (c = compareAndExchangeCtl(c, nc))) {
if (c == (c = compareAndExchangeCtl(c, nc))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the risk here is that this introduces more cache-coordination contention, esp. on multisocket aarch64.

if (v == null)
createWorker();
else {
Expand All @@ -1890,6 +1912,8 @@ else if ((v = w) == null)
}
break;
}
if (q != null && q.base - qbase > 0)
break;
}
}

Expand Down Expand Up @@ -1976,10 +2000,10 @@ final void runWorker(WorkQueue w) {
if ((q = qs[qid = i & (n - 1)]) != null) {
ForkJoinTask<?>[] a; int cap; // poll queue
while ((a = q.array) != null && (cap = a.length) > 0) {
int b, nb, nk; long bp; ForkJoinTask<?> t;
int b, nb; long bp; ForkJoinTask<?> t;
t = (ForkJoinTask<?>)U.getReferenceAcquire(
a, bp = slotOffset((cap - 1) & (b = q.base)));
long np = slotOffset(nk = (nb = b + 1) & (cap - 1));
long np = slotOffset((nb = b + 1) & (cap - 1));
if (q.base == b) { // else inconsistent
if (t == null) {
if (q.array == a) { // else resized
Expand All @@ -2001,13 +2025,11 @@ else if (inactive != 0) {
}
else if (U.compareAndSetReference(a, bp, t, null)) {
q.base = nb;
Object nt = U.getReferenceAcquire(a, np);
w.source = qid;
rescans = 1;
++taken;
if (nt != null && // confirm a[nk]
U.getReferenceAcquire(a, np) == nt)
signalWork(a, nk); // propagate
if (U.getReferenceAcquire(a, np) != null)
signalWork(q, nb); // propagate
w.topLevelExec(t, fifo);
}
}
Expand Down