| author | Andrii Nakryiko <andrii@kernel.org> | 2021-03-08 08:51:01 -0800 |
|---|---|---|
| committer | Andrii Nakryiko <andrii@kernel.org> | 2021-03-08 08:52:45 -0800 |
| commit | bbb41728e61a602ec76cbfec2a49ccc763d305b7 (patch) | |
| tree | af863fd112a199148f11c746556393b48d9029d7 /net | |
| parent | 299194a91451263020c73dd2a3b7e0218c88dbd0 (diff) | |
| parent | 291471dd1559528a4c2ad5026eff94ed1030562b (diff) | |
Merge branch 'load-acquire/store-release barriers for'
Björn Töpel says:
====================
This two-patch series introduces load-acquire/store-release barriers
for the AF_XDP rings.
For most contemporary architectures, this is more efficient than an
SPSC ring based on smp_{r,w,}mb() barriers. More importantly,
load-acquire/store-release semantics make the ring code easier to
follow.
This is effectively the change done in commit 6c43c091bdc5
("documentation: Update circular buffer for
load-acquire/store-release"), but for the AF_XDP rings.
Both libbpf and the kernel-side are updated.
Full details are outlined in the commits!
Thanks to the LKMM-folks (Paul/Alan/Will) for helping me out in this
complicated matter!
Changelog
v1[1]->v2:
* Expanded the commit message for patch 1 and included the LKMM
litmus tests. Hopefully this clears things up. (Daniel)
* Clarified why smp_mb()/smp_load_acquire() is not needed in (A); the
control dependency from the load to the stores already provides the
ordering. (Toke)
[1] https://lore.kernel.org/bpf/20210301104318.263262-1-bjorn.topel@gmail.com/
Thanks,
Björn
====================
Signed-off-by: Andrii Nakryiko <andrii@kernel.org>
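
To make the pairing described in the series concrete, here is a minimal, hypothetical single-producer/single-consumer ring written against C11 <stdatomic.h>. It is not the AF_XDP code; the names (struct spsc_ring, ring_produce(), ring_consume(), RING_SIZE) are invented for this sketch, and the (A)/(B)/(C)/(D) labels refer to the diagram in the xsk_queue.h comment in the diff below. One deliberate difference: the kernel relies on an LKMM control dependency at (A), whereas this portable C11 sketch uses an acquire load there, since the C11 memory model does not give control dependencies the same guarantees.

```c
/* Minimal single-producer/single-consumer ring using C11 atomics.
 * Illustrative sketch only, not the AF_XDP implementation.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define RING_SIZE 64u   /* must be a power of two */

struct spsc_ring {
        _Atomic uint32_t producer;
        _Atomic uint32_t consumer;
        uint64_t slots[RING_SIZE];
};

/* Producer: check ->consumer, store the data, then publish ->producer. */
static bool ring_produce(struct spsc_ring *r, uint64_t val)
{
        uint32_t prod = atomic_load_explicit(&r->producer, memory_order_relaxed);
        /* (A): the kernel uses a plain load plus a control dependency here;
         * portable C11 code uses an acquire load to order the store below.
         */
        uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_acquire);

        if (prod - cons == RING_SIZE)
                return false;                           /* ring is full */

        r->slots[prod & (RING_SIZE - 1)] = val;         /* STORE $data */
        /* (B): pairs with (C); the data is visible before the new index. */
        atomic_store_explicit(&r->producer, prod + 1, memory_order_release);
        return true;
}

/* Consumer: read ->producer, load the data, then release ->consumer. */
static bool ring_consume(struct spsc_ring *r, uint64_t *val)
{
        uint32_t cons = atomic_load_explicit(&r->consumer, memory_order_relaxed);
        /* (C): pairs with (B); everything written before (B) is visible now. */
        uint32_t prod = atomic_load_explicit(&r->producer, memory_order_acquire);

        if (cons == prod)
                return false;                           /* ring is empty */

        *val = r->slots[cons & (RING_SIZE - 1)];        /* LOAD $data */
        /* (D): pairs with (A); the slot may be reused only after this. */
        atomic_store_explicit(&r->consumer, cons + 1, memory_order_release);
        return true;
}
```

On arm64, for example, the release/acquire accesses compile to STLR/LDAR rather than the DMB barriers emitted for smp_{r,w,}mb(), which is where the efficiency gain mentioned in the series description comes from.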
Diffstat (limited to 'net')
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | net/xdp/xsk_queue.h | 30 |

1 file changed, 13 insertions, 17 deletions
```diff
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 2823b7c3302d..2ac3802c2cd7 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -47,19 +47,18 @@ struct xsk_queue {
         u64 queue_empty_descs;
 };
 
-/* The structure of the shared state of the rings are the same as the
- * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
- * ring, the kernel is the producer and user space is the consumer. For
- * the Tx and fill rings, the kernel is the consumer and user space is
- * the producer.
+/* The structure of the shared state of the rings are a simple
+ * circular buffer, as outlined in
+ * Documentation/core-api/circular-buffers.rst. For the Rx and
+ * completion ring, the kernel is the producer and user space is the
+ * consumer. For the Tx and fill rings, the kernel is the consumer and
+ * user space is the producer.
  *
  * producer                         consumer
  *
- * if (LOAD ->consumer) {           LOAD ->producer
- *                    (A)           smp_rmb()       (C)
+ * if (LOAD ->consumer) {  (A)      LOAD.acq ->producer  (C)
  *    STORE $data                   LOAD $data
- *    smp_wmb()       (B)           smp_mb()        (D)
- *    STORE ->producer              STORE ->consumer
+ *    STORE.rel ->producer  (B)     STORE.rel ->consumer  (D)
  * }
  *
  * (A) pairs with (D), and (B) pairs with (C).
@@ -78,7 +77,8 @@ struct xsk_queue {
  *
  * (A) is a control dependency that separates the load of ->consumer
  * from the stores of $data. In case ->consumer indicates there is no
- * room in the buffer to store $data we do not. So no barrier is needed.
+ * room in the buffer to store $data we do not. The dependency will
+ * order both of the stores after the loads. So no barrier is needed.
  *
  * (D) protects the load of the data to be observed to happen after the
  * store of the consumer pointer. If we did not have this memory
@@ -227,15 +227,13 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q,
 
 static inline void __xskq_cons_release(struct xsk_queue *q)
 {
-        smp_mb(); /* D, matches A */
-        WRITE_ONCE(q->ring->consumer, q->cached_cons);
+        smp_store_release(&q->ring->consumer, q->cached_cons); /* D, matchees A */
 }
 
 static inline void __xskq_cons_peek(struct xsk_queue *q)
 {
         /* Refresh the local pointer */
-        q->cached_prod = READ_ONCE(q->ring->producer);
-        smp_rmb(); /* C, matches B */
+        q->cached_prod = smp_load_acquire(&q->ring->producer);  /* C, matches B */
 }
 
 static inline void xskq_cons_get_entries(struct xsk_queue *q)
@@ -397,9 +395,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
 
 static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
 {
-        smp_wmb(); /* B, matches C */
-
-        WRITE_ONCE(q->ring->producer, idx);
+        smp_store_release(&q->ring->producer, idx); /* B, matches C */
 }
 
 static inline void xskq_prod_submit(struct xsk_queue *q)
```
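
The diffstat above is limited to net/, so the libbpf half of the series is not shown here. As a rough sketch of what the user-space side of the pairing looks like, the snippet below uses the GCC/Clang __atomic builtins on a hypothetical view of an mmap'ed Rx ring; the struct and function names are invented for illustration and this is not the libbpf patch itself.

```c
/* Hypothetical user-space view of an mmap'ed AF_XDP Rx ring, showing how a
 * user-space consumer pairs with the kernel-side producer in the diff above.
 * Names are invented for this sketch; this is not the libbpf code.
 */
#include <stdint.h>

struct rx_ring_view {
        uint32_t *producer;     /* written by the kernel:  STORE.rel (B) */
        uint32_t *consumer;     /* written by user space:  STORE.rel (D) */
        void *descs;            /* descriptor array in the shared mapping */
        uint32_t mask;          /* ring size - 1 */
};

/* (C): acquire-load of the producer index, pairs with the kernel's (B). */
static inline uint32_t rx_ring_peek(const struct rx_ring_view *r)
{
        return __atomic_load_n(r->producer, __ATOMIC_ACQUIRE);
}

/* (D): release-store of the consumer index, pairs with the kernel's (A). */
static inline void rx_ring_release(struct rx_ring_view *r, uint32_t cons)
{
        __atomic_store_n(r->consumer, cons, __ATOMIC_RELEASE);
}
```

A consumer would read the descriptors between its cached consumer index and the value returned by rx_ring_peek(), then publish the new consumer index with rx_ring_release() so the kernel may reuse those slots.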
