summaryrefslogtreecommitdiff
path: root/net
diff options
context:
space:
mode:
Diffstat (limited to 'net')
-rw-r--r--net/ceph/ceph_common.c3
-rw-r--r--net/ceph/messenger.c107
-rw-r--r--net/ceph/osd_client.c59
-rw-r--r--net/ceph/osdmap.c47
-rw-r--r--net/sunrpc/rpcb_clnt.c1
-rw-r--r--net/sunrpc/svc.c8
-rw-r--r--net/sunrpc/svcsock.c98
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_recvfrom.c10
-rw-r--r--net/sunrpc/xprtrdma/svc_rdma_sendto.c4
9 files changed, 166 insertions, 171 deletions
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index a8020293f342..ee71ea26777a 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -305,7 +305,6 @@ ceph_parse_options(char *options, const char *dev_name,
/* start with defaults */
opt->flags = CEPH_OPT_DEFAULT;
- opt->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
@@ -391,7 +390,7 @@ ceph_parse_options(char *options, const char *dev_name,
/* misc */
case Opt_osdtimeout:
- opt->osd_timeout = intval;
+ pr_warning("ignoring deprecated osdtimeout option\n");
break;
case Opt_osdkeepalivetimeout:
opt->osd_keepalive_timeout = intval;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3ef1759403b4..4d111fd2b492 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2244,22 +2244,62 @@ bad_tag:
/*
- * Atomically queue work on a connection. Bump @con reference to
- * avoid races with connection teardown.
+ * Atomically queue work on a connection after the specified delay.
+ * Bump @con reference to avoid races with connection teardown.
+ * Returns 0 if work was queued, or an error code otherwise.
*/
-static void queue_con(struct ceph_connection *con)
+static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
{
if (!con->ops->get(con)) {
- dout("queue_con %p ref count 0\n", con);
- return;
+ dout("%s %p ref count 0\n", __func__, con);
+
+ return -ENOENT;
}
- if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
- dout("queue_con %p - already queued\n", con);
+ if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
+ dout("%s %p - already queued\n", __func__, con);
con->ops->put(con);
- } else {
- dout("queue_con %p\n", con);
+
+ return -EBUSY;
}
+
+ dout("%s %p %lu\n", __func__, con, delay);
+
+ return 0;
+}
+
+static void queue_con(struct ceph_connection *con)
+{
+ (void) queue_con_delay(con, 0);
+}
+
+static bool con_sock_closed(struct ceph_connection *con)
+{
+ if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
+ return false;
+
+#define CASE(x) \
+ case CON_STATE_ ## x: \
+ con->error_msg = "socket closed (con state " #x ")"; \
+ break;
+
+ switch (con->state) {
+ CASE(CLOSED);
+ CASE(PREOPEN);
+ CASE(CONNECTING);
+ CASE(NEGOTIATING);
+ CASE(OPEN);
+ CASE(STANDBY);
+ default:
+ pr_warning("%s con %p unrecognized state %lu\n",
+ __func__, con, con->state);
+ con->error_msg = "unrecognized con state";
+ BUG();
+ break;
+ }
+#undef CASE
+
+ return true;
}
/*
@@ -2273,35 +2313,16 @@ static void con_work(struct work_struct *work)
mutex_lock(&con->mutex);
restart:
- if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
- switch (con->state) {
- case CON_STATE_CONNECTING:
- con->error_msg = "connection failed";
- break;
- case CON_STATE_NEGOTIATING:
- con->error_msg = "negotiation failed";
- break;
- case CON_STATE_OPEN:
- con->error_msg = "socket closed";
- break;
- default:
- dout("unrecognized con state %d\n", (int)con->state);
- con->error_msg = "unrecognized con state";
- BUG();
- }
+ if (con_sock_closed(con))
goto fault;
- }
if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
dout("con_work %p backing off\n", con);
- if (queue_delayed_work(ceph_msgr_wq, &con->work,
- round_jiffies_relative(con->delay))) {
- dout("con_work %p backoff %lu\n", con, con->delay);
- mutex_unlock(&con->mutex);
- return;
- } else {
+ ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+ if (ret) {
dout("con_work %p FAILED to back off %lu\n", con,
con->delay);
+ BUG_ON(ret == -ENOENT);
set_bit(CON_FLAG_BACKOFF, &con->flags);
}
goto done;
@@ -2356,7 +2377,7 @@ fault:
static void ceph_fault(struct ceph_connection *con)
__releases(con->mutex)
{
- pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
+ pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
dout("fault %p state %lu to peer %s\n",
con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
@@ -2398,24 +2419,8 @@ static void ceph_fault(struct ceph_connection *con)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
con->delay *= 2;
- con->ops->get(con);
- if (queue_delayed_work(ceph_msgr_wq, &con->work,
- round_jiffies_relative(con->delay))) {
- dout("fault queued %p delay %lu\n", con, con->delay);
- } else {
- con->ops->put(con);
- dout("fault failed to queue %p delay %lu, backoff\n",
- con, con->delay);
- /*
- * In many cases we see a socket state change
- * while con_work is running and end up
- * queuing (non-delayed) work, such that we
- * can't backoff with a delay. Set a flag so
- * that when con_work restarts we schedule the
- * delay then.
- */
- set_bit(CON_FLAG_BACKOFF, &con->flags);
- }
+ set_bit(CON_FLAG_BACKOFF, &con->flags);
+ queue_con(con);
}
out_unlock:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c1d756cc7448..780caf6b0491 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -221,6 +221,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
kref_init(&req->r_kref);
init_completion(&req->r_completion);
init_completion(&req->r_safe_completion);
+ RB_CLEAR_NODE(&req->r_node);
INIT_LIST_HEAD(&req->r_unsafe_item);
INIT_LIST_HEAD(&req->r_linger_item);
INIT_LIST_HEAD(&req->r_linger_osd);
@@ -580,7 +581,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
dout("__kick_osd_requests osd%d\n", osd->o_osd);
err = __reset_osd(osdc, osd);
- if (err == -EAGAIN)
+ if (err)
return;
list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -607,14 +608,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
}
}
-static void kick_osd_requests(struct ceph_osd_client *osdc,
- struct ceph_osd *kickosd)
-{
- mutex_lock(&osdc->request_mutex);
- __kick_osd_requests(osdc, kickosd);
- mutex_unlock(&osdc->request_mutex);
-}
-
/*
* If the osd connection drops, we need to resubmit all requests.
*/
@@ -628,7 +621,9 @@ static void osd_reset(struct ceph_connection *con)
dout("osd_reset osd%d\n", osd->o_osd);
osdc = osd->o_osdc;
down_read(&osdc->map_sem);
- kick_osd_requests(osdc, osd);
+ mutex_lock(&osdc->request_mutex);
+ __kick_osd_requests(osdc, osd);
+ mutex_unlock(&osdc->request_mutex);
send_queued(osdc);
up_read(&osdc->map_sem);
}
@@ -647,6 +642,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
atomic_set(&osd->o_ref, 1);
osd->o_osdc = osdc;
osd->o_osd = onum;
+ RB_CLEAR_NODE(&osd->o_node);
INIT_LIST_HEAD(&osd->o_requests);
INIT_LIST_HEAD(&osd->o_linger_requests);
INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -750,6 +746,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
if (list_empty(&osd->o_requests) &&
list_empty(&osd->o_linger_requests)) {
__remove_osd(osdc, osd);
+ ret = -ENODEV;
} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
&osd->o_con.peer_addr,
sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -876,9 +873,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
req->r_osd = NULL;
}
+ list_del_init(&req->r_req_lru_item);
ceph_osdc_put_request(req);
- list_del_init(&req->r_req_lru_item);
if (osdc->num_requests == 0) {
dout(" no requests, canceling timeout\n");
__cancel_osd_timeout(osdc);
@@ -910,8 +907,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req)
{
dout("__unregister_linger_request %p\n", req);
+ list_del_init(&req->r_linger_item);
if (req->r_osd) {
- list_del_init(&req->r_linger_item);
list_del_init(&req->r_linger_osd);
if (list_empty(&req->r_osd->o_requests) &&
@@ -1090,12 +1087,10 @@ static void handle_timeout(struct work_struct *work)
{
struct ceph_osd_client *osdc =
container_of(work, struct ceph_osd_client, timeout_work.work);
- struct ceph_osd_request *req, *last_req = NULL;
+ struct ceph_osd_request *req;
struct ceph_osd *osd;
- unsigned long timeout = osdc->client->options->osd_timeout * HZ;
unsigned long keepalive =
osdc->client->options->osd_keepalive_timeout * HZ;
- unsigned long last_stamp = 0;
struct list_head slow_osds;
dout("timeout\n");
down_read(&osdc->map_sem);
@@ -1105,37 +1100,6 @@ static void handle_timeout(struct work_struct *work)
mutex_lock(&osdc->request_mutex);
/*
- * reset osds that appear to be _really_ unresponsive. this
- * is a failsafe measure.. we really shouldn't be getting to
- * this point if the system is working properly. the monitors
- * should mark the osd as failed and we should find out about
- * it from an updated osd map.
- */
- while (timeout && !list_empty(&osdc->req_lru)) {
- req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
- r_req_lru_item);
-
- /* hasn't been long enough since we sent it? */
- if (time_before(jiffies, req->r_stamp + timeout))
- break;
-
- /* hasn't been long enough since it was acked? */
- if (req->r_request->ack_stamp == 0 ||
- time_before(jiffies, req->r_request->ack_stamp + timeout))
- break;
-
- BUG_ON(req == last_req && req->r_stamp == last_stamp);
- last_req = req;
- last_stamp = req->r_stamp;
-
- osd = req->r_osd;
- BUG_ON(!osd);
- pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
- req->r_tid, osd->o_osd);
- __kick_osd_requests(osdc, osd);
- }
-
- /*
* ping osds that are a bit slow. this ensures that if there
* is a break in the TCP connection we will notice, and reopen
* a connection with that osd (from the fault callback).
@@ -1364,8 +1328,8 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
req->r_osd ? req->r_osd->o_osd : -1);
- __unregister_linger_request(osdc, req);
__register_request(osdc, req);
+ __unregister_linger_request(osdc, req);
}
mutex_unlock(&osdc->request_mutex);
@@ -1599,6 +1563,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
event->data = data;
event->osdc = osdc;
INIT_LIST_HEAD(&event->osd_node);
+ RB_CLEAR_NODE(&event->node);
kref_init(&event->kref); /* one ref for us */
kref_get(&event->kref); /* one ref for the caller */
init_completion(&event->completion);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 5433fb0eb3c6..de73214b5d26 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -469,6 +469,22 @@ static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
return NULL;
}
+const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
+{
+ struct ceph_pg_pool_info *pi;
+
+ if (id == CEPH_NOPOOL)
+ return NULL;
+
+ if (WARN_ON_ONCE(id > (u64) INT_MAX))
+ return NULL;
+
+ pi = __lookup_pg_pool(&map->pg_pools, (int) id);
+
+ return pi ? pi->name : NULL;
+}
+EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
+
int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
{
struct rb_node *rbp;
@@ -645,10 +661,12 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
ceph_decode_32_safe(p, end, max, bad);
while (max--) {
ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
+ err = -ENOMEM;
pi = kzalloc(sizeof(*pi), GFP_NOFS);
if (!pi)
goto bad;
pi->id = ceph_decode_32(p);
+ err = -EINVAL;
ev = ceph_decode_8(p); /* encoding version */
if (ev > CEPH_PG_POOL_VERSION) {
pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
@@ -664,8 +682,13 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
__insert_pg_pool(&map->pg_pools, pi);
}
- if (version >= 5 && __decode_pool_names(p, end, map) < 0)
- goto bad;
+ if (version >= 5) {
+ err = __decode_pool_names(p, end, map);
+ if (err < 0) {
+ dout("fail to decode pool names");
+ goto bad;
+ }
+ }
ceph_decode_32_safe(p, end, map->pool_max, bad);
@@ -745,7 +768,7 @@ struct ceph_osdmap *osdmap_decode(void **p, void *end)
return map;
bad:
- dout("osdmap_decode fail\n");
+ dout("osdmap_decode fail err %d\n", err);
ceph_osdmap_destroy(map);
return ERR_PTR(err);
}
@@ -839,6 +862,7 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
if (ev > CEPH_PG_POOL_VERSION) {
pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
ev, CEPH_PG_POOL_VERSION);
+ err = -EINVAL;
goto bad;
}
pi = __lookup_pg_pool(&map->pg_pools, pool);
@@ -855,8 +879,11 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
if (err < 0)
goto bad;
}
- if (version >= 5 && __decode_pool_names(p, end, map) < 0)
- goto bad;
+ if (version >= 5) {
+ err = __decode_pool_names(p, end, map);
+ if (err < 0)
+ goto bad;
+ }
/* old_pool */
ceph_decode_32_safe(p, end, len, bad);
@@ -932,15 +959,13 @@ struct ceph_osdmap *osdmap_apply_incremental(void **p, void *end,
(void) __remove_pg_mapping(&map->pg_temp, pgid);
/* insert */
- if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
- err = -EINVAL;
+ err = -EINVAL;
+ if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
goto bad;
- }
+ err = -ENOMEM;
pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
- if (!pg) {
- err = -ENOMEM;
+ if (!pg)
goto bad;
- }
pg->pgid = pgid;
pg->len = pglen;
for (j = 0; j < pglen; j++)
diff --git a/net/sunrpc/rpcb_clnt.c b/net/sunrpc/rpcb_clnt.c
index 411f332de0b3..795a0f4e920b 100644
--- a/net/sunrpc/rpcb_clnt.c
+++ b/net/sunrpc/rpcb_clnt.c
@@ -23,7 +23,6 @@
#include <linux/errno.h>
#include <linux/mutex.h>
#include <linux/slab.h>
-#include <linux/nsproxy.h>
#include <net/ipv6.h>
#include <linux/sunrpc/clnt.h>
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index dfa4ba69ff45..dbf12ac5ecb7 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -20,7 +20,6 @@
#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/slab.h>
-#include <linux/nsproxy.h>
#include <linux/sunrpc/types.h>
#include <linux/sunrpc/xdr.h>
@@ -1041,7 +1040,7 @@ static void svc_unregister(const struct svc_serv *serv, struct net *net)
}
/*
- * Printk the given error with the address of the client that caused it.
+ * dprintk the given error with the address of the client that caused it.
*/
static __printf(2, 3)
void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
@@ -1055,8 +1054,7 @@ void svc_printk(struct svc_rqst *rqstp, const char *fmt, ...)
vaf.fmt = fmt;
vaf.va = &args;
- net_warn_ratelimited("svc: %s: %pV",
- svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);
+ dprintk("svc: %s: %pV", svc_print_addr(rqstp, buf, sizeof(buf)), &vaf);
va_end(args);
}
@@ -1305,7 +1303,7 @@ svc_process(struct svc_rqst *rqstp)
* Setup response xdr_buf.
* Initially it has just one page
*/
- rqstp->rq_resused = 1;
+ rqstp->rq_next_page = &rqstp->rq_respages[1];
resv->iov_base = page_address(rqstp->rq_respages[0]);
resv->iov_len = 0;
rqstp->rq_res.pages = rqstp->rq_respages + 1;
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index cc3020d16789..0a148c9d2a5c 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -605,6 +605,7 @@ static int svc_udp_recvfrom(struct svc_rqst *rqstp)
rqstp->rq_respages = rqstp->rq_pages + 1 +
DIV_ROUND_UP(rqstp->rq_arg.page_len, PAGE_SIZE);
}
+ rqstp->rq_next_page = rqstp->rq_respages+1;
if (serv->sv_stats)
serv->sv_stats->netudpcnt++;
@@ -878,9 +879,9 @@ static unsigned int svc_tcp_restore_pages(struct svc_sock *svsk, struct svc_rqst
{
unsigned int i, len, npages;
- if (svsk->sk_tcplen <= sizeof(rpc_fraghdr))
+ if (svsk->sk_datalen == 0)
return 0;
- len = svsk->sk_tcplen - sizeof(rpc_fraghdr);
+ len = svsk->sk_datalen;
npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
for (i = 0; i < npages; i++) {
if (rqstp->rq_pages[i] != NULL)
@@ -897,9 +898,9 @@ static void svc_tcp_save_pages(struct svc_sock *svsk, struct svc_rqst *rqstp)
{
unsigned int i, len, npages;
- if (svsk->sk_tcplen <= sizeof(rpc_fraghdr))
+ if (svsk->sk_datalen == 0)
return;
- len = svsk->sk_tcplen - sizeof(rpc_fraghdr);
+ len = svsk->sk_datalen;
npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
for (i = 0; i < npages; i++) {
svsk->sk_pages[i] = rqstp->rq_pages[i];
@@ -911,9 +912,9 @@ static void svc_tcp_clear_pages(struct svc_sock *svsk)
{
unsigned int i, len, npages;
- if (svsk->sk_tcplen <= sizeof(rpc_fraghdr))
+ if (svsk->sk_datalen == 0)
goto out;
- len = svsk->sk_tcplen - sizeof(rpc_fraghdr);
+ len = svsk->sk_datalen;
npages = (len + PAGE_SIZE - 1) >> PAGE_SHIFT;
for (i = 0; i < npages; i++) {
BUG_ON(svsk->sk_pages[i] == NULL);
@@ -922,13 +923,12 @@ static void svc_tcp_clear_pages(struct svc_sock *svsk)
}
out:
svsk->sk_tcplen = 0;
+ svsk->sk_datalen = 0;
}
/*
- * Receive data.
+ * Receive fragment record header.
* If we haven't gotten the record length yet, get the next four bytes.
- * Otherwise try to gobble up as much as possible up to the complete
- * record length.
*/
static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
{
@@ -954,32 +954,16 @@ static int svc_tcp_recv_record(struct svc_sock *svsk, struct svc_rqst *rqstp)
return -EAGAIN;
}
- svsk->sk_reclen = ntohl(svsk->sk_reclen);
- if (!(svsk->sk_reclen & RPC_LAST_STREAM_FRAGMENT)) {
- /* FIXME: technically, a record can be fragmented,
- * and non-terminal fragments will not have the top
- * bit set in the fragment length header.
- * But apparently no known nfs clients send fragmented
- * records. */
- net_notice_ratelimited("RPC: multiple fragments per record not supported\n");
- goto err_delete;
- }
-
- svsk->sk_reclen &= RPC_FRAGMENT_SIZE_MASK;
- dprintk("svc: TCP record, %d bytes\n", svsk->sk_reclen);
- if (svsk->sk_reclen > serv->sv_max_mesg) {
- net_notice_ratelimited("RPC: fragment too large: 0x%08lx\n",
- (unsigned long)svsk->sk_reclen);
+ dprintk("svc: TCP record, %d bytes\n", svc_sock_reclen(svsk));
+ if (svc_sock_reclen(svsk) + svsk->sk_datalen >
+ serv->sv_max_mesg) {
+ net_notice_ratelimited("RPC: fragment too large: %d\n",
+ svc_sock_reclen(svsk));
goto err_delete;
}
}
- if (svsk->sk_reclen < 8)
- goto err_delete; /* client is nuts. */
-
- len = svsk->sk_reclen;
-
- return len;
+ return svc_sock_reclen(svsk);
error:
dprintk("RPC: TCP recv_record got %d\n", len);
return len;
@@ -1023,7 +1007,7 @@ static int receive_cb_reply(struct svc_sock *svsk, struct svc_rqst *rqstp)
if (dst->iov_len < src->iov_len)
return -EAGAIN; /* whatever; just giving up. */
memcpy(dst->iov_base, src->iov_base, src->iov_len);
- xprt_complete_rqst(req->rq_task, svsk->sk_reclen);
+ xprt_complete_rqst(req->rq_task, rqstp->rq_arg.len);
rqstp->rq_arg.len = 0;
return 0;
}
@@ -1042,6 +1026,17 @@ static int copy_pages_to_kvecs(struct kvec *vec, struct page **pages, int len)
return i;
}
+static void svc_tcp_fragment_received(struct svc_sock *svsk)
+{
+ /* If we have more data, signal svc_xprt_enqueue() to try again */
+ if (svc_recv_available(svsk) > sizeof(rpc_fraghdr))
+ set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+ dprintk("svc: TCP %s record (%d bytes)\n",
+ svc_sock_final_rec(svsk) ? "final" : "nonfinal",
+ svc_sock_reclen(svsk));
+ svsk->sk_tcplen = 0;
+ svsk->sk_reclen = 0;
+}
/*
* Receive data from a TCP socket.
@@ -1068,29 +1063,39 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
goto error;
base = svc_tcp_restore_pages(svsk, rqstp);
- want = svsk->sk_reclen - base;
+ want = svc_sock_reclen(svsk) - (svsk->sk_tcplen - sizeof(rpc_fraghdr));
vec = rqstp->rq_vec;
pnum = copy_pages_to_kvecs(&vec[0], &rqstp->rq_pages[0],
- svsk->sk_reclen);
+ svsk->sk_datalen + want);
rqstp->rq_respages = &rqstp->rq_pages[pnum];
+ rqstp->rq_next_page = rqstp->rq_respages + 1;
/* Now receive data */
len = svc_partial_recvfrom(rqstp, vec, pnum, want, base);
- if (len >= 0)
+ if (len >= 0) {
svsk->sk_tcplen += len;
- if (len != want) {
+ svsk->sk_datalen += len;
+ }
+ if (len != want || !svc_sock_final_rec(svsk)) {
svc_tcp_save_pages(svsk, rqstp);
if (len < 0 && len != -EAGAIN)
- goto err_other;
- dprintk("svc: incomplete TCP record (%d of %d)\n",
- svsk->sk_tcplen, svsk->sk_reclen);
+ goto err_delete;
+ if (len == want)
+ svc_tcp_fragment_received(svsk);
+ else
+ dprintk("svc: incomplete TCP record (%d of %d)\n",
+ (int)(svsk->sk_tcplen - sizeof(rpc_fraghdr)),
+ svc_sock_reclen(svsk));
goto err_noclose;
}
- rqstp->rq_arg.len = svsk->sk_reclen;
+ if (svc_sock_reclen(svsk) < 8)
+ goto err_delete; /* client is nuts. */
+
+ rqstp->rq_arg.len = svsk->sk_datalen;
rqstp->rq_arg.page_base = 0;
if (rqstp->rq_arg.len <= rqstp->rq_arg.head[0].iov_len) {
rqstp->rq_arg.head[0].iov_len = rqstp->rq_arg.len;
@@ -1107,11 +1112,8 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
len = receive_cb_reply(svsk, rqstp);
/* Reset TCP read info */
- svsk->sk_reclen = 0;
- svsk->sk_tcplen = 0;
- /* If we have more data, signal svc_xprt_enqueue() to try again */
- if (svc_recv_available(svsk) > sizeof(rpc_fraghdr))
- set_bit(XPT_DATA, &svsk->sk_xprt.xpt_flags);
+ svsk->sk_datalen = 0;
+ svc_tcp_fragment_received(svsk);
if (len < 0)
goto error;
@@ -1120,15 +1122,14 @@ static int svc_tcp_recvfrom(struct svc_rqst *rqstp)
if (serv->sv_stats)
serv->sv_stats->nettcpcnt++;
- dprintk("svc: TCP complete record (%d bytes)\n", rqstp->rq_arg.len);
return rqstp->rq_arg.len;
error:
if (len != -EAGAIN)
- goto err_other;
+ goto err_delete;
dprintk("RPC: TCP recvfrom got EAGAIN\n");
return 0;
-err_other:
+err_delete:
printk(KERN_NOTICE "%s: recvfrom returned errno %d\n",
svsk->sk_xprt.xpt_server->sv_name, -len);
set_bit(XPT_CLOSE, &svsk->sk_xprt.xpt_flags);
@@ -1305,6 +1306,7 @@ static void svc_tcp_init(struct svc_sock *svsk, struct svc_serv *serv)
svsk->sk_reclen = 0;
svsk->sk_tcplen = 0;
+ svsk->sk_datalen = 0;
memset(&svsk->sk_pages[0], 0, sizeof(svsk->sk_pages));
tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 41cb63b623df..0ce75524ed21 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -521,11 +521,11 @@ next_sge:
rqstp->rq_pages[ch_no] = NULL;
/*
- * Detach res pages. svc_release must see a resused count of
- * zero or it will attempt to put them.
+ * Detach res pages. If svc_release sees any it will attempt to
+ * put them.
*/
- while (rqstp->rq_resused)
- rqstp->rq_respages[--rqstp->rq_resused] = NULL;
+ while (rqstp->rq_next_page != rqstp->rq_respages)
+ *(--rqstp->rq_next_page) = NULL;
return err;
}
@@ -550,7 +550,7 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
/* rq_respages starts after the last arg page */
rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
- rqstp->rq_resused = 0;
+ rqstp->rq_next_page = &rqstp->rq_arg.pages[page_no];
/* Rebuild rq_arg head and tail. */
rqstp->rq_arg.head[0] = head->arg.head[0];
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 42eb7ba0b903..c1d124dc772b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -548,6 +548,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
int sge_no;
int sge_bytes;
int page_no;
+ int pages;
int ret;
/* Post a recv buffer to handle another request. */
@@ -611,7 +612,8 @@ static int send_reply(struct svcxprt_rdma *rdma,
* respages array. They are our pages until the I/O
* completes.
*/
- for (page_no = 0; page_no < rqstp->rq_resused; page_no++) {
+ pages = rqstp->rq_next_page - rqstp->rq_respages;
+ for (page_no = 0; page_no < pages; page_no++) {
ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
ctxt->count++;
rqstp->rq_respages[page_no] = NULL;