author     Alexei Starovoitov <ast@kernel.org>  2023-07-19 09:56:51 -0700
committer  Alexei Starovoitov <ast@kernel.org>  2023-07-19 09:56:51 -0700
commit     3226e3139dfe02d5892562976a649a54ada12a13 (patch)
tree       a88c7e4e4a7561ed8c3987fa627481c2e3d2903e /net
parent     492e797fdab25f2d8eb1b6bb3236f4aac474f878 (diff)
parent     3666bccab43a33904372eaa26936dabef3232497 (diff)
Merge branch 'xsk-multi-buffer-support'
Maciej Fijalkowski says:

====================
xsk: multi-buffer support

v6->v7:
- rebase...[Alexei]

v5->v6:
- update bpf_xdp_query_opts__last_field in patch 10 [Alexei]

v4->v5:
- align options argument size to match options from xdp_desc [Benjamin]
- cleanup skb from xdp_sock on socket termination [Toke]
- introduce new netlink attribute for letting user space know about Tx
  frag limit; this substitutes the xdp_features flag previously dedicated
  to setting ZC multi-buffer support [Toke, Jakub]
- include i40e ZC multi-buffer support
- enable TOO_MANY_FRAGS for ZC on xskxceiver; this is now possible due
  to the netlink attribute mentioned two bullets above

v3->v4:
- rely on ynl for adding the new xdp_features flag [Jakub]
- move xskb_list to xsk_buff_pool

v2->v3:
- fix issue with the next valid packet getting dropped after an invalid
  packet with MAX_SKB_FRAGS + 1 frags [Magnus]
- query the NETDEV_XDP_ACT_ZC_SG flag within xskxceiver and act on it
- remove redundant include in xsk.c [kernel test robot]
- s/NETDEV_XDP_ACT_NDO_ZC_SG/NETDEV_XDP_ACT_ZC_SG + kernel doc [Magnus, Simon]

v1->v2:
- fix spelling issues in commit messages [Simon]
- remove XSK_DESC_MAX_FRAGS, use MAX_SKB_FRAGS instead [Stan, Alexei]
- add documentation patch
- fix build error from kernel test robot on patch 10

This series of patches adds multi-buffer support for AF_XDP. XDP and
various NIC drivers already support multi-buffer packets. With this
patch set, programs using AF_XDP sockets can now also receive and
transmit multi-buffer packets, both in copy and zero-copy mode. The ZC
multi-buffer implementation is based on the ice driver.

Some definitions to put us all on the same page:

* A packet consists of one or more frames.

* A descriptor in one of the AF_XDP rings always refers to a single
  frame. In case the packet consists of a single frame, the descriptor
  refers to the whole packet.

To represent a packet consisting of multiple frames, we introduce a new
flag called XDP_PKT_CONTD in the options field of the Rx and Tx
descriptors. If it is true (1), the packet continues with the next
descriptor, and if it is false (0), this is the last descriptor of the
packet. Why the reverse logic of the end-of-packet (eop) flag found in
many NICs? Simply to preserve compatibility with non-multi-buffer
applications, which have this bit set to false for all packets on Rx and
set the options field to zero for Tx, as anything else is treated as an
invalid descriptor.

These are the semantics for producing packets onto the XSK Tx ring
consisting of multiple frames:

* When an invalid descriptor is found, all the other descriptors/frames
  of this packet are marked as invalid and not completed. The next
  descriptor is treated as the start of a new packet, even if this was
  not the intent (because we cannot guess the intent). As before, if your
  program is producing invalid descriptors, you have a bug that must be
  fixed.

* Zero-length descriptors are treated as invalid descriptors.

* For copy mode, the maximum supported number of frames in a packet is
  CONFIG_MAX_SKB_FRAGS + 1. If it is exceeded, all descriptors
  accumulated so far are dropped and treated as invalid. To produce an
  application that works on any system regardless of this config
  setting, limit the number of frags to 18, as the minimum value of the
  config is 17 (see the sketch after this list).

* For zero-copy mode, the limit is whatever the NIC HW supports. User
  space can discover this via the newly introduced
  NETDEV_A_DEV_XDP_ZC_MAX_SEGS netlink attribute.
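To illustrate the per-packet descriptor budget above, here is a minimal
sketch (not from this series) of how an application might check whether
a packet fits before producing its descriptors. The helper name
xsk_pkt_fits and the XSK_COPY_MODE_MAX_DESCS constant are illustrative;
max_descs would be 18 for copy mode, or the value reported by the
NETDEV_A_DEV_XDP_ZC_MAX_SEGS attribute for zero-copy mode.

#include <stdbool.h>
#include <linux/types.h>

/* Portable copy-mode cap: CONFIG_MAX_SKB_FRAGS is at least 17, so
 * 17 + 1 = 18 descriptors per packet is always accepted.
 */
#define XSK_COPY_MODE_MAX_DESCS	18

/* Return true if a packet of pkt_len bytes, split into frame_size
 * sized frames, fits within the per-packet descriptor budget.
 * Zero-length packets never fit, mirroring the rule that zero-length
 * descriptors are invalid.
 */
static bool xsk_pkt_fits(__u32 pkt_len, __u32 frame_size, __u32 max_descs)
{
	__u32 nr_descs = (pkt_len + frame_size - 1) / frame_size;

	return nr_descs && nr_descs <= max_descs;
}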
Here is an example Tx path pseudo-code (using libxdp interfaces for
simplicity), ignoring that the umem is finite in size and that we will
eventually run out of packets to send. It also assumes pkts.addr points
to a valid location in the umem.

void tx_packets(struct xsk_socket_info *xsk, struct pkt *pkts,
		int batch_size)
{
	u32 idx, i, pkt_nb = 0;

	xsk_ring_prod__reserve(&xsk->tx, batch_size, &idx);

	for (i = 0; i < batch_size;) {
		u64 addr = pkts[pkt_nb].addr;
		u32 len = pkts[pkt_nb].size;

		do {
			struct xdp_desc *tx_desc;

			tx_desc = xsk_ring_prod__tx_desc(&xsk->tx, idx + i++);
			tx_desc->addr = addr;

			if (len > xsk_frame_size) {
				tx_desc->len = xsk_frame_size;
				tx_desc->options |= XDP_PKT_CONTD;
			} else {
				tx_desc->len = len;
				tx_desc->options = 0;
				pkt_nb++;
			}
			len -= tx_desc->len;
			addr += xsk_frame_size;

			if (i == batch_size) {
				/* Remember len, addr, pkt_nb for next
				 * iteration. Skipped for simplicity.
				 */
				break;
			}
		} while (len);
	}

	xsk_ring_prod__submit(&xsk->tx, i);
}

On the Rx path in copy mode, the xsk core copies the XDP data into
multiple descriptors, if needed, and sets the XDP_PKT_CONTD flag as
detailed before. Zero-copy mode, in order to avoid the copies, has to
maintain a chain of xdp_buff_xsk structs that represent the whole
packet. This is because what is actually redirected is the xdp_buff, and
we currently have no equivalent of the mechanism used in copy mode (the
skb_shared_info embedded in the xdp_buff) to carry the frags. This means
xdp_buff_xsk grows in size, but these members are at the end and are not
touched when the data path is not dealing with fragmented packets. This
solution kept us within the assumed performance impact, hence we decided
to proceed with it.

When the application gets a descriptor with the XDP_PKT_CONTD flag set
to one, it means that the packet consists of multiple buffers and it
continues with the next buffer in the following descriptor. When a
descriptor with XDP_PKT_CONTD == 0 is received, it means that this is
the last buffer of the packet. AF_XDP guarantees that only a complete
packet (all frames in the packet) is sent to the application. If the
application reads a batch of descriptors, using for example the libxdp
interfaces, it is not guaranteed that the batch will end with a full
packet. It might end in the middle of a packet, and the rest of the
buffers of that packet will arrive at the beginning of the next batch,
since the libxdp interface does not read the whole ring (unless you have
an enormous batch size or a very small ring size).

Here is a simple Rx path pseudo-code example (using libxdp interfaces
for simplicity). Error paths have been excluded for simplicity:

void rx_packets(struct xsk_socket_info *xsk)
{
	static bool new_packet = true;
	u32 idx_rx = 0, idx_fq = 0;
	static char *pkt;

	int rcvd = xsk_ring_cons__peek(&xsk->rx, opt_batch_size, &idx_rx);

	xsk_ring_prod__reserve(&xsk->umem->fq, rcvd, &idx_fq);

	for (int i = 0; i < rcvd; i++) {
		struct xdp_desc *desc = xsk_ring_cons__rx_desc(&xsk->rx, idx_rx++);
		char *frag = xsk_umem__get_data(xsk->umem->buffer, desc->addr);
		bool eop = !(desc->options & XDP_PKT_CONTD);

		if (new_packet)
			pkt = frag;
		else
			add_frag_to_pkt(pkt, frag);

		if (eop)
			process_pkt(pkt);

		new_packet = eop;

		*xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) = desc->addr;
	}

	xsk_ring_prod__submit(&xsk->umem->fq, rcvd);
	xsk_ring_cons__release(&xsk->rx, rcvd);
}

We had to introduce a new bind flag (XDP_USE_SG) at the AF_XDP level to
enable multi-buffer support.
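To show how the new bind flag is used, here is a minimal sketch (not
from this series) of creating an AF_XDP socket with XDP_USE_SG through
libxdp. It assumes a umem, Rx/Tx rings, ifname and queue_id were set up
elsewhere, that <linux/if_xdp.h> is recent enough to define XDP_USE_SG,
and the helper name create_sg_socket is illustrative.

#include <linux/if_xdp.h>
#include <xdp/xsk.h>

static struct xsk_socket *create_sg_socket(struct xsk_umem *umem,
					   const char *ifname, __u32 queue_id,
					   struct xsk_ring_cons *rx,
					   struct xsk_ring_prod *tx)
{
	struct xsk_socket_config cfg = {
		.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
		.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
		/* Opt in to multi-buffer delivery on this socket. */
		.bind_flags = XDP_USE_SG,
	};
	struct xsk_socket *xsk;

	if (xsk_socket__create(&xsk, ifname, queue_id, umem, rx, tx, &cfg))
		return NULL;

	return xsk;
}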
The reason we need to differentiate between non-multi-buffer and
multi-buffer is the behaviour when the kernel gets a packet that is
larger than the frame size. Without multi-buffer, this packet is dropped
and marked in the stats. With multi-buffer on, we want to split it up
into multiple frames instead.

At the start, we thought that riding on the .frags section name of the
XDP program was a good idea. You do not have to introduce yet another
flag, and all AF_XDP users must load an XDP program anyway to get any
traffic up to the socket, so why not just say that the XDP program
decides if the AF_XDP socket should get multi-buffer packets or not? The
problem is that we can create an AF_XDP socket that is Tx-only, and that
works without having to load an XDP program at all. Another problem is
that the XDP program might change during execution, so we would have to
check this for every single packet. (A sketch of such a frags-aware
program is included after this message.)

Here is the observed throughput compared to a codebase without any
multi-buffer changes, measured with xdpsock for 64B packets. Apparently,
ZC Tx takes a hit from the explicit zero-length descriptor validation.
Overall, there is room for improvement in ZC performance, but for now we
think this work is in good shape in terms of correctness and
functionality. We were targeting up to 5% overhead, though. Note that
the ZC performance drops come from core + driver support being combined,
whereas copy mode already had driver support in place.

Mode      rxdrop   l2fwd   txonly
ice-zc      -4%     -7%     -6%
i40e-zc     -7%     -6%     -7%
drv       -1.2%      0%     +2%
skb       -0.6%     -1%     +2%

Thank you,
Tirthendu, Magnus and Maciej
====================

Link: https://lore.kernel.org/r/20230719132421.584801-1-maciej.fijalkowski@intel.com
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
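For reference, this is what "riding on the .frags section name" would
have meant in practice: a minimal sketch (not from this series) of a
frags-aware XDP program that redirects traffic to AF_XDP sockets via an
XSKMAP. The map name, its size and the program name are illustrative.

// SPDX-License-Identifier: GPL-2.0
#include <linux/bpf.h>
#include <bpf/bpf_helpers.h>

struct {
	__uint(type, BPF_MAP_TYPE_XSKMAP);
	__uint(max_entries, 64);
	__type(key, __u32);
	__type(value, __u32);
} xsks_map SEC(".maps");

/* The .frags suffix makes libbpf load the program with
 * BPF_F_XDP_HAS_FRAGS, i.e. it declares itself multi-buffer aware.
 */
SEC("xdp.frags")
int xsk_redirect(struct xdp_md *ctx)
{
	/* Redirect to the socket bound to this Rx queue, or pass the
	 * packet to the stack if no socket is registered there.
	 */
	return bpf_redirect_map(&xsks_map, ctx->rx_queue_index, XDP_PASS);
}

char _license[] SEC("license") = "GPL";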
Diffstat (limited to 'net')
-rw-r--r--  net/core/dev.c           |   1
-rw-r--r--  net/core/filter.c        |   7
-rw-r--r--  net/core/netdev-genl.c   |   8
-rw-r--r--  net/xdp/xsk.c            | 365
-rw-r--r--  net/xdp/xsk_buff_pool.c  |   7
-rw-r--r--  net/xdp/xsk_queue.h      |  95
6 files changed, 363 insertions, 120 deletions
diff --git a/net/core/dev.c b/net/core/dev.c
index d6e1b786c5c5..dd4f114a7cbf 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -10613,6 +10613,7 @@ struct net_device *alloc_netdev_mqs(int sizeof_priv, const char *name,
dev_net_set(dev, &init_net);
dev->gso_max_size = GSO_LEGACY_MAX_SIZE;
+ dev->xdp_zc_max_segs = 1;
dev->gso_max_segs = GSO_MAX_SEGS;
dev->gro_max_size = GRO_LEGACY_MAX_SIZE;
dev->gso_ipv4_max_size = GSO_LEGACY_MAX_SIZE;
diff --git a/net/core/filter.c b/net/core/filter.c
index 06ba0e56e369..b4410dc841a0 100644
--- a/net/core/filter.c
+++ b/net/core/filter.c
@@ -4345,13 +4345,8 @@ int xdp_do_redirect(struct net_device *dev, struct xdp_buff *xdp,
struct bpf_redirect_info *ri = this_cpu_ptr(&bpf_redirect_info);
enum bpf_map_type map_type = ri->map_type;
- if (map_type == BPF_MAP_TYPE_XSKMAP) {
- /* XDP_REDIRECT is not supported AF_XDP yet. */
- if (unlikely(xdp_buff_has_frags(xdp)))
- return -EOPNOTSUPP;
-
+ if (map_type == BPF_MAP_TYPE_XSKMAP)
return __xdp_do_redirect_xsk(ri, dev, xdp, xdp_prog);
- }
return __xdp_do_redirect_frame(ri, dev, xdp_convert_buff_to_frame(xdp),
xdp_prog);
diff --git a/net/core/netdev-genl.c b/net/core/netdev-genl.c
index a4270fafdf11..65ef4867fc49 100644
--- a/net/core/netdev-genl.c
+++ b/net/core/netdev-genl.c
@@ -25,6 +25,14 @@ netdev_nl_dev_fill(struct net_device *netdev, struct sk_buff *rsp,
return -EINVAL;
}
+ if (netdev->xdp_features & NETDEV_XDP_ACT_XSK_ZEROCOPY) {
+ if (nla_put_u32(rsp, NETDEV_A_DEV_XDP_ZC_MAX_SEGS,
+ netdev->xdp_zc_max_segs)) {
+ genlmsg_cancel(rsp, hdr);
+ return -EINVAL;
+ }
+ }
+
genlmsg_end(rsp, hdr);
return 0;
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 31dca4ecb2c5..4f1e0599146e 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -135,14 +135,14 @@ int xsk_reg_pool_at_qid(struct net_device *dev, struct xsk_buff_pool *pool,
return 0;
}
-static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
+static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff_xsk *xskb, u32 len,
+ u32 flags)
{
- struct xdp_buff_xsk *xskb = container_of(xdp, struct xdp_buff_xsk, xdp);
u64 addr;
int err;
addr = xp_get_handle(xskb);
- err = xskq_prod_reserve_desc(xs->rx, addr, len);
+ err = xskq_prod_reserve_desc(xs->rx, addr, len, flags);
if (err) {
xs->rx_queue_full++;
return err;
@@ -152,48 +152,138 @@ static int __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
return 0;
}
-static void xsk_copy_xdp(struct xdp_buff *to, struct xdp_buff *from, u32 len)
+static int xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
{
- void *from_buf, *to_buf;
- u32 metalen;
+ struct xdp_buff_xsk *xskb = container_of(xdp, struct xdp_buff_xsk, xdp);
+ u32 frags = xdp_buff_has_frags(xdp);
+ struct xdp_buff_xsk *pos, *tmp;
+ struct list_head *xskb_list;
+ u32 contd = 0;
+ int err;
- if (unlikely(xdp_data_meta_unsupported(from))) {
- from_buf = from->data;
- to_buf = to->data;
- metalen = 0;
- } else {
- from_buf = from->data_meta;
- metalen = from->data - from->data_meta;
- to_buf = to->data - metalen;
+ if (frags)
+ contd = XDP_PKT_CONTD;
+
+ err = __xsk_rcv_zc(xs, xskb, len, contd);
+ if (err || likely(!frags))
+ goto out;
+
+ xskb_list = &xskb->pool->xskb_list;
+ list_for_each_entry_safe(pos, tmp, xskb_list, xskb_list_node) {
+ if (list_is_singular(xskb_list))
+ contd = 0;
+ len = pos->xdp.data_end - pos->xdp.data;
+ err = __xsk_rcv_zc(xs, pos, len, contd);
+ if (err)
+ return err;
+ list_del(&pos->xskb_list_node);
}
- memcpy(to_buf, from_buf, len + metalen);
+out:
+ return err;
}
-static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
+static void *xsk_copy_xdp_start(struct xdp_buff *from)
{
+ if (unlikely(xdp_data_meta_unsupported(from)))
+ return from->data;
+ else
+ return from->data_meta;
+}
+
+static u32 xsk_copy_xdp(void *to, void **from, u32 to_len,
+ u32 *from_len, skb_frag_t **frag, u32 rem)
+{
+ u32 copied = 0;
+
+ while (1) {
+ u32 copy_len = min_t(u32, *from_len, to_len);
+
+ memcpy(to, *from, copy_len);
+ copied += copy_len;
+ if (rem == copied)
+ return copied;
+
+ if (*from_len == copy_len) {
+ *from = skb_frag_address(*frag);
+ *from_len = skb_frag_size((*frag)++);
+ } else {
+ *from += copy_len;
+ *from_len -= copy_len;
+ }
+ if (to_len == copy_len)
+ return copied;
+
+ to_len -= copy_len;
+ to += copy_len;
+ }
+}
+
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
+{
+ u32 frame_size = xsk_pool_get_rx_frame_size(xs->pool);
+ void *copy_from = xsk_copy_xdp_start(xdp), *copy_to;
+ u32 from_len, meta_len, rem, num_desc;
+ struct xdp_buff_xsk *xskb;
struct xdp_buff *xsk_xdp;
- int err;
- u32 len;
+ skb_frag_t *frag;
- len = xdp->data_end - xdp->data;
- if (len > xsk_pool_get_rx_frame_size(xs->pool)) {
- xs->rx_dropped++;
- return -ENOSPC;
+ from_len = xdp->data_end - copy_from;
+ meta_len = xdp->data - copy_from;
+ rem = len + meta_len;
+
+ if (len <= frame_size && !xdp_buff_has_frags(xdp)) {
+ int err;
+
+ xsk_xdp = xsk_buff_alloc(xs->pool);
+ if (!xsk_xdp) {
+ xs->rx_dropped++;
+ return -ENOMEM;
+ }
+ memcpy(xsk_xdp->data - meta_len, copy_from, rem);
+ xskb = container_of(xsk_xdp, struct xdp_buff_xsk, xdp);
+ err = __xsk_rcv_zc(xs, xskb, len, 0);
+ if (err) {
+ xsk_buff_free(xsk_xdp);
+ return err;
+ }
+
+ return 0;
}
- xsk_xdp = xsk_buff_alloc(xs->pool);
- if (!xsk_xdp) {
+ num_desc = (len - 1) / frame_size + 1;
+
+ if (!xsk_buff_can_alloc(xs->pool, num_desc)) {
xs->rx_dropped++;
return -ENOMEM;
}
+ if (xskq_prod_nb_free(xs->rx, num_desc) < num_desc) {
+ xs->rx_queue_full++;
+ return -ENOBUFS;
+ }
- xsk_copy_xdp(xsk_xdp, xdp, len);
- err = __xsk_rcv_zc(xs, xsk_xdp, len);
- if (err) {
- xsk_buff_free(xsk_xdp);
- return err;
+ if (xdp_buff_has_frags(xdp)) {
+ struct skb_shared_info *sinfo;
+
+ sinfo = xdp_get_shared_info_from_buff(xdp);
+ frag = &sinfo->frags[0];
}
+
+ do {
+ u32 to_len = frame_size + meta_len;
+ u32 copied;
+
+ xsk_xdp = xsk_buff_alloc(xs->pool);
+ copy_to = xsk_xdp->data - meta_len;
+
+ copied = xsk_copy_xdp(copy_to, &copy_from, to_len, &from_len, &frag, rem);
+ rem -= copied;
+
+ xskb = container_of(xsk_xdp, struct xdp_buff_xsk, xdp);
+ __xsk_rcv_zc(xs, xskb, copied - meta_len, rem ? XDP_PKT_CONTD : 0);
+ meta_len = 0;
+ } while (rem);
+
return 0;
}
@@ -215,7 +305,7 @@ static bool xsk_is_bound(struct xdp_sock *xs)
return false;
}
-static int xsk_rcv_check(struct xdp_sock *xs, struct xdp_buff *xdp)
+static int xsk_rcv_check(struct xdp_sock *xs, struct xdp_buff *xdp, u32 len)
{
if (!xsk_is_bound(xs))
return -ENXIO;
@@ -223,6 +313,11 @@ static int xsk_rcv_check(struct xdp_sock *xs, struct xdp_buff *xdp)
if (xs->dev != xdp->rxq->dev || xs->queue_id != xdp->rxq->queue_index)
return -EINVAL;
+ if (len > xsk_pool_get_rx_frame_size(xs->pool) && !xs->sg) {
+ xs->rx_dropped++;
+ return -ENOSPC;
+ }
+
sk_mark_napi_id_once_xdp(&xs->sk, xdp);
return 0;
}
@@ -236,12 +331,13 @@ static void xsk_flush(struct xdp_sock *xs)
int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
{
+ u32 len = xdp_get_buff_len(xdp);
int err;
spin_lock_bh(&xs->rx_lock);
- err = xsk_rcv_check(xs, xdp);
+ err = xsk_rcv_check(xs, xdp, len);
if (!err) {
- err = __xsk_rcv(xs, xdp);
+ err = __xsk_rcv(xs, xdp, len);
xsk_flush(xs);
}
spin_unlock_bh(&xs->rx_lock);
@@ -250,19 +346,19 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
static int xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
{
+ u32 len = xdp_get_buff_len(xdp);
int err;
- u32 len;
- err = xsk_rcv_check(xs, xdp);
+ err = xsk_rcv_check(xs, xdp, len);
if (err)
return err;
if (xdp->rxq->mem.type == MEM_TYPE_XSK_BUFF_POOL) {
len = xdp->data_end - xdp->data;
- return __xsk_rcv_zc(xs, xdp, len);
+ return xsk_rcv_zc(xs, xdp, len);
}
- err = __xsk_rcv(xs, xdp);
+ err = __xsk_rcv(xs, xdp, len);
if (!err)
xdp_return_buff(xdp);
return err;
@@ -321,7 +417,8 @@ bool xsk_tx_peek_desc(struct xsk_buff_pool *pool, struct xdp_desc *desc)
rcu_read_lock();
list_for_each_entry_rcu(xs, &pool->xsk_tx_list, tx_list) {
if (!xskq_cons_peek_desc(xs->tx, desc, pool)) {
- xs->tx->queue_empty_descs++;
+ if (xskq_has_descs(xs->tx))
+ xskq_cons_release(xs->tx);
continue;
}
@@ -408,37 +505,91 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
}
-static void xsk_destruct_skb(struct sk_buff *skb)
+static int xsk_cq_reserve_addr_locked(struct xdp_sock *xs, u64 addr)
+{
+ unsigned long flags;
+ int ret;
+
+ spin_lock_irqsave(&xs->pool->cq_lock, flags);
+ ret = xskq_prod_reserve_addr(xs->pool->cq, addr);
+ spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+
+ return ret;
+}
+
+static void xsk_cq_submit_locked(struct xdp_sock *xs, u32 n)
{
- u64 addr = (u64)(long)skb_shinfo(skb)->destructor_arg;
- struct xdp_sock *xs = xdp_sk(skb->sk);
unsigned long flags;
spin_lock_irqsave(&xs->pool->cq_lock, flags);
- xskq_prod_submit_addr(xs->pool->cq, addr);
+ xskq_prod_submit_n(xs->pool->cq, n);
spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+}
+
+static void xsk_cq_cancel_locked(struct xdp_sock *xs, u32 n)
+{
+ unsigned long flags;
+ spin_lock_irqsave(&xs->pool->cq_lock, flags);
+ xskq_prod_cancel_n(xs->pool->cq, n);
+ spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+}
+
+static u32 xsk_get_num_desc(struct sk_buff *skb)
+{
+ return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
+}
+
+static void xsk_destruct_skb(struct sk_buff *skb)
+{
+ xsk_cq_submit_locked(xdp_sk(skb->sk), xsk_get_num_desc(skb));
sock_wfree(skb);
}
+static void xsk_set_destructor_arg(struct sk_buff *skb)
+{
+ long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
+
+ skb_shinfo(skb)->destructor_arg = (void *)num;
+}
+
+static void xsk_consume_skb(struct sk_buff *skb)
+{
+ struct xdp_sock *xs = xdp_sk(skb->sk);
+
+ skb->destructor = sock_wfree;
+ xsk_cq_cancel_locked(xs, xsk_get_num_desc(skb));
+ /* Free skb without triggering the perf drop trace */
+ consume_skb(skb);
+ xs->skb = NULL;
+}
+
+static void xsk_drop_skb(struct sk_buff *skb)
+{
+ xdp_sk(skb->sk)->tx->invalid_descs += xsk_get_num_desc(skb);
+ xsk_consume_skb(skb);
+}
+
static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
struct xdp_desc *desc)
{
struct xsk_buff_pool *pool = xs->pool;
u32 hr, len, ts, offset, copy, copied;
- struct sk_buff *skb;
+ struct sk_buff *skb = xs->skb;
struct page *page;
void *buffer;
int err, i;
u64 addr;
- hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
+ if (!skb) {
+ hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(xs->dev->needed_headroom));
- skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
- if (unlikely(!skb))
- return ERR_PTR(err);
+ skb = sock_alloc_send_skb(&xs->sk, hr, 1, &err);
+ if (unlikely(!skb))
+ return ERR_PTR(err);
- skb_reserve(skb, hr);
+ skb_reserve(skb, hr);
+ }
addr = desc->addr;
len = desc->len;
@@ -448,7 +599,10 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
offset = offset_in_page(buffer);
addr = buffer - pool->addrs;
- for (copied = 0, i = 0; copied < len; i++) {
+ for (copied = 0, i = skb_shinfo(skb)->nr_frags; copied < len; i++) {
+ if (unlikely(i >= MAX_SKB_FRAGS))
+ return ERR_PTR(-EFAULT);
+
page = pool->umem->pgs[addr >> PAGE_SHIFT];
get_page(page);
@@ -473,43 +627,77 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
struct xdp_desc *desc)
{
struct net_device *dev = xs->dev;
- struct sk_buff *skb;
+ struct sk_buff *skb = xs->skb;
+ int err;
if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
skb = xsk_build_skb_zerocopy(xs, desc);
- if (IS_ERR(skb))
- return skb;
+ if (IS_ERR(skb)) {
+ err = PTR_ERR(skb);
+ goto free_err;
+ }
} else {
u32 hr, tr, len;
void *buffer;
- int err;
- hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
- tr = dev->needed_tailroom;
+ buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
len = desc->len;
- skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
- if (unlikely(!skb))
- return ERR_PTR(err);
+ if (!skb) {
+ hr = max(NET_SKB_PAD, L1_CACHE_ALIGN(dev->needed_headroom));
+ tr = dev->needed_tailroom;
+ skb = sock_alloc_send_skb(&xs->sk, hr + len + tr, 1, &err);
+ if (unlikely(!skb))
+ goto free_err;
- skb_reserve(skb, hr);
- skb_put(skb, len);
+ skb_reserve(skb, hr);
+ skb_put(skb, len);
- buffer = xsk_buff_raw_get_data(xs->pool, desc->addr);
- err = skb_store_bits(skb, 0, buffer, len);
- if (unlikely(err)) {
- kfree_skb(skb);
- return ERR_PTR(err);
+ err = skb_store_bits(skb, 0, buffer, len);
+ if (unlikely(err))
+ goto free_err;
+ } else {
+ int nr_frags = skb_shinfo(skb)->nr_frags;
+ struct page *page;
+ u8 *vaddr;
+
+ if (unlikely(nr_frags == (MAX_SKB_FRAGS - 1) && xp_mb_desc(desc))) {
+ err = -EFAULT;
+ goto free_err;
+ }
+
+ page = alloc_page(xs->sk.sk_allocation);
+ if (unlikely(!page)) {
+ err = -EAGAIN;
+ goto free_err;
+ }
+
+ vaddr = kmap_local_page(page);
+ memcpy(vaddr, buffer, len);
+ kunmap_local(vaddr);
+
+ skb_add_rx_frag(skb, nr_frags, page, 0, len, 0);
}
}
skb->dev = dev;
skb->priority = xs->sk.sk_priority;
skb->mark = xs->sk.sk_mark;
- skb_shinfo(skb)->destructor_arg = (void *)(long)desc->addr;
skb->destructor = xsk_destruct_skb;
+ xsk_set_destructor_arg(skb);
return skb;
+
+free_err:
+ if (err == -EAGAIN) {
+ xsk_cq_cancel_locked(xs, 1);
+ } else {
+ xsk_set_destructor_arg(skb);
+ xsk_drop_skb(skb);
+ xskq_cons_release(xs->tx);
+ }
+
+ return ERR_PTR(err);
}
static int __xsk_generic_xmit(struct sock *sk)
@@ -519,7 +707,6 @@ static int __xsk_generic_xmit(struct sock *sk)
bool sent_frame = false;
struct xdp_desc desc;
struct sk_buff *skb;
- unsigned long flags;
int err = 0;
mutex_lock(&xs->mutex);
@@ -544,47 +731,51 @@ static int __xsk_generic_xmit(struct sock *sk)
* if there is space in it. This avoids having to implement
* any buffering in the Tx path.
*/
- spin_lock_irqsave(&xs->pool->cq_lock, flags);
- if (xskq_prod_reserve(xs->pool->cq)) {
- spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
+ if (xsk_cq_reserve_addr_locked(xs, desc.addr))
goto out;
- }
- spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
skb = xsk_build_skb(xs, &desc);
if (IS_ERR(skb)) {
err = PTR_ERR(skb);
- spin_lock_irqsave(&xs->pool->cq_lock, flags);
- xskq_prod_cancel(xs->pool->cq);
- spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
- goto out;
+ if (err == -EAGAIN)
+ goto out;
+ err = 0;
+ continue;
+ }
+
+ xskq_cons_release(xs->tx);
+
+ if (xp_mb_desc(&desc)) {
+ xs->skb = skb;
+ continue;
}
err = __dev_direct_xmit(skb, xs->queue_id);
if (err == NETDEV_TX_BUSY) {
/* Tell user-space to retry the send */
- skb->destructor = sock_wfree;
- spin_lock_irqsave(&xs->pool->cq_lock, flags);
- xskq_prod_cancel(xs->pool->cq);
- spin_unlock_irqrestore(&xs->pool->cq_lock, flags);
- /* Free skb without triggering the perf drop trace */
- consume_skb(skb);
+ xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb));
+ xsk_consume_skb(skb);
err = -EAGAIN;
goto out;
}
- xskq_cons_release(xs->tx);
/* Ignore NET_XMIT_CN as packet might have been sent */
if (err == NET_XMIT_DROP) {
/* SKB completed but not sent */
err = -EBUSY;
+ xs->skb = NULL;
goto out;
}
sent_frame = true;
+ xs->skb = NULL;
}
- xs->tx->queue_empty_descs++;
+ if (xskq_has_descs(xs->tx)) {
+ if (xs->skb)
+ xsk_drop_skb(xs->skb);
+ xskq_cons_release(xs->tx);
+ }
out:
if (sent_frame)
@@ -834,6 +1025,9 @@ static int xsk_release(struct socket *sock)
net = sock_net(sk);
+ if (xs->skb)
+ xsk_drop_skb(xs->skb);
+
mutex_lock(&net->xdp.lock);
sk_del_node_init_rcu(sk);
mutex_unlock(&net->xdp.lock);
@@ -897,7 +1091,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
flags = sxdp->sxdp_flags;
if (flags & ~(XDP_SHARED_UMEM | XDP_COPY | XDP_ZEROCOPY |
- XDP_USE_NEED_WAKEUP))
+ XDP_USE_NEED_WAKEUP | XDP_USE_SG))
return -EINVAL;
bound_dev_if = READ_ONCE(sk->sk_bound_dev_if);
@@ -929,7 +1123,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
struct socket *sock;
if ((flags & XDP_COPY) || (flags & XDP_ZEROCOPY) ||
- (flags & XDP_USE_NEED_WAKEUP)) {
+ (flags & XDP_USE_NEED_WAKEUP) || (flags & XDP_USE_SG)) {
/* Cannot specify flags for shared sockets. */
err = -EINVAL;
goto out_unlock;
@@ -1028,6 +1222,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
xs->dev = dev;
xs->zc = xs->umem->zc;
+ xs->sg = !!(flags & XDP_USE_SG);
xs->queue_id = qid;
xp_add_xsk(xs->pool, xs);
diff --git a/net/xdp/xsk_buff_pool.c b/net/xdp/xsk_buff_pool.c
index 26f6d304451e..b3f7b310811e 100644
--- a/net/xdp/xsk_buff_pool.c
+++ b/net/xdp/xsk_buff_pool.c
@@ -86,6 +86,7 @@ struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs,
pool->umem = umem;
pool->addrs = umem->addrs;
INIT_LIST_HEAD(&pool->free_list);
+ INIT_LIST_HEAD(&pool->xskb_list);
INIT_LIST_HEAD(&pool->xsk_tx_list);
spin_lock_init(&pool->xsk_tx_list_lock);
spin_lock_init(&pool->cq_lock);
@@ -99,6 +100,7 @@ struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs,
xskb->pool = pool;
xskb->xdp.frame_sz = umem->chunk_size - umem->headroom;
INIT_LIST_HEAD(&xskb->free_list_node);
+ INIT_LIST_HEAD(&xskb->xskb_list_node);
if (pool->unaligned)
pool->free_heads[i] = xskb;
else
@@ -187,6 +189,11 @@ int xp_assign_dev(struct xsk_buff_pool *pool,
goto err_unreg_pool;
}
+ if (netdev->xdp_zc_max_segs == 1 && (flags & XDP_USE_SG)) {
+ err = -EOPNOTSUPP;
+ goto err_unreg_pool;
+ }
+
bpf.command = XDP_SETUP_XSK_POOL;
bpf.xsk.pool = pool;
bpf.xsk.queue_id = queue_id;
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 6d40a77fccbe..13354a1e4280 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -48,6 +48,11 @@ struct xsk_queue {
size_t ring_vmalloc_size;
};
+struct parsed_desc {
+ u32 mb;
+ u32 valid;
+};
+
/* The structure of the shared state of the rings are a simple
* circular buffer, as outlined in
* Documentation/core-api/circular-buffers.rst. For the Rx and
@@ -130,18 +135,26 @@ static inline bool xskq_cons_read_addr_unchecked(struct xsk_queue *q, u64 *addr)
return false;
}
+static inline bool xp_unused_options_set(u32 options)
+{
+ return options & ~XDP_PKT_CONTD;
+}
+
static inline bool xp_aligned_validate_desc(struct xsk_buff_pool *pool,
struct xdp_desc *desc)
{
u64 offset = desc->addr & (pool->chunk_size - 1);
+ if (!desc->len)
+ return false;
+
if (offset + desc->len > pool->chunk_size)
return false;
if (desc->addr >= pool->addrs_cnt)
return false;
- if (desc->options)
+ if (xp_unused_options_set(desc->options))
return false;
return true;
}
@@ -151,6 +164,9 @@ static inline bool xp_unaligned_validate_desc(struct xsk_buff_pool *pool,
{
u64 addr = xp_unaligned_add_offset_to_addr(desc->addr);
+ if (!desc->len)
+ return false;
+
if (desc->len > pool->chunk_size)
return false;
@@ -158,7 +174,7 @@ static inline bool xp_unaligned_validate_desc(struct xsk_buff_pool *pool,
xp_desc_crosses_non_contig_pg(pool, addr, desc->len))
return false;
- if (desc->options)
+ if (xp_unused_options_set(desc->options))
return false;
return true;
}
@@ -170,6 +186,11 @@ static inline bool xp_validate_desc(struct xsk_buff_pool *pool,
xp_aligned_validate_desc(pool, desc);
}
+static inline bool xskq_has_descs(struct xsk_queue *q)
+{
+ return q->cached_cons != q->cached_prod;
+}
+
static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
struct xdp_desc *d,
struct xsk_buff_pool *pool)
@@ -185,17 +206,15 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
struct xdp_desc *desc,
struct xsk_buff_pool *pool)
{
- while (q->cached_cons != q->cached_prod) {
+ if (q->cached_cons != q->cached_prod) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
u32 idx = q->cached_cons & q->ring_mask;
*desc = ring->desc[idx];
- if (xskq_cons_is_valid_desc(q, desc, pool))
- return true;
-
- q->cached_cons++;
+ return xskq_cons_is_valid_desc(q, desc, pool);
}
+ q->queue_empty_descs++;
return false;
}
@@ -204,30 +223,52 @@ static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
q->cached_cons += cnt;
}
-static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
- u32 max)
+static inline void parse_desc(struct xsk_queue *q, struct xsk_buff_pool *pool,
+ struct xdp_desc *desc, struct parsed_desc *parsed)
+{
+ parsed->valid = xskq_cons_is_valid_desc(q, desc, pool);
+ parsed->mb = xp_mb_desc(desc);
+}
+
+static inline
+u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
+ u32 max)
{
u32 cached_cons = q->cached_cons, nb_entries = 0;
struct xdp_desc *descs = pool->tx_descs;
+ u32 total_descs = 0, nr_frags = 0;
+ /* track first entry, if stumble upon *any* invalid descriptor, rewind
+ * current packet that consists of frags and stop the processing
+ */
while (cached_cons != q->cached_prod && nb_entries < max) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
u32 idx = cached_cons & q->ring_mask;
+ struct parsed_desc parsed;
descs[nb_entries] = ring->desc[idx];
- if (unlikely(!xskq_cons_is_valid_desc(q, &descs[nb_entries], pool))) {
- /* Skip the entry */
- cached_cons++;
- continue;
+ cached_cons++;
+ parse_desc(q, pool, &descs[nb_entries], &parsed);
+ if (unlikely(!parsed.valid))
+ break;
+
+ if (likely(!parsed.mb)) {
+ total_descs += (nr_frags + 1);
+ nr_frags = 0;
+ } else {
+ nr_frags++;
+ if (nr_frags == pool->netdev->xdp_zc_max_segs) {
+ nr_frags = 0;
+ break;
+ }
}
-
nb_entries++;
- cached_cons++;
}
+ cached_cons -= nr_frags;
/* Release valid plus any invalid entries */
xskq_cons_release_n(q, cached_cons - q->cached_cons);
- return nb_entries;
+ return total_descs;
}
/* Functions for consumers */
@@ -292,6 +333,11 @@ static inline void xskq_cons_release(struct xsk_queue *q)
q->cached_cons++;
}
+static inline void xskq_cons_cancel_n(struct xsk_queue *q, u32 cnt)
+{
+ q->cached_cons -= cnt;
+}
+
static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
{
/* No barriers needed since data is not accessed */
@@ -319,9 +365,9 @@ static inline bool xskq_prod_is_full(struct xsk_queue *q)
return xskq_prod_nb_free(q, 1) ? false : true;
}
-static inline void xskq_prod_cancel(struct xsk_queue *q)
+static inline void xskq_prod_cancel_n(struct xsk_queue *q, u32 cnt)
{
- q->cached_prod--;
+ q->cached_prod -= cnt;
}
static inline int xskq_prod_reserve(struct xsk_queue *q)
@@ -360,7 +406,7 @@ static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_de
}
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
- u64 addr, u32 len)
+ u64 addr, u32 len, u32 flags)
{
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
u32 idx;
@@ -372,6 +418,7 @@ static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
idx = q->cached_prod++ & q->ring_mask;
ring->desc[idx].addr = addr;
ring->desc[idx].len = len;
+ ring->desc[idx].options = flags;
return 0;
}
@@ -386,16 +433,6 @@ static inline void xskq_prod_submit(struct xsk_queue *q)
__xskq_prod_submit(q, q->cached_prod);
}
-static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
-{
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- u32 idx = q->ring->producer;
-
- ring->desc[idx++ & q->ring_mask] = addr;
-
- __xskq_prod_submit(q, idx);
-}
-
static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
{
__xskq_prod_submit(q, q->ring->producer + nb_entries);