/*
 * This file is available under and governed by the GNU General Public
 * License version 2 only, as published by the Free Software Foundation.
 * However, the following notice accompanied the original version of this
 * file:
 *
 * Written by Doug Lea, Bill Scherer, and Michael Scott with
 * assistance from members of JCP JSR-166 Expert Group and released to
 * the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

/**
 * A synchronization point at which threads can pair and swap elements
 * within pairs.  Each thread presents some object on entry to the
 * {@link #exchange exchange} method, matches with a partner thread,
 * and receives its partner's object on return.  An Exchanger may be
 * viewed as a bidirectional form of a {@link SynchronousQueue}.
 * Exchangers may be useful in applications such as genetic algorithms
 * and pipeline designs.
 *
 * <p><b>Sample Usage:</b>
 * Here are the highlights of a class that uses an {@code Exchanger}
 * to swap buffers between threads so that the thread filling the
 * buffer gets a freshly emptied one when it needs it, handing off the
 * filled one to the thread emptying the buffer:
 *
 *  <pre> {@code
 * class FillAndEmpty {
 *   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
 *   DataBuffer initialEmptyBuffer = ... a made-up type
 *   DataBuffer initialFullBuffer = ...
 *
 *   class FillingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialEmptyBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           addToBuffer(currentBuffer);
 *           if (currentBuffer.isFull())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   class EmptyingLoop implements Runnable {
 *     public void run() {
 *       DataBuffer currentBuffer = initialFullBuffer;
 *       try {
 *         while (currentBuffer != null) {
 *           takeFromBuffer(currentBuffer);
 *           if (currentBuffer.isEmpty())
 *             currentBuffer = exchanger.exchange(currentBuffer);
 *         }
 *       } catch (InterruptedException ex) { ... handle ... }
 *     }
 *   }
 *
 *   void start() {
 *     new Thread(new FillingLoop()).start();
 *     new Thread(new EmptyingLoop()).start();
 *   }
 * }}</pre>
 *
 * <p>Memory consistency effects: For each pair of threads that
 * successfully exchange objects via an {@code Exchanger}, actions
 * prior to the {@code exchange()} in each thread
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * those subsequent to a return from the corresponding {@code exchange()}
 * in the other thread.
 *
 * @since 1.5
 * @author Doug Lea and Bill Scherer and Michael Scott
 * @param <V> The type of objects that may be exchanged
 */
public class Exchanger<V> {

    /*
     * Overview: The core algorithm is, for an exchange "slot",
     * and a participant (caller) with an item:
     *
     * for (;;) {
     *   if (slot is empty) {                       // offer
     *     place item in a Node;
     *     if (can CAS slot from empty to node) {
     *       wait for release;
     *       return matching item in node;
     *     }
     *   }
     *   else if (can CAS slot from node to empty) { // release
     *     get the item in node;
     *     set matching item in node;
     *     release waiting thread;
     *   }
     *   // else retry on CAS failure
     * }
     *
     * This is among the simplest forms of a "dual data structure" --
     * see Scott and Scherer's DISC 04 paper and
     * http://www.cs.rochester.edu/research/synchronization/pseudocode/duals.html
     *
     * This works great in principle. But in practice, like many
     * algorithms centered on atomic updates to a single location, it
     * scales horribly when there are more than a few participants
     * using the same Exchanger. So the implementation instead uses a
     * form of elimination arena, that spreads out this contention by
     * arranging that some threads typically use different slots,
     * while still ensuring that eventually, any two parties will be
     * able to exchange items. That is, we cannot completely partition
     * across threads, but instead give threads arena indices that
     * will on average grow under contention and shrink under lack of
     * contention. We approach this by defining the Nodes that we need
     * anyway as ThreadLocals, and include in them per-thread index
     * and related bookkeeping state. (We can safely reuse per-thread
     * nodes rather than creating them fresh each time because slots
     * alternate between pointing to a node vs null, so cannot
     * encounter ABA problems. However, we do need some care in
     * resetting them between uses.)
     *
     * Implementing an effective arena requires allocating a bunch of
     * space, so we only do so upon detecting contention (except on
     * uniprocessors, where they wouldn't help, so aren't used).
     * Otherwise, exchanges use the single-slot slotExchange method.
     * On contention, not only must the slots be in different
     * locations, but the locations must not encounter memory
     * contention due to being on the same cache line (or more
     * generally, the same coherence unit). Because, as of this
     * writing, there is no way to determine cacheline size, we define
     * a value that is enough for common platforms. Additionally,
     * extra care elsewhere is taken to avoid other false/unintended
     * sharing and to enhance locality, including adding padding (via
     * sun.misc.Contended) to Nodes, embedding "bound" as an Exchanger
     * field, and reworking some park/unpark mechanics compared to
     * LockSupport versions.
     *
     * The arena starts out with only one used slot. We expand the
     * effective arena size by tracking collisions; i.e., failed CASes
     * while trying to exchange. By nature of the above algorithm, the
     * only kinds of collision that reliably indicate contention are
     * when two attempted releases collide -- one of two attempted
     * offers can legitimately fail to CAS without indicating
     * contention by more than one other thread. (Note: it is possible
     * but not worthwhile to more precisely detect contention by
     * reading slot values after CAS failures.) When a thread has
     * collided at each slot within the current arena bound, it tries
     * to expand the arena size by one. We track collisions within
     * bounds by using a version (sequence) number on the "bound"
     * field, and conservatively reset collision counts when a
     * participant notices that bound has been updated (in either
     * direction).
     *
     * The effective arena size is reduced (when there is more than
     * one slot) by giving up on waiting after a while and trying to
     * decrement the arena size on expiration. The value of "a while"
     * is an empirical matter. We implement by piggybacking on the
     * use of spin->yield->block that is essential for reasonable
     * waiting performance anyway -- in a busy exchanger, offers are
     * usually almost immediately released, in which case context
     * switching on multiprocessors is extremely slow/wasteful. Arena
     * waits just omit the blocking part, and instead cancel. The spin
     * count is empirically chosen to be a value that avoids blocking
     * 99% of the time under maximum sustained exchange rates on a
     * range of test machines. Spins and yields entail some limited
     * randomness (using a cheap xorshift) to avoid regular patterns
     * that can induce unproductive grow/shrink cycles. (Using a
     * pseudorandom also helps regularize spin cycle duration by
     * making branches unpredictable.) Also, during an offer, a
     * waiter can "know" that it will be released when its slot has
     * changed, but cannot yet proceed until match is set. In the
     * mean time it cannot cancel the offer, so instead spins/yields.
     * Note: It is possible to avoid this secondary check by changing
     * the linearization point to be a CAS of the match field (as done
     * in one case in the Scott & Scherer DISC paper), which also
     * increases asynchrony a bit, at the expense of poorer collision
     * detection and inability to always reuse per-thread nodes. So
     * the current scheme is typically a better tradeoff.
     *
     * On collisions, indices traverse the arena cyclically in reverse
     * order, restarting at the maximum index (which will tend to be
     * sparsest) when bounds change. (On expirations, indices instead
     * are halved until reaching 0.) It is possible (and has been
     * tried) to use randomized, prime-value-stepped, or double-hash
     * style traversal instead of simple cyclic traversal to reduce
     * bunching. But empirically, whatever benefits these may have
     * don't overcome their added overhead: We are managing operations
     * that occur very quickly unless there is sustained contention,
     * so simpler/faster control policies work better than more
     * accurate but slower ones.
     *
     * Because we use expiration for arena size control, we cannot
     * throw TimeoutExceptions in the timed version of the public
     * exchange method until the arena size has shrunk to zero (or
     * the arena isn't enabled). This may delay response to timeout
     * but is still within spec.
     *
     * Essentially all of the implementation is in methods
     * slotExchange and arenaExchange. These have similar overall
     * structure, but differ in too many details to combine. The
     * slotExchange method uses the single Exchanger field "slot"
     * rather than arena array elements. However, it still needs
     * minimal collision detection to trigger arena construction.
     * (The messiest part is making sure interrupt status and
     * InterruptedExceptions come out right during transitions when
     * both methods may be called. This is done by using null return
     * as a sentinel to recheck interrupt status.)
     *
     * As is too common in this sort of code, methods are monolithic
     * because most of the logic relies on reads of fields that are
     * maintained as local variables so can't be nicely factored
     * (mainly, here, bulky spin->yield->block/cancel code), and are
     * heavily dependent on intrinsics (Unsafe) to use inlined
     * embedded CAS and related memory access operations (that tend
     * not to be as readily inlined by dynamic compilers when they are
     * hidden behind other methods that would more nicely name and
     * encapsulate the intended effects). This includes the use of
     * putOrderedX to clear fields of the per-thread Nodes between
     * uses. Note that field Node.item is not declared as volatile
     * even though it is read by releasing threads, because they only
     * do so after CAS operations that must precede access, and all
     * uses by the owning thread are otherwise acceptably ordered by
     * other operations. (Because the actual points of atomicity are
     * slot CASes, it would also be legal for the write to Node.match
     * in a release to be weaker than a full volatile write. However,
     * this is not done because it could allow further postponement of
     * the write, delaying progress.)
     */
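
    /*
     * Illustration only -- a minimal single-slot exchanger assembled from
     * the pseudocode above using public j.u.c APIs (busy-waiting instead
     * of parking, and assuming non-null items). This is a sketch of the
     * core algorithm, not of this class's implementation; all names in
     * it are made up.
     *
     *   class SimpleSlotExchanger<V> {
     *     static final class Node {
     *       final Object item;              // offered item
     *       volatile Object match;          // item from the releasing thread
     *       Node(Object item) { this.item = item; }
     *     }
     *     final AtomicReference<Node> slot = new AtomicReference<Node>();
     *
     *     @SuppressWarnings("unchecked")
     *     V exchange(V x) {
     *       for (;;) {
     *         Node q = slot.get();
     *         if (q == null) {                        // offer
     *           Node n = new Node(x);
     *           if (slot.compareAndSet(null, n)) {
     *             while (n.match == null)             // wait for release
     *               Thread.yield();
     *             return (V) n.match;
     *           }
     *         }
     *         else if (slot.compareAndSet(q, null)) { // release
     *           q.match = x;
     *           return (V) q.item;
     *         }                                       // else retry
     *       }
     *     }
     *   }
     */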

    /**
     * The byte distance (as a shift value) between any two used slots
     * in the arena.  1 << ASHIFT should be at least cacheline size.
     */
private static final int ASHIFT = 7; |

    /**
     * The maximum supported arena index. The maximum allocatable
     * arena size is MMASK + 1. Must be a power of two minus one, less
     * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices
     * for the expected scaling limits of the main algorithms.
     */
    private static final int MMASK = 0xff;

    /**
     * Unit for sequence/version bits of bound field. Each successful
     * change to the bound also adds SEQ.
     */
    private static final int SEQ = MMASK + 1;

    /** The number of CPUs, for sizing and spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors(); |

    /**
     * The maximum slot index of the arena: The number of slots that
     * can in principle hold all threads without contention, or at
     * most the maximum indexable value.
     */
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;
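
    // Worked example (illustrative): on an 8-CPU machine, FULL = 8 >>> 1 = 4,
    // so the arena index stops growing once slots 0..4 are in use.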

    /**
     * The bound for spins while waiting for a match. The actual
     * number of iterations will on average be about twice this value
     * due to randomization. Note: Spinning is disabled when NCPU==1.
     */
private static final int SPINS = 1 << 10; |

    /**
     * Value representing null arguments/returns from public
     * methods. Needed because the API originally didn't disallow null
     * arguments, which it should have.
     */
    private static final Object NULL_ITEM = new Object();

    /**
     * Sentinel value returned by internal exchange methods upon
     * timeout, to avoid need for separate timed versions of these
     * methods.
     */
private static final Object TIMED_OUT = new Object(); |

    /**
     * Nodes hold partially exchanged data, plus other per-thread
     * bookkeeping. Padded via @sun.misc.Contended to reduce memory
     * contention.
     */
    @sun.misc.Contended static final class Node {
        int index;              // Arena index
        int bound;              // Last recorded value of Exchanger.bound
        int collides;           // Number of CAS failures at current bound
        int hash;               // Pseudo-random for spins
        Object item;            // This thread's current item
        volatile Object match;  // Item provided by releasing thread
        volatile Thread parked; // Set to this thread when parked, else null
    }

    /** The corresponding thread local class */
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

    /** Per-thread state */
    private final Participant participant;

    /**
     * Elimination array; null until enabled (within slotExchange).
     * Element accesses use emulation of volatile gets and CAS.
     */
    private volatile Node[] arena;

    /**
     * Slot used until contention detected.
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR'ed with SEQ
     * number in high bits, incremented by SEQ on each update. The
     * initial update from 0 to SEQ (in slotExchange) also serves to
     * ensure that the arena array is constructed only once.
     */
    private volatile int bound;
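
    // Worked example of the bound encoding (values illustrative): with
    // MMASK = 0xff and SEQ = 0x100, bound == 0x305 means sequence number 3
    // and maximum arena index 5 (0x305 & MMASK). A grow step does
    // CAS(bound, b, b + SEQ + 1) and a shrink step CAS(bound, b, b + SEQ - 1),
    // so the sequence part advances either way, letting stale participants
    // detect the change and reset their collision counts.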

    /**
     * Exchange function when arenas enabled. See above for explanation.
     *
     * @param item the (non-null) item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if interrupted; or
     * TIMED_OUT if timed and timed out
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        Node[] a = arena;
        Node p = participant.get();
        for (int i = p.index;;) {                      // access slot at i
            int b, m, c; long j;                       // j is raw array offset
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)
                    U.unpark(w);
                return v;
            }
            else if (i <= (m = (b = bound) & MMASK) && q == null) {
                p.item = item;                         // offer
                if (U.compareAndSwapObject(a, j, null, p)) {
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;
                        if (v != null) {
                            U.putOrderedObject(p, MATCH, null);
                            p.item = null;             // clear for next use
                            p.hash = h;
                            return v;
                        }
                        else if (spins > 0) {
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0)
                                Thread.yield();        // two yields per wait
                        }
                        else if (U.getObjectVolatile(a, j) != p)
                            spins = SPINS;       // releaser hasn't set match yet
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed ||
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);
                            p.parked = null;
                            U.putObject(t, BLOCKER, null);
                        }
                        else if (U.getObjectVolatile(a, j) == p &&
                                 U.compareAndSwapObject(a, j, p, null)) {
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted())
                                return null;
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                }
                else
                    p.item = null;                     // clear offer
            }
            else {
                if (p.bound != b) {                    // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL ||
                         !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                }
                else
                    i = m + 1;                         // grow
                p.index = i;
            }
        }
    }
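
    // Address arithmetic used above, worked through with illustrative
    // numbers: on a JVM with a 16-byte array base offset and 4-byte
    // references, ABASE = 16 + (1 << ASHIFT) = 144, so slot i resides at
    // raw offset (i << 7) + 144. Successive slots are thus 128 bytes
    // apart (at least a cache line on common platforms), and the first
    // 1 << ASHIFT bytes of the array absorb padding in front of slot 0.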

    /**
     * Exchange function used until arenas enabled. See above for explanation.
     *
     * @param item the item to exchange
     * @param timed true if the wait is timed
     * @param ns if timed, the maximum wait time, else 0L
     * @return the other thread's item; or null if either the arena
     * was enabled or the thread was interrupted before completion; or
     * TIMED_OUT if timed and timed out
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        Node p = participant.get();
        Thread t = Thread.currentThread();
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;

        for (Node q;;) {
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // create arena on contention, but continue until slot null
                if (NCPU > 1 && bound == 0 &&
                    U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) {
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    Thread.yield();
            }
            else if (slot != p)
                spins = SPINS;
            else if (!t.isInterrupted() && arena == null &&
                     (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns);
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        return v;
    }
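
    // The spin loops above randomize yields with a Marsaglia xorshift step,
    // shown stand-alone here for illustration (a hypothetical helper, not a
    // member of this class):
    //
    //   static int xorShift(int h) {
    //       h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
    //       return h;
    //   }
    //
    // With SPINS = 1 << 10, spins is decremented only when h < 0 (about half
    // of all iterations), and Thread.yield() runs when the decremented count
    // is a multiple of SPINS >>> 1 -- roughly two yields per spin episode.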

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        participant = new Participant();
    }

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted}),
     * and then transfers the given object to it, receiving its object
     * in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of two things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @param x the object to exchange
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        if ((arena != null ||
             (v = slotExchange(item, false, 0L)) == null) &&
            ((Thread.interrupted() || // disambiguates null return
              (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }
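
    // Note: a null argument round-trips -- exchange(null) transmits
    // NULL_ITEM, and the receiving side maps NULL_ITEM back to null.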

    /**
     * Waits for another thread to arrive at this exchange point (unless
     * the current thread is {@linkplain Thread#interrupt interrupted} or
     * the specified waiting time elapses), and then transfers the given
     * object to it, receiving its object in return.
     *
     * <p>If another thread is already waiting at the exchange point then
     * it is resumed for thread scheduling purposes and receives the object
     * passed in by the current thread.  The current thread returns immediately,
     * receiving the object passed to the exchange by that other thread.
     *
     * <p>If no other thread is already waiting at the exchange then the
     * current thread is disabled for thread scheduling purposes and lies
     * dormant until one of three things happens:
     * <ul>
     * <li>Some other thread enters the exchange; or
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     * <li>The specified waiting time elapses.
     * </ul>
     * <p>If the current thread:
     * <ul>
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for the exchange,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * <p>If the specified waiting time elapses then {@link
     * TimeoutException} is thrown.  If the time is less than or equal
     * to zero, the method will not wait at all.
     *
     * @param x the object to exchange
     * @param timeout the maximum time to wait
     * @param unit the time unit of the {@code timeout} argument
     * @return the object provided by the other thread
     * @throws InterruptedException if the current thread was
     *         interrupted while waiting
     * @throws TimeoutException if the specified waiting time elapses
     *         before another thread enters the exchange
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        Object v;
        Object item = (x == null) ? NULL_ITEM : x;
        long ns = unit.toNanos(timeout);
        if ((arena != null ||
             (v = slotExchange(item, true, ns)) == null) &&
            ((Thread.interrupted() ||
              (v = arenaExchange(item, true, ns)) == null)))
            throw new InterruptedException();
        if (v == TIMED_OUT)
            throw new TimeoutException();
        return (v == NULL_ITEM) ? null : (V)v;
    }
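
    // Usage sketch for the timed form (illustrative, hypothetical names):
    //
    //   Exchanger<String> ex = new Exchanger<String>();
    //   try {
    //       String reply = ex.exchange("ping", 500, TimeUnit.MILLISECONDS);
    //       // a partner arrived within 500ms; use reply
    //   } catch (TimeoutException e) {
    //       // no partner arrived in time; recover or retry
    //   } catch (InterruptedException e) {
    //       Thread.currentThread().interrupt(); // restore status and bail out
    //   }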

    // Unsafe mechanics
    private static final sun.misc.Unsafe U;
    private static final long BOUND;
    private static final long SLOT;
    private static final long MATCH;
    private static final long BLOCKER;
    private static final int ABASE;
    static {
        int s;
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> ek = Exchanger.class;
            Class<?> nk = Node.class;
            Class<?> ak = Node[].class;
            Class<?> tk = Thread.class;
            BOUND = U.objectFieldOffset
                (ek.getDeclaredField("bound"));
            SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));
            MATCH = U.objectFieldOffset
                (nk.getDeclaredField("match"));
            BLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            s = U.arrayIndexScale(ak);
            // ABASE absorbs padding in front of element 0
            ABASE = U.arrayBaseOffset(ak) + (1 << ASHIFT);
        } catch (Exception e) {
            throw new Error(e);
        }
        if ((s & (s-1)) != 0 || s > (1 << ASHIFT))
            throw new Error("Unsupported array scale");
    }

} |