1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
  26 /* All Rights Reserved */
  27 /*
  28  * Portions of this source code were derived from Berkeley
  29  * 4.3 BSD under license from the Regents of the University of
  30  * California.
  31  */
  32 
  33 #include <sys/param.h>
  34 #include <sys/types.h>
  35 #include <sys/user.h>
  36 #include <sys/systm.h>
  37 #include <sys/sysmacros.h>
  38 #include <sys/errno.h>
  39 #include <sys/kmem.h>
  40 #include <sys/debug.h>
  41 #include <sys/systm.h>
  42 #include <sys/kstat.h>
  43 #include <sys/t_lock.h>
  44 #include <sys/ddi.h>
  45 #include <sys/cmn_err.h>
  46 #include <sys/time.h>
  47 #include <sys/isa_defs.h>
  48 #include <sys/zone.h>
  49 #include <sys/sdt.h>
  50 
  51 #include <rpc/types.h>
  52 #include <rpc/xdr.h>
  53 #include <rpc/auth.h>
  54 #include <rpc/clnt.h>
  55 #include <rpc/rpc_msg.h>
  56 #include <rpc/rpc_rdma.h>
  57 #include <nfs/nfs.h>
  58 #include <nfs/nfs4_kprot.h>
  59 
/*
 * Value that clnt_compose_rdma_header() encodes (as rdma_credit) into
 * every outbound RDMA header, right after the version word.
 */
static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;

/*
 * Forward declarations for the file-local helpers used while building
 * a call and processing its reply.
 */
static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
                            XDR *, xdrproc_t, caddr_t);
static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
                    XDR **, uint_t *);
static int clnt_setup_rlist(CONN *, XDR *, XDR *);
static int clnt_setup_wlist(CONN *, XDR *, XDR *, rdma_buf_t *);
static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
static void clnt_check_credit(CONN *);
static void clnt_return_credit(CONN *);
static void clnt_decode_long_reply(CONN *, struct clist *,
                struct clist *, XDR *, XDR **, struct clist *,
                struct clist *, uint_t, uint_t);

static void clnt_update_credit(CONN *, uint32_t);

/*
 * CLIENT operations; these are the entries collected in rdma_clnt_ops.
 */
static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void     clnt_rdma_kabort(CLIENT *);
static void     clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t   clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void     clnt_rdma_kdestroy(CLIENT *);
static bool_t   clnt_rdma_kcontrol(CLIENT *, int, char *);
static int      clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);
  86 
/*
 * Operations vector for RDMA based RPC.
 *
 * clnt_rdma_kcreate() and clnt_rdma_kinit() store a pointer to this table
 * in the cl_ops field of the handle they set up.  The initializer is
 * positional, so the entries must stay in the order in which
 * struct clnt_ops declares its members.
 */
static struct clnt_ops rdma_clnt_ops = {
        clnt_rdma_kcallit,      /* do rpc call */
        clnt_rdma_kabort,       /* abort call */
        clnt_rdma_kerror,       /* return error status */
        clnt_rdma_kfreeres,     /* free results */
        clnt_rdma_kdestroy,     /* destroy rpc handle */
        clnt_rdma_kcontrol,     /* the ioctl() of rpc */
        clnt_rdma_ksettimers,   /* set retry timers */
};
  99 
 100 /*
 101  * The size of the preserialized RPC header information.
 102  */
 103 #define CKU_HDRSIZE     20
 104 #define CLNT_RDMA_SUCCESS 0
 105 #define CLNT_RDMA_FAIL (-1)
 106 
 107 #define AUTH_REFRESH_COUNT 2
 108 
 109 #define IS_RPCSEC_GSS(authh)                    \
 110         (authh->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)
 111 
/*
 * Per RPC RDMA endpoint details.
 *
 * The CLIENT handle is embedded in this structure (see ptoh()/htop()),
 * so the handle and its private state are allocated and freed together.
 */
typedef struct cku_private {
        CLIENT                  cku_client;     /* client handle */
        rdma_mod_t              *cku_rd_mod;    /* underlying RDMA mod */
        void                    *cku_rd_handle; /* underlying RDMA device */
        struct netbuf           cku_srcaddr;    /* source address for retries */
        struct netbuf           cku_addr;       /* remote netbuf address */
        int                     cku_addrfmly;   /* for finding addr_type */
        struct rpc_err          cku_err;        /* error status */
        struct cred             *cku_cred;      /* credentials */
        XDR                     cku_outxdr;     /* xdr stream for output */
        /* XDR position recorded by clnt_compose_rpcmsg() after encoding */
        uint32_t                cku_outsz;
        XDR                     cku_inxdr;      /* xdr stream for input */
        /*
         * Preserialized rpc header.  The 4 extra bytes hold the procedure
         * number that clnt_compose_rpcmsg() appends on the RPCSEC_GSS path.
         */
        char                    cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
        /* current XID; clnt_rdma_kcallit() allocates one while this is 0 */
        uint32_t                cku_xid;        /* current XID */
} cku_private_t;
 130 
/*
 * clnt_rdma_kcallit() multiplies clnt_rdma_min_delay by
 * drv_usectohz(1000000) to get the tick count it hands to
 * delay()/delay_sig() before reporting certain failures to its caller.
 */
#define CLNT_RDMA_DELAY 10      /* secs to delay after a connection failure */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;
 133 
/*
 * Client-side RDMA RPC statistics.  Every member is a named kstat entry
 * of type KSTAT_DATA_UINT64; the counters are bumped via RCSTAT_INCR().
 * The initializer list is positional and must stay in the same order as
 * the member declarations.
 */
struct {
        kstat_named_t   rccalls;
        kstat_named_t   rcbadcalls;
        kstat_named_t   rcbadxids;
        kstat_named_t   rctimeouts;
        kstat_named_t   rcnewcreds;
        kstat_named_t   rcbadverfs;
        kstat_named_t   rctimers;
        kstat_named_t   rccantconn;
        kstat_named_t   rcnomem;
        kstat_named_t   rcintrs;
        kstat_named_t   rclongrpcs;
} rdmarcstat = {
        { "calls",      KSTAT_DATA_UINT64 },
        { "badcalls",   KSTAT_DATA_UINT64 },
        { "badxids",    KSTAT_DATA_UINT64 },
        { "timeouts",   KSTAT_DATA_UINT64 },
        { "newcreds",   KSTAT_DATA_UINT64 },
        { "badverfs",   KSTAT_DATA_UINT64 },
        { "timers",     KSTAT_DATA_UINT64 },
        { "cantconn",   KSTAT_DATA_UINT64 },
        { "nomem",      KSTAT_DATA_UINT64 },
        { "interrupts", KSTAT_DATA_UINT64 },
        { "longrpc",    KSTAT_DATA_UINT64 }
};

/*
 * Flat-array view of the table above and its element count.  Both are
 * non-static; presumably they are consumed by kstat setup code in another
 * file -- TODO confirm.
 */
kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);
 162 
 163 #ifdef DEBUG
 164 int rdma_clnt_debug = 0;
 165 #endif
 166 
 167 #ifdef accurate_stats
 168 extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */
 169 
 170 #define RCSTAT_INCR(x)                  \
 171         mutex_enter(&rdmarcstat_lock);      \
 172         rdmarcstat.x.value.ui64++;      \
 173         mutex_exit(&rdmarcstat_lock);
 174 #else
 175 #define RCSTAT_INCR(x)                  \
 176         rdmarcstat.x.value.ui64++;
 177 #endif
 178 
 179 #define ptoh(p)         (&((p)->cku_client))
 180 #define htop(h)         ((cku_private_t *)((h)->cl_private))
 181 
 182 uint_t
 183 calc_length(uint_t len)
 184 {
 185         len = RNDUP(len);
 186 
 187         if (len <= 64 * 1024) {
 188                 if (len > 32 * 1024) {
 189                         len = 64 * 1024;
 190                 } else {
 191                         if (len > 16 * 1024) {
 192                                 len = 32 * 1024;
 193                         } else {
 194                                 if (len > 8 * 1024) {
 195                                         len = 16 * 1024;
 196                                 } else {
 197                                         len = 8 * 1024;
 198                                 }
 199                         }
 200                 }
 201         }
 202         return (len);
 203 }
 204 int
 205 clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
 206     rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
 207 {
 208         CLIENT *h;
 209         struct cku_private *p;
 210         struct rpc_msg call_msg;
 211         rdma_registry_t *rp;
 212 
 213         ASSERT(INGLOBALZONE(curproc));
 214 
 215         if (cl == NULL)
 216                 return (EINVAL);
 217         *cl = NULL;
 218 
 219         p = kmem_zalloc(sizeof (*p), KM_SLEEP);
 220 
 221         /*
 222          * Find underlying RDMATF plugin
 223          */
 224         rw_enter(&rdma_lock, RW_READER);
 225         rp = rdma_mod_head;
 226         while (rp != NULL) {
 227                 if (strcmp(rp->r_mod->rdma_api, proto))
 228                         rp = rp->r_next;
 229                 else {
 230                         p->cku_rd_mod = rp->r_mod;
 231                         p->cku_rd_handle = handle;
 232                         break;
 233                 }
 234         }
 235         rw_exit(&rdma_lock);
 236 
 237         if (p->cku_rd_mod == NULL) {
 238                 /*
 239                  * Should not happen.
 240                  * No matching RDMATF plugin.
 241                  */
 242                 kmem_free(p, sizeof (struct cku_private));
 243                 return (EINVAL);
 244         }
 245 
 246         h = ptoh(p);
 247         h->cl_ops = &rdma_clnt_ops;
 248         h->cl_private = (caddr_t)p;
 249         h->cl_auth = authkern_create();
 250 
 251         /* call message, just used to pre-serialize below */
 252         call_msg.rm_xid = 0;
 253         call_msg.rm_direction = CALL;
 254         call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
 255         call_msg.rm_call.cb_prog = pgm;
 256         call_msg.rm_call.cb_vers = vers;
 257 
 258         xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
 259         /* pre-serialize call message header */
 260         if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
 261                 XDR_DESTROY(&p->cku_outxdr);
 262                 auth_destroy(h->cl_auth);
 263                 kmem_free(p, sizeof (struct cku_private));
 264                 return (EINVAL);
 265         }
 266 
 267         /*
 268          * Set up the rpc information
 269          */
 270         p->cku_cred = cred;
 271         p->cku_srcaddr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
 272         p->cku_srcaddr.maxlen = raddr->maxlen;
 273         p->cku_srcaddr.len = 0;
 274         p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
 275         p->cku_addr.maxlen = raddr->maxlen;
 276         p->cku_addr.len = raddr->len;
 277         bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
 278         p->cku_addrfmly = family;
 279 
 280         *cl = h;
 281         return (0);
 282 }
 283 
 284 static void
 285 clnt_rdma_kdestroy(CLIENT *h)
 286 {
 287         struct cku_private *p = htop(h);
 288 
 289         kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
 290         kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
 291         kmem_free(p, sizeof (*p));
 292 }
 293 
 294 void
 295 clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
 296     struct cred *cred)
 297 {
 298         struct cku_private *p = htop(h);
 299         rdma_registry_t *rp;
 300 
 301         ASSERT(INGLOBALZONE(curproc));
 302         /*
 303          * Find underlying RDMATF plugin
 304          */
 305         p->cku_rd_mod = NULL;
 306         rw_enter(&rdma_lock, RW_READER);
 307         rp = rdma_mod_head;
 308         while (rp != NULL) {
 309                 if (strcmp(rp->r_mod->rdma_api, proto))
 310                         rp = rp->r_next;
 311                 else {
 312                         p->cku_rd_mod = rp->r_mod;
 313                         p->cku_rd_handle = handle;
 314                         break;
 315                 }
 316 
 317         }
 318         rw_exit(&rdma_lock);
 319 
 320         /*
 321          * Set up the rpc information
 322          */
 323         p->cku_cred = cred;
 324         p->cku_xid = 0;
 325 
 326         if (p->cku_addr.maxlen < raddr->len) {
 327                 if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
 328                         kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
 329                 p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
 330                 p->cku_addr.maxlen = raddr->maxlen;
 331         }
 332 
 333         p->cku_srcaddr.len = 0;
 334 
 335         p->cku_addr.len = raddr->len;
 336         bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
 337         h->cl_ops = &rdma_clnt_ops;
 338 }
 339 
 340 static int
 341 clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
 342     rdma_buf_t *rpcmsg, XDR *xdrs,
 343     xdrproc_t xdr_args, caddr_t argsp)
 344 {
 345         cku_private_t *p = htop(h);
 346 
 347         if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
 348                 /*
 349                  * Copy in the preserialized RPC header
 350                  * information.
 351                  */
 352                 bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);
 353 
 354                 /*
 355                  * transaction id is the 1st thing in the output
 356                  * buffer.
 357                  */
 358                 /* LINTED pointer alignment */
 359                 (*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;
 360 
 361                 /* Skip the preserialized stuff. */
 362                 XDR_SETPOS(xdrs, CKU_HDRSIZE);
 363 
 364                 /* Serialize dynamic stuff into the output buffer. */
 365                 if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
 366                     (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
 367                     (!(*xdr_args)(xdrs, argsp))) {
 368                         DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
 369                         return (CLNT_RDMA_FAIL);
 370                 }
 371                 p->cku_outsz = XDR_GETPOS(xdrs);
 372         } else {
 373                 uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
 374                 IXDR_PUT_U_INT32(uproc, procnum);
 375                 (*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
 376                 XDR_SETPOS(xdrs, 0);
 377 
 378                 /* Serialize the procedure number and the arguments. */
 379                 if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
 380                     CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
 381                         if (rpcmsg->addr != xdrs->x_base) {
 382                                 rpcmsg->addr = xdrs->x_base;
 383                                 rpcmsg->len = xdr_getbufsize(xdrs);
 384                         }
 385                         DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
 386                         return (CLNT_RDMA_FAIL);
 387                 }
 388                 /*
 389                  * If we had to allocate a new buffer while encoding
 390                  * then update the addr and len.
 391                  */
 392                 if (rpcmsg->addr != xdrs->x_base) {
 393                         rpcmsg->addr = xdrs->x_base;
 394                         rpcmsg->len = xdr_getbufsize(xdrs);
 395                 }
 396 
 397                 p->cku_outsz = XDR_GETPOS(xdrs);
 398                 DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
 399         }
 400 
 401         return (CLNT_RDMA_SUCCESS);
 402 }
 403 
 404 static int
 405 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
 406     XDR **xdrs, uint_t *op)
 407 {
 408         cku_private_t *p = htop(h);
 409         uint_t vers;
 410         uint32_t rdma_credit = rdma_bufs_rqst;
 411 
 412         vers = RPCRDMA_VERS;
 413         clmsg->type = SEND_BUFFER;
 414 
 415         if (rdma_buf_alloc(conn, clmsg)) {
 416                 return (CLNT_RDMA_FAIL);
 417         }
 418 
 419         *xdrs = &p->cku_outxdr;
 420         xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
 421 
 422         (*(uint32_t *)clmsg->addr) = p->cku_xid;
 423         XDR_SETPOS(*xdrs, sizeof (uint32_t));
 424         (void) xdr_u_int(*xdrs, &vers);
 425         (void) xdr_u_int(*xdrs, &rdma_credit);
 426         (void) xdr_u_int(*xdrs, op);
 427 
 428         return (CLNT_RDMA_SUCCESS);
 429 }
 430 
 431 /*
 432  * If xp_cl is NULL value, then the RPC payload will NOT carry
 433  * an RDMA READ chunk list, in this case we insert FALSE into
 434  * the XDR stream. Otherwise we use the clist and RDMA register
 435  * the memory and encode the clist into the outbound XDR stream.
 436  */
 437 static int
 438 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
 439 {
 440         int status;
 441         struct clist *rclp;
 442         int32_t xdr_flag = XDR_RDMA_RLIST_REG;
 443 
 444         XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
 445 
 446         if (rclp != NULL) {
 447                 status = clist_register(conn, rclp, CLIST_REG_SOURCE);
 448                 if (status != RDMA_SUCCESS) {
 449                         return (CLNT_RDMA_FAIL);
 450                 }
 451                 XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
 452         }
 453         (void) xdr_do_clist(xdrs, &rclp);
 454 
 455         return (CLNT_RDMA_SUCCESS);
 456 }
 457 
 458 /*
 459  * If xp_wcl is NULL value, then the RPC payload will NOT carry
 460  * an RDMA WRITE chunk list, in this case we insert FALSE into
 461  * the XDR stream. Otherwise we use the clist and  RDMA register
 462  * the memory and encode the clist into the outbound XDR stream.
 463  */
 464 static int
 465 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp, rdma_buf_t *rndbuf)
 466 {
 467         int status;
 468         struct clist *wlist, *rndcl;
 469         int wlen, rndlen;
 470         int32_t xdr_flag = XDR_RDMA_WLIST_REG;
 471 
 472         XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
 473 
 474         if (wlist != NULL) {
 475                 /*
 476                  * If we are sending a non 4-byte alligned length
 477                  * the server will roundup the length to 4-byte
 478                  * boundary. In such a case, a trailing chunk is
 479                  * added to take any spill over roundup bytes.
 480                  */
 481                 wlen = clist_len(wlist);
 482                 rndlen = (roundup(wlen, BYTES_PER_XDR_UNIT) - wlen);
 483                 if (rndlen) {
 484                         rndcl = clist_alloc();
 485                         /*
 486                          * calc_length() will allocate a PAGESIZE
 487                          * buffer below.
 488                          */
 489                         rndcl->c_len = calc_length(rndlen);
 490                         rndcl->rb_longbuf.type = RDMA_LONG_BUFFER;
 491                         rndcl->rb_longbuf.len = rndcl->c_len;
 492                         if (rdma_buf_alloc(conn, &rndcl->rb_longbuf)) {
 493                                 clist_free(rndcl);
 494                                 return (CLNT_RDMA_FAIL);
 495                         }
 496 
 497                         /* Roundup buffer freed back in caller */
 498                         *rndbuf = rndcl->rb_longbuf;
 499 
 500                         rndcl->u.c_daddr3 = rndcl->rb_longbuf.addr;
 501                         rndcl->c_next = NULL;
 502                         rndcl->c_dmemhandle = rndcl->rb_longbuf.handle;
 503                         wlist->c_next = rndcl;
 504                 }
 505 
 506                 status = clist_register(conn, wlist, CLIST_REG_DST);
 507                 if (status != RDMA_SUCCESS) {
 508                         rdma_buf_free(conn, rndbuf);
 509                         bzero(rndbuf, sizeof (rdma_buf_t));
 510                         return (CLNT_RDMA_FAIL);
 511                 }
 512                 XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
 513         }
 514 
 515         if (!xdr_encode_wlist(xdrs, wlist)) {
 516                 if (rndlen) {
 517                         rdma_buf_free(conn, rndbuf);
 518                         bzero(rndbuf, sizeof (rdma_buf_t));
 519                 }
 520                 return (CLNT_RDMA_FAIL);
 521         }
 522 
 523         return (CLNT_RDMA_SUCCESS);
 524 }
 525 
 526 static int
 527 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
 528 {
 529         if (length == 0) {
 530                 *clpp = NULL;
 531                 return (CLNT_RDMA_SUCCESS);
 532         }
 533 
 534         *clpp = clist_alloc();
 535 
 536         (*clpp)->rb_longbuf.len = calc_length(length);
 537         (*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
 538 
 539         if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
 540                 clist_free(*clpp);
 541                 *clpp = NULL;
 542                 return (CLNT_RDMA_FAIL);
 543         }
 544 
 545         (*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
 546         (*clpp)->c_len = (*clpp)->rb_longbuf.len;
 547         (*clpp)->c_next = NULL;
 548         (*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
 549 
 550         if (clist_register(conn, *clpp, CLIST_REG_DST)) {
 551                 DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
 552                 rdma_buf_free(conn, &((*clpp)->rb_longbuf));
 553                 clist_free(*clpp);
 554                 *clpp = NULL;
 555                 return (CLNT_RDMA_FAIL);
 556         }
 557 
 558         return (CLNT_RDMA_SUCCESS);
 559 }
 560 
 561 /* ARGSUSED */
 562 static enum clnt_stat
 563 clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
 564     caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
 565     struct timeval wait)
 566 {
 567         cku_private_t *p = htop(h);
 568 
 569         int     try_call_again;
 570         int     refresh_attempt = AUTH_REFRESH_COUNT;
 571         int     status;
 572         int     msglen;
 573 
 574         XDR     *call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
 575         XDR     *reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
 576         XDR     *rdmahdr_o_xdrs, *rdmahdr_i_xdrs;
 577 
 578         struct rpc_msg  reply_msg;
 579         rdma_registry_t *m;
 580 
 581         struct clist *cl_sendlist;
 582         struct clist *cl_recvlist;
 583         struct clist *cl;
 584         struct clist *cl_rpcmsg;
 585         struct clist *cl_rdma_reply;
 586         struct clist *cl_rpcreply_wlist;
 587         struct clist *cl_long_reply;
 588         rdma_buf_t  rndup;
 589 
 590         uint_t vers;
 591         uint_t op;
 592         uint_t off;
 593         uint32_t seg_array_len;
 594         uint_t long_reply_len;
 595         uint_t rpcsec_gss;
 596         uint_t gss_i_or_p;
 597 
 598         CONN *conn = NULL;
 599         rdma_buf_t clmsg;
 600         rdma_buf_t rpcmsg;
 601         rdma_chunkinfo_lengths_t rcil;
 602 
 603         clock_t ticks;
 604         bool_t wlist_exists_reply;
 605 
 606         uint32_t rdma_credit = rdma_bufs_rqst;
 607 
 608         RCSTAT_INCR(rccalls);
 609 
 610 call_again:
 611 
 612         bzero(&clmsg, sizeof (clmsg));
 613         bzero(&rpcmsg, sizeof (rpcmsg));
 614         bzero(&rndup, sizeof (rndup));
 615         try_call_again = 0;
 616         cl_sendlist = NULL;
 617         cl_recvlist = NULL;
 618         cl = NULL;
 619         cl_rpcmsg = NULL;
 620         cl_rdma_reply = NULL;
 621         call_xdrp = NULL;
 622         reply_xdrp = NULL;
 623         wlist_exists_reply  = FALSE;
 624         cl_rpcreply_wlist = NULL;
 625         cl_long_reply = NULL;
 626         rcil.rcil_len = 0;
 627         rcil.rcil_len_alt = 0;
 628         long_reply_len = 0;
 629 
 630         rw_enter(&rdma_lock, RW_READER);
 631         m = (rdma_registry_t *)p->cku_rd_handle;
 632         if (m->r_mod_state == RDMA_MOD_INACTIVE) {
 633                 /*
 634                  * If we didn't find a matching RDMA module in the registry
 635                  * then there is no transport.
 636                  */
 637                 rw_exit(&rdma_lock);
 638                 p->cku_err.re_status = RPC_CANTSEND;
 639                 p->cku_err.re_errno = EIO;
 640                 ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
 641                 if (h->cl_nosignal == TRUE) {
 642                         delay(ticks);
 643                 } else {
 644                         if (delay_sig(ticks) == EINTR) {
 645                                 p->cku_err.re_status = RPC_INTR;
 646                                 p->cku_err.re_errno = EINTR;
 647                         }
 648                 }
 649                 return (RPC_CANTSEND);
 650         }
 651         /*
 652          * Get unique xid
 653          */
 654         if (p->cku_xid == 0)
 655                 p->cku_xid = alloc_xid();
 656 
 657         status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_srcaddr,
 658             &p->cku_addr, p->cku_addrfmly, p->cku_rd_handle, &conn);
 659         rw_exit(&rdma_lock);
 660 
 661         /*
 662          * If there is a problem with the connection reflect the issue
 663          * back to the higher level to address, we MAY delay for a short
 664          * period so that we are kind to the transport.
 665          */
 666         if (conn == NULL) {
 667                 /*
 668                  * Connect failed to server. Could be because of one
 669                  * of several things. In some cases we don't want
 670                  * the caller to retry immediately - delay before
 671                  * returning to caller.
 672                  */
 673                 switch (status) {
 674                 case RDMA_TIMEDOUT:
 675                         /*
 676                          * Already timed out. No need to delay
 677                          * some more.
 678                          */
 679                         p->cku_err.re_status = RPC_TIMEDOUT;
 680                         p->cku_err.re_errno = ETIMEDOUT;
 681                         break;
 682                 case RDMA_INTR:
 683                         /*
 684                          * Failed because of an signal. Very likely
 685                          * the caller will not retry.
 686                          */
 687                         p->cku_err.re_status = RPC_INTR;
 688                         p->cku_err.re_errno = EINTR;
 689                         break;
 690                 default:
 691                         /*
 692                          * All other failures - server down or service
 693                          * down or temporary resource failure. Delay before
 694                          * returning to caller.
 695                          */
 696                         ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
 697                         p->cku_err.re_status = RPC_CANTCONNECT;
 698                         p->cku_err.re_errno = EIO;
 699 
 700                         if (h->cl_nosignal == TRUE) {
 701                                 delay(ticks);
 702                         } else {
 703                                 if (delay_sig(ticks) == EINTR) {
 704                                         p->cku_err.re_status = RPC_INTR;
 705                                         p->cku_err.re_errno = EINTR;
 706                                 }
 707                         }
 708                         break;
 709                 }
 710 
 711                 return (p->cku_err.re_status);
 712         }
 713 
 714         if (p->cku_srcaddr.maxlen < conn->c_laddr.len) {
 715                 if ((p->cku_srcaddr.maxlen != 0) &&
 716                     (p->cku_srcaddr.buf != NULL))
 717                         kmem_free(p->cku_srcaddr.buf, p->cku_srcaddr.maxlen);
 718                 p->cku_srcaddr.buf = kmem_zalloc(conn->c_laddr.maxlen,
 719                     KM_SLEEP);
 720                 p->cku_srcaddr.maxlen = conn->c_laddr.maxlen;
 721         }
 722 
 723         p->cku_srcaddr.len = conn->c_laddr.len;
 724         bcopy(conn->c_laddr.buf, p->cku_srcaddr.buf, conn->c_laddr.len);
 725 
 726         clnt_check_credit(conn);
 727 
 728         status = CLNT_RDMA_FAIL;
 729 
 730         rpcsec_gss = gss_i_or_p = FALSE;
 731 
 732         if (IS_RPCSEC_GSS(h)) {
 733                 rpcsec_gss = TRUE;
 734                 if (rpc_gss_get_service_type(h->cl_auth) ==
 735                     rpc_gss_svc_integrity ||
 736                     rpc_gss_get_service_type(h->cl_auth) ==
 737                     rpc_gss_svc_privacy)
 738                         gss_i_or_p = TRUE;
 739         }
 740 
 741         /*
 742          * Try a regular RDMA message if RPCSEC_GSS is not being used
 743          * or if RPCSEC_GSS is being used for authentication only.
 744          */
 745         if (rpcsec_gss == FALSE ||
 746             (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
 747                 /*
 748                  * Grab a send buffer for the request.  Try to
 749                  * encode it to see if it fits. If not, then it
 750                  * needs to be sent in a chunk.
 751                  */
 752                 rpcmsg.type = SEND_BUFFER;
 753                 if (rdma_buf_alloc(conn, &rpcmsg)) {
 754                         DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
 755                         goto done;
 756                 }
 757 
 758                 /* First try to encode into regular send buffer */
 759                 op = RDMA_MSG;
 760 
 761                 call_xdrp = &callxdr;
 762 
 763                 xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
 764                     rdma_minchunk, NULL, XDR_ENCODE, conn);
 765 
 766                 status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
 767                     xdr_args, argsp);
 768 
 769                 if (status != CLNT_RDMA_SUCCESS) {
 770                         /* Clean up from previous encode attempt */
 771                         rdma_buf_free(conn, &rpcmsg);
 772                         XDR_DESTROY(call_xdrp);
 773                 } else {
 774                         XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
 775                 }
 776         }
 777 
 778         /* If the encode didn't work, then try a NOMSG */
 779         if (status != CLNT_RDMA_SUCCESS) {
 780 
 781                 msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
 782                     xdr_sizeof(xdr_args, argsp);
 783 
 784                 msglen = calc_length(msglen);
 785 
 786                 /* pick up the lengths for the reply buffer needed */
 787                 (void) xdrrdma_sizeof(xdr_args, argsp, 0,
 788                     &rcil.rcil_len, &rcil.rcil_len_alt);
 789 
 790                 /*
 791                  * Construct a clist to describe the CHUNK_BUFFER
 792                  * for the rpcmsg.
 793                  */
 794                 cl_rpcmsg = clist_alloc();
 795                 cl_rpcmsg->c_len = msglen;
 796                 cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
 797                 cl_rpcmsg->rb_longbuf.len = msglen;
 798                 if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
 799                         clist_free(cl_rpcmsg);
 800                         goto done;
 801                 }
 802                 cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;
 803 
 804                 op = RDMA_NOMSG;
 805                 call_xdrp = &callxdr;
 806 
 807                 xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
 808                     cl_rpcmsg->rb_longbuf.len, 0,
 809                     cl_rpcmsg, XDR_ENCODE, conn);
 810 
 811                 status = clnt_compose_rpcmsg(h, procnum, &cl_rpcmsg->rb_longbuf,
 812                     call_xdrp, xdr_args, argsp);
 813 
 814                 DTRACE_PROBE2(krpc__i__clntrdma__callit__longbuf, int, status,
 815                     int, msglen);
 816                 if (status != CLNT_RDMA_SUCCESS) {
 817                         p->cku_err.re_status = RPC_CANTENCODEARGS;
 818                         p->cku_err.re_errno = EIO;
 819                         DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
 820                         goto done;
 821                 }
 822         }
 823 
 824         /*
 825          * During the XDR_ENCODE we may have "allocated" an RDMA READ or
 826          * RDMA WRITE clist.
 827          *
 828          * First pull the RDMA READ chunk list from the XDR private
 829          * area to keep it handy.
 830          */
 831         XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);
 832 
 833         if (gss_i_or_p) {
 834                 long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
 835                 long_reply_len += MAX_AUTH_BYTES;
 836         } else {
 837                 long_reply_len = rcil.rcil_len;
 838         }
 839 
 840         /*
 841          * Update the chunk size information for the Long RPC msg.
 842          */
 843         if (cl && op == RDMA_NOMSG)
 844                 cl->c_len = p->cku_outsz;
 845 
 846         /*
 847          * Prepare the RDMA header. On success xdrs will hold the result
 848          * of xdrmem_create() for a SEND_BUFFER.
 849          */
 850         status = clnt_compose_rdma_header(conn, h, &clmsg,
 851             &rdmahdr_o_xdrs, &op);
 852 
 853         if (status != CLNT_RDMA_SUCCESS) {
 854                 p->cku_err.re_status = RPC_CANTSEND;
 855                 p->cku_err.re_errno = EIO;
 856                 RCSTAT_INCR(rcnomem);
 857                 DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
 858                 goto done;
 859         }
 860 
 861         /*
 862          * Now insert the RDMA READ list iff present
 863          */
 864         status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
 865         if (status != CLNT_RDMA_SUCCESS) {
 866                 DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
 867                 rdma_buf_free(conn, &clmsg);
 868                 p->cku_err.re_status = RPC_CANTSEND;
 869                 p->cku_err.re_errno = EIO;
 870                 goto done;
 871         }
 872 
 873         /*
 874          * Setup RDMA WRITE chunk list for nfs read operation
 875          * other operations will have a NULL which will result
 876          * as a NULL list in the XDR stream.
 877          */
 878         status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp, &rndup);
 879         if (status != CLNT_RDMA_SUCCESS) {
 880                 rdma_buf_free(conn, &clmsg);
 881                 p->cku_err.re_status = RPC_CANTSEND;
 882                 p->cku_err.re_errno = EIO;
 883                 goto done;
 884         }
 885 
 886         /*
 887          * If NULL call and RPCSEC_GSS, provide a chunk such that
 888          * large responses can flow back to the client.
 889          * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
 890          */
 891         if ((procnum == 0 && rpcsec_gss == TRUE) ||
 892             (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
 893                 long_reply_len += 1024;
 894 
 895         status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);
 896 
 897         DTRACE_PROBE2(krpc__i__clntrdma__callit__longreply, int, status,
 898             int, long_reply_len);
 899 
 900         if (status != CLNT_RDMA_SUCCESS) {
 901                 rdma_buf_free(conn, &clmsg);
 902                 p->cku_err.re_status = RPC_CANTSEND;
 903                 p->cku_err.re_errno = EIO;
 904                 goto done;
 905         }
 906 
 907         /*
 908          * XDR encode the RDMA_REPLY write chunk
 909          */
 910         seg_array_len = (cl_long_reply ? 1 : 0);
 911         (void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
 912             seg_array_len);
 913 
 914         /*
 915          * Construct a clist in "sendlist" that represents what we
 916          * will push over the wire.
 917          *
 918          * Start with the RDMA header and clist (if any)
 919          */
 920         clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
 921             clmsg.addr, NULL, NULL);
 922 
 923         /*
 924          * Put the RPC call message in  sendlist if small RPC
 925          */
 926         if (op == RDMA_MSG) {
 927                 clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
 928                     rpcmsg.addr, NULL, NULL);
 929         } else {
 930                 /* Long RPC already in chunk list */
 931                 RCSTAT_INCR(rclongrpcs);
 932         }
 933 
 934         /*
 935          * Set up a reply buffer ready for the reply
 936          */
 937         status = rdma_clnt_postrecv(conn, p->cku_xid);
 938         if (status != RDMA_SUCCESS) {
 939                 rdma_buf_free(conn, &clmsg);
 940                 p->cku_err.re_status = RPC_CANTSEND;
 941                 p->cku_err.re_errno = EIO;
 942                 goto done;
 943         }
 944 
 945         /*
 946          * sync the memory for dma
 947          */
 948         if (cl != NULL) {
 949                 status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
 950                 if (status != RDMA_SUCCESS) {
 951                         (void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
 952                         rdma_buf_free(conn, &clmsg);
 953                         p->cku_err.re_status = RPC_CANTSEND;
 954                         p->cku_err.re_errno = EIO;
 955                         goto done;
 956                 }
 957         }
 958 
 959         /*
 960          * Send the RDMA Header and RPC call message to the server
 961          */
 962         status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
 963         if (status != RDMA_SUCCESS) {
 964                 (void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
 965                 p->cku_err.re_status = RPC_CANTSEND;
 966                 p->cku_err.re_errno = EIO;
 967                 goto done;
 968         }
 969 
 970         /*
 971          * RDMA plugin now owns the send msg buffers.
 972          * Clear them out and don't free them.
 973          */
 974         clmsg.addr = NULL;
 975         if (rpcmsg.type == SEND_BUFFER)
 976                 rpcmsg.addr = NULL;
 977 
 978         /*
 979          * Recv rpc reply
 980          */
 981         status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);
 982 
 983         /*
 984          * Now check recv status
 985          */
 986         if (status != 0) {
 987                 if (status == RDMA_INTR) {
 988                         p->cku_err.re_status = RPC_INTR;
 989                         p->cku_err.re_errno = EINTR;
 990                         RCSTAT_INCR(rcintrs);
 991                 } else if (status == RPC_TIMEDOUT) {
 992                         p->cku_err.re_status = RPC_TIMEDOUT;
 993                         p->cku_err.re_errno = ETIMEDOUT;
 994                         RCSTAT_INCR(rctimeouts);
 995                 } else {
 996                         p->cku_err.re_status = RPC_CANTRECV;
 997                         p->cku_err.re_errno = EIO;
 998                 }
 999                 goto done;
1000         }
1001 
1002         /*
1003          * Process the reply message.
1004          *
1005          * First the chunk list (if any)
1006          */
1007         rdmahdr_i_xdrs = &(p->cku_inxdr);
1008         xdrmem_create(rdmahdr_i_xdrs,
1009             (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
1010             cl_recvlist->c_len, XDR_DECODE);
1011 
1012         /*
1013          * Treat xid as opaque (xid is the first entity
1014          * in the rpc rdma message).
1015          * Skip xid and set the xdr position accordingly.
1016          */
1017         XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
1018         (void) xdr_u_int(rdmahdr_i_xdrs, &vers);
1019         (void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
1020         (void) xdr_u_int(rdmahdr_i_xdrs, &op);
1021         (void) xdr_do_clist(rdmahdr_i_xdrs, &cl);
1022 
1023         clnt_update_credit(conn, rdma_credit);
1024 
1025         wlist_exists_reply = FALSE;
1026         if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
1027             &wlist_exists_reply)) {
1028                 DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
1029                 p->cku_err.re_status = RPC_CANTDECODERES;
1030                 p->cku_err.re_errno = EIO;
1031                 goto done;
1032         }
1033 
1034         /*
1035          * The server shouldn't have sent a RDMA_SEND that
1036          * the client needs to RDMA_WRITE a reply back to
1037          * the server.  So silently ignoring what the
1038          * server returns in the rdma_reply section of the
1039          * header.
1040          */
1041         (void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
1042         off = xdr_getpos(rdmahdr_i_xdrs);
1043 
1044         clnt_decode_long_reply(conn, cl_long_reply,
1045             cl_rdma_reply, &replyxdr, &reply_xdrp,
1046             cl, cl_recvlist, op, off);
1047 
1048         if (reply_xdrp == NULL)
1049                 goto done;
1050 
1051         if (wlist_exists_reply) {
1052                 XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
1053         }
1054 
1055         reply_msg.rm_direction = REPLY;
1056         reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
1057         reply_msg.acpted_rply.ar_stat = SUCCESS;
1058         reply_msg.acpted_rply.ar_verf = _null_auth;
1059 
1060         /*
1061          *  xdr_results will be done in AUTH_UNWRAP.
1062          */
1063         reply_msg.acpted_rply.ar_results.where = NULL;
1064         reply_msg.acpted_rply.ar_results.proc = xdr_void;
1065 
1066         /*
1067          * Decode and validate the response.
1068          */
1069         if (xdr_replymsg(reply_xdrp, &reply_msg)) {
1070                 enum clnt_stat re_status;
1071 
1072                 _seterr_reply(&reply_msg, &(p->cku_err));
1073 
1074                 re_status = p->cku_err.re_status;
1075                 if (re_status == RPC_SUCCESS) {
1076                         /*
1077                          * Reply is good, check auth.
1078                          */
1079                         if (!AUTH_VALIDATE(h->cl_auth,
1080                             &reply_msg.acpted_rply.ar_verf)) {
1081                                 p->cku_err.re_status = RPC_AUTHERROR;
1082                                 p->cku_err.re_why = AUTH_INVALIDRESP;
1083                                 RCSTAT_INCR(rcbadverfs);
1084                                 DTRACE_PROBE(
1085                                     krpc__e__clntrdma__callit__authvalidate);
1086                         } else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
1087                             xdr_results, resultsp)) {
1088                                 p->cku_err.re_status = RPC_CANTDECODERES;
1089                                 p->cku_err.re_errno = EIO;
1090                                 DTRACE_PROBE(
1091                                     krpc__e__clntrdma__callit__authunwrap);
1092                         }
1093                 } else {
1094                         /* set errno in case we can't recover */
1095                         if (re_status != RPC_VERSMISMATCH &&
1096                             re_status != RPC_AUTHERROR &&
1097                             re_status != RPC_PROGVERSMISMATCH)
1098                                 p->cku_err.re_errno = EIO;
1099 
1100                         if (re_status == RPC_AUTHERROR) {
1101                                 if ((refresh_attempt > 0) &&
1102                                     AUTH_REFRESH(h->cl_auth, &reply_msg,
1103                                     p->cku_cred)) {
1104                                         refresh_attempt--;
1105                                         try_call_again = 1;
1106                                         goto done;
1107                                 }
1108 
1109                                 try_call_again = 0;
1110 
1111                                 /*
1112                                  * We have used the client handle to
1113                                  * do an AUTH_REFRESH and the RPC status may
1114                                  * be set to RPC_SUCCESS; Let's make sure to
1115                                  * set it to RPC_AUTHERROR.
1116                                  */
1117                                 p->cku_err.re_status = RPC_AUTHERROR;
1118 
1119                                 /*
1120                                  * Map recoverable and unrecoverable
1121                                  * authentication errors to appropriate
1122                                  * errno
1123                                  */
1124                                 switch (p->cku_err.re_why) {
1125                                 case AUTH_BADCRED:
1126                                 case AUTH_BADVERF:
1127                                 case AUTH_INVALIDRESP:
1128                                 case AUTH_TOOWEAK:
1129                                 case AUTH_FAILED:
1130                                 case RPCSEC_GSS_NOCRED:
1131                                 case RPCSEC_GSS_FAILED:
1132                                         p->cku_err.re_errno = EACCES;
1133                                         break;
1134                                 case AUTH_REJECTEDCRED:
1135                                 case AUTH_REJECTEDVERF:
1136                                 default:
1137                                         p->cku_err.re_errno = EIO;
1138                                         break;
1139                                 }
1140                         }
1141                         DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
1142                             int, p->cku_err.re_why);
1143                 }
1144         } else {
1145                 p->cku_err.re_status = RPC_CANTDECODERES;
1146                 p->cku_err.re_errno = EIO;
1147                 DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
1148         }
1149 
1150 done:
1151         clnt_return_credit(conn);
1152 
1153         if (cl_sendlist != NULL)
1154                 clist_free(cl_sendlist);
1155 
1156         /*
1157          * If rpc reply is in a chunk, free it now.
1158          */
1159         if (cl_long_reply) {
1160                 (void) clist_deregister(conn, cl_long_reply);
1161                 rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
1162                 clist_free(cl_long_reply);
1163         }
1164 
1165         if (call_xdrp)
1166                 XDR_DESTROY(call_xdrp);
1167 
1168         if (rndup.rb_private) {
1169                 rdma_buf_free(conn, &rndup);
1170         }
1171 
1172         if (reply_xdrp) {
1173                 (void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
1174                 XDR_DESTROY(reply_xdrp);
1175         }
1176 
1177         if (cl_rdma_reply) {
1178                 clist_free(cl_rdma_reply);
1179         }
1180 
1181         if (cl_recvlist) {
1182                 rdma_buf_t      recvmsg = {0};
1183                 recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
1184                 recvmsg.type = RECV_BUFFER;
1185                 RDMA_BUF_FREE(conn, &recvmsg);
1186                 clist_free(cl_recvlist);
1187         }
1188 
1189         RDMA_REL_CONN(conn);
1190 
1191         if (try_call_again)
1192                 goto call_again;
1193 
1194         if (p->cku_err.re_status != RPC_SUCCESS) {
1195                 RCSTAT_INCR(rcbadcalls);
1196         }
1197         return (p->cku_err.re_status);
1198 }
1199 
1200 
1201 static void
1202 clnt_decode_long_reply(CONN *conn,
1203     struct clist *cl_long_reply,
1204     struct clist *cl_rdma_reply, XDR *xdrs,
1205     XDR **rxdrp, struct clist *cl,
1206     struct clist *cl_recvlist,
1207     uint_t  op, uint_t off)
1208 {
1209         if (op != RDMA_NOMSG) {
1210                 DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
1211                     int, cl_recvlist->c_len - off);
1212                 xdrrdma_create(xdrs,
1213                     (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
1214                     cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
1215                 *rxdrp = xdrs;
1216                 return;
1217         }
1218 
1219         /* op must be RDMA_NOMSG */
1220         if (cl) {
1221                 DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
1222                 return;
1223         }
1224 
1225         if (cl_long_reply->u.c_daddr) {
1226                 DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
1227                     int, cl_rdma_reply->c_len);
1228 
1229                 xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
1230                     cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);
1231 
1232                 *rxdrp = xdrs;
1233         }
1234 }
1235 
1236 static void
1237 clnt_return_credit(CONN *conn)
1238 {
1239         rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1240 
1241         mutex_enter(&conn->c_lock);
1242         cc_info->clnt_cc_in_flight_ops--;
1243         cv_signal(&cc_info->clnt_cc_cv);
1244         mutex_exit(&conn->c_lock);
1245 }
1246 
1247 static void
1248 clnt_update_credit(CONN *conn, uint32_t rdma_credit)
1249 {
1250         rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1251 
1252         /*
1253          * If the granted has not altered, avoid taking the
1254          * mutex, to essentially do nothing..
1255          */
1256         if (cc_info->clnt_cc_granted_ops == rdma_credit)
1257                 return;
1258         /*
1259          * Get the granted number of buffers for credit control.
1260          */
1261         mutex_enter(&conn->c_lock);
1262         cc_info->clnt_cc_granted_ops = rdma_credit;
1263         mutex_exit(&conn->c_lock);
1264 }
1265 
1266 static void
1267 clnt_check_credit(CONN *conn)
1268 {
1269         rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1270 
1271         /*
1272          * Make sure we are not going over our allowed buffer use
1273          * (and make sure we have gotten a granted value before).
1274          */
1275         mutex_enter(&conn->c_lock);
1276         while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
1277             cc_info->clnt_cc_granted_ops != 0) {
1278                 /*
1279                  * Client has maxed out its granted buffers due to
1280                  * credit control.  Current handling is to block and wait.
1281                  */
1282                 cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
1283         }
1284         cc_info->clnt_cc_in_flight_ops++;
1285         mutex_exit(&conn->c_lock);
1286 }
1287 
1288 /* ARGSUSED */
1289 static void
1290 clnt_rdma_kabort(CLIENT *h)
1291 {
1292 }
1293 
1294 static void
1295 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
1296 {
1297         struct cku_private *p = htop(h);
1298         *err = p->cku_err;
1299 }
1300 
1301 static bool_t
1302 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
1303 {
1304         struct cku_private *p = htop(h);
1305         XDR *xdrs;
1306 
1307         xdrs = &(p->cku_outxdr);
1308         xdrs->x_op = XDR_FREE;
1309         return ((*xdr_res)(xdrs, res_ptr));
1310 }
1311 
1312 /* ARGSUSED */
1313 static bool_t
1314 clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
1315 {
1316         return (TRUE);
1317 }
1318 
1319 /* ARGSUSED */
1320 static int
1321 clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
1322         int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
1323         uint32_t xid)
1324 {
1325         RCSTAT_INCR(rctimers);
1326         return (0);
1327 }
1328 
1329 int
1330 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
1331 {
1332         rdma_registry_t *rp;
1333         void *handle = NULL;
1334         struct knetconfig *knc;
1335         char *pf, *p;
1336         rdma_stat status;
1337         int error = 0;
1338 
1339         if (!INGLOBALZONE(curproc))
1340                 return (-1);
1341 
1342         /*
1343          * modload the RDMA plugins if not already done.
1344          */
1345         if (!rdma_modloaded) {
1346                 mutex_enter(&rdma_modload_lock);
1347                 if (!rdma_modloaded) {
1348                         error = rdma_modload();
1349                 }
1350                 mutex_exit(&rdma_modload_lock);
1351                 if (error)
1352                         return (-1);
1353         }
1354 
1355         if (!rdma_dev_available)
1356                 return (-1);
1357 
1358         rw_enter(&rdma_lock, RW_READER);
1359         rp = rdma_mod_head;
1360         while (rp != NULL) {
1361                 if (rp->r_mod_state == RDMA_MOD_INACTIVE) {
1362                         rp = rp->r_next;
1363                         continue;
1364                 }
1365                 status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
1366                     &handle);
1367                 if (status == RDMA_SUCCESS) {
1368                         knc = kmem_zalloc(sizeof (struct knetconfig),
1369                             KM_SLEEP);
1370                         knc->knc_semantics = NC_TPI_RDMA;
1371                         pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1372                         p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1373                         if (addr_type == AF_INET)
1374                                 (void) strncpy(pf, NC_INET, KNC_STRSIZE);
1375                         else if (addr_type == AF_INET6)
1376                                 (void) strncpy(pf, NC_INET6, KNC_STRSIZE);
1377                         pf[KNC_STRSIZE - 1] = '\0';
1378 
1379                         (void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
1380                         p[KNC_STRSIZE - 1] = '\0';
1381 
1382                         knc->knc_protofmly = pf;
1383                         knc->knc_proto = p;
1384                         knc->knc_rdev = (dev_t)rp;
1385                         *knconf = knc;
1386                         rw_exit(&rdma_lock);
1387                         return (0);
1388                 }
1389                 rp = rp->r_next;
1390         }
1391         rw_exit(&rdma_lock);
1392         return (-1);
1393 }