1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 /*
  27  * Copyright (c) 2013, Joyent, Inc.  All rights reserved.
  28  */
  29 
  30 #include <sys/param.h>
  31 #include <sys/thread.h>
  32 #include <sys/cmn_err.h>
  33 #include <sys/debug.h>
  34 #include <sys/cpuvar.h>
  35 #include <sys/sobject.h>
  36 #include <sys/turnstile.h>
  37 #include <sys/rwlock.h>
  38 #include <sys/rwlock_impl.h>
  39 #include <sys/atomic.h>
  40 #include <sys/lockstat.h>
  41 
  42 /*
  43  * Big Theory Statement for readers/writer locking primitives.
  44  *
  45  * An rwlock provides exclusive access to a single thread ("writer") or
  46  * concurrent access to multiple threads ("readers").  See rwlock(9F)
  47  * for a full description of the interfaces and programming model.
  48  * The rest of this comment describes the implementation.
  49  *
  50  * An rwlock is a single word with the following structure:
  51  *
  52  *      ---------------------------------------------------------------------
  53  *      | OWNER (writer) or HOLD COUNT (readers)   | WRLOCK | WRWANT | WAIT |
  54  *      ---------------------------------------------------------------------
  55  *                      63 / 31 .. 3                    2       1       0
  56  *
  57  * The waiters bit (0) indicates whether any threads are blocked waiting
  58  * for the lock.  The write-wanted bit (1) indicates whether any threads
  59  * are blocked waiting for write access.  The write-locked bit (2) indicates
  60  * whether the lock is held by a writer, which determines whether the upper
  61  * bits (3..31 in ILP32, 3..63 in LP64) should be interpreted as the owner
  62  * (thread pointer) or the hold count (number of readers).
  63  *
  64  * In the absence of any contention, a writer gets the lock by setting
  65  * this word to (curthread | RW_WRITE_LOCKED); a reader gets the lock
  66  * by incrementing the hold count (i.e. adding 8, aka RW_READ_LOCK).
  67  *
  68  * A writer will fail to acquire the lock if any other thread owns it.
  69  * A reader will fail if the lock is either owned (in the RW_READER and
  70  * RW_READER_STARVEWRITER cases) or wanted by a writer (in the RW_READER
  71  * case). rw_tryenter() returns 0 in these cases; rw_enter() blocks until
  72  * the lock becomes available.
  73  *
  74  * When a thread blocks it acquires the rwlock's hashed turnstile lock and
  75  * attempts to set RW_HAS_WAITERS (and RW_WRITE_WANTED in the writer case)
  76  * atomically *only if the lock still appears busy*.  A thread must never
  77  * accidentally block for an available lock since there would be no owner
  78  * to awaken it.  casip() provides the required atomicity.  Once casip()
  79  * succeeds, the decision to block becomes final and irreversible.  The
  80  * thread will not become runnable again until it has been granted ownership
  81  * of the lock via direct handoff from a former owner as described below.
  82  *
  83  * In the absence of any waiters, rw_exit() just clears the lock (if it
  84  * is write-locked) or decrements the hold count (if it is read-locked).
  85  * Note that even if waiters are present, decrementing the hold count
  86  * to a non-zero value requires no special action since the lock is still
  87  * held by at least one other thread.
  88  *
  89  * On the "final exit" (transition to unheld state) of a lock with waiters,
  90  * rw_exit_wakeup() grabs the turnstile lock and transfers ownership directly
  91  * to the next writer or set of readers.  There are several advantages to this
  92  * approach: (1) it closes all windows for priority inversion (when a new
  93  * writer has grabbed the lock but has not yet inherited from blocked readers);
  94  * (2) it prevents starvation of equal-priority threads by granting the lock
  95  * in FIFO order; (3) it eliminates the need for a write-wanted count -- a
  96  * single bit suffices because the lock remains held until all waiting
  97  * writers are gone; (4) when we awaken N readers we can perform a single
  98  * "atomic_add(&x, N)" to set the total hold count rather than having all N
  99  * threads fight for the cache to perform an "atomic_add(&x, 1)" upon wakeup.
 100  *
 101  * The most interesting policy decision in rw_exit_wakeup() is which thread
 102  * to wake.  Starvation is always possible with priority-based scheduling,
 103  * but any sane wakeup policy should at least satisfy these requirements:
 104  *
 105  * (1) The highest-priority thread in the system should not starve.
 106  * (2) The highest-priority writer should not starve.
 107  * (3) No writer should starve due to lower-priority threads.
 108  * (4) No reader should starve due to lower-priority writers.
 109  * (5) If all threads have equal priority, none of them should starve.
 110  *
 111  * We used to employ a writers-always-win policy, which doesn't even
 112  * satisfy (1): a steady stream of low-priority writers can starve out
 113  * a real-time reader!  This is clearly a broken policy -- it violates
 114  * (1), (4), and (5) -- but it's how rwlocks always used to behave.
 115  *
 116  * A round-robin policy (exiting readers grant the lock to blocked writers
 117  * and vice versa) satisfies all but (3): a single high-priority writer
 118  * and many low-priority readers can starve out medium-priority writers.
 119  *
 120  * A strict priority policy (grant the lock to the highest priority blocked
 121  * thread) satisfies everything but (2): a steady stream of high-priority
 122  * readers can permanently starve the highest-priority writer.
 123  *
 124  * The reason we care about (2) is that it's important to process writers
 125  * reasonably quickly -- even if they're low priority -- because their very
 126  * presence causes all readers to take the slow (blocking) path through this
 127  * code.  There is also a general sense that writers deserve some degree of
 128  * deference because they're updating the data upon which all readers act.
 129  * Presumably this data should not be allowed to become arbitrarily stale
 130  * due to writer starvation.  Finally, it seems reasonable to level the
 131  * playing field a bit to compensate for the fact that it's so much harder
 132  * for a writer to get in when there are already many readers present.
 133  *
 134  * A hybrid of round-robin and strict priority can be made to satisfy
 135  * all five criteria.  In this "writer priority policy" exiting readers
 136  * always grant the lock to waiting writers, but exiting writers only
 137  * grant the lock to readers of the same or higher priority than the
 138  * highest-priority blocked writer.  Thus requirement (2) is satisfied,
 139  * necessarily, by a willful act of priority inversion: an exiting reader
 140  * will grant the lock to a blocked writer even if there are blocked
 141  * readers of higher priority.  The situation is mitigated by the fact
 142  * that writers always inherit priority from blocked readers, and the
 143  * writer will awaken those readers as soon as it exits the lock.
 144  *
 145  * Finally, note that this hybrid scheme -- and indeed, any scheme that
 146  * satisfies requirement (2) -- has an important consequence:  if a lock is
 147  * held as reader and a writer subsequently becomes blocked, any further
 148  * readers must be blocked to avoid writer starvation.  This implementation
 149  * detail has ramifications for the semantics of rwlocks, as it prohibits
 150  * recursively acquiring an rwlock as reader: any writer that wishes to
 151  * acquire the lock after the first but before the second acquisition as
 152  * reader will block the second acquisition -- resulting in deadlock.  This
 153  * itself is not necessarily prohibitive, as it is often straightforward to
 154  * prevent a single thread from recursively acquiring an rwlock as reader.
 155  * However, a more subtle situation arises when both a traditional mutex and
 156  * a reader lock are acquired by two different threads in opposite order.
 157  * (That is, one thread first acquires the mutex and then the rwlock as
 158  * reader; the other acquires the rwlock as reader and then the mutex.) As
 159  * with the single threaded case, this is fine absent a blocked writer: the
 160  * thread that acquires the mutex before acquiring the rwlock as reader will
 161  * be able to successfully acquire the rwlock -- even as/if the other thread
 162  * has the rwlock as reader and is blocked on the held mutex.  However, if
 163  * an unrelated writer (that is, a third thread) becomes blocked on the
 164  * rwlock after the first thread acquires the rwlock as reader but before
 165  * it's able to acquire the mutex, the second thread -- with the mutex held
 166  * -- will not be able to acquire the rwlock as reader due to the waiting
 167  * writer, deadlocking the three threads.  Unlike the single-threaded
 168  * (recursive) rwlock acquisition case, this case can be quite a bit
 169  * thornier to fix, especially as there is nothing inherently wrong in the
 170  * locking strategy: the deadlock is really induced by requirement (2), not
 171  * the consumers of the rwlock.  To permit such consumers, we allow rwlock
 172  * acquirers to explicitly opt out of requirement (2) by specifying
 173  * RW_READER_STARVEWRITER when acquiring the rwlock.  This (obviously) means
  174  * that infinite readers can starve writers, but it also allows for
 175  * multiple readers in the presence of other synchronization primitives
 176  * without regard for lock-ordering.  And while certainly odd (and perhaps
 177  * unwise), RW_READER_STARVEWRITER can be safely used alongside RW_READER on
 178  * the same lock -- RW_READER_STARVEWRITER describes only the act of lock
 179  * acquisition with respect to waiting writers, not the lock itself.
 180  *
 181  * rw_downgrade() follows the same wakeup policy as an exiting writer.
 182  *
 183  * rw_tryupgrade() has the same failure mode as rw_tryenter() for a
 184  * write lock.  Both honor the WRITE_WANTED bit by specification.
 185  *
 186  * The following rules apply to manipulation of rwlock internal state:
 187  *
 188  * (1) The rwlock is only modified via the atomic primitives casip()
 189  *     and atomic_add_ip().
 190  *
 191  * (2) The waiters bit and write-wanted bit are only modified under
 192  *     turnstile_lookup().  This ensures that the turnstile is consistent
 193  *     with the rwlock.
 194  *
 195  * (3) Waiters receive the lock by direct handoff from the previous
 196  *     owner.  Therefore, waiters *always* wake up holding the lock.
 197  */
 198 
 199 /*
 200  * The sobj_ops vector exports a set of functions needed when a thread
 201  * is asleep on a synchronization object of a given type.
 202  */
 203 static sobj_ops_t rw_sobj_ops = {
 204         SOBJ_RWLOCK, rw_owner, turnstile_stay_asleep, turnstile_change_pri
 205 };
 206 
 207 /*
 208  * If the system panics on an rwlock, save the address of the offending
 209  * rwlock in panic_rwlock_addr, and save the contents in panic_rwlock.
 210  */
 211 static rwlock_impl_t panic_rwlock;
 212 static rwlock_impl_t *panic_rwlock_addr;
 213 
 214 static void
 215 rw_panic(char *msg, rwlock_impl_t *lp)
 216 {
 217         if (panicstr)
 218                 return;
 219 
 220         if (casptr(&panic_rwlock_addr, NULL, lp) == NULL)
 221                 panic_rwlock = *lp;
 222 
 223         panic("%s, lp=%p wwwh=%lx thread=%p",
 224             msg, (void *)lp, panic_rwlock.rw_wwwh, (void *)curthread);
 225 }
 226 
 227 /* ARGSUSED */
 228 void
 229 rw_init(krwlock_t *rwlp, char *name, krw_type_t type, void *arg)
 230 {
 231         ((rwlock_impl_t *)rwlp)->rw_wwwh = 0;
 232 }
 233 
 234 void
 235 rw_destroy(krwlock_t *rwlp)
 236 {
 237         rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
 238 
 239         if (lp->rw_wwwh != 0) {
 240                 if ((lp->rw_wwwh & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK)
 241                         rw_panic("rw_destroy: lock already destroyed", lp);
 242                 else
 243                         rw_panic("rw_destroy: lock still active", lp);
 244         }
 245 
 246         lp->rw_wwwh = RW_DOUBLE_LOCK;
 247 }
 248 
 249 /*
 250  * Verify that an rwlock is held correctly.
 251  */
 252 static int
 253 rw_locked(rwlock_impl_t *lp, krw_t rw)
 254 {
 255         uintptr_t old = lp->rw_wwwh;
 256 
 257         if (rw == RW_READER || rw == RW_READER_STARVEWRITER)
 258                 return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
 259 
 260         if (rw == RW_WRITER)
 261                 return ((old & RW_OWNER) == (uintptr_t)curthread);
 262 
 263         return (0);
 264 }
 265 
 266 uint_t (*rw_lock_backoff)(uint_t) = NULL;
 267 void (*rw_lock_delay)(uint_t) = NULL;
 268 
 269 /*
 270  * Full-service implementation of rw_enter() to handle all the hard cases.
 271  * Called from the assembly version if anything complicated is going on.
 272  * The only semantic difference between calling rw_enter() and calling
 273  * rw_enter_sleep() directly is that we assume the caller has already done
 274  * a THREAD_KPRI_REQUEST() in the RW_READER cases.
 275  */
 276 void
 277 rw_enter_sleep(rwlock_impl_t *lp, krw_t rw)
 278 {
 279         uintptr_t old, new, lock_value, lock_busy, lock_wait;
 280         hrtime_t sleep_time;
 281         turnstile_t *ts;
 282         uint_t  backoff = 0;
 283         int loop_count = 0;
 284 
 285         if (rw == RW_READER) {
 286                 lock_value = RW_READ_LOCK;
 287                 lock_busy = RW_WRITE_CLAIMED;
 288                 lock_wait = RW_HAS_WAITERS;
 289         } else if (rw == RW_READER_STARVEWRITER) {
 290                 lock_value = RW_READ_LOCK;
 291                 lock_busy = RW_WRITE_LOCKED;
 292                 lock_wait = RW_HAS_WAITERS;
 293         } else {
 294                 lock_value = RW_WRITE_LOCK(curthread);
 295                 lock_busy = (uintptr_t)RW_LOCKED;
 296                 lock_wait = RW_HAS_WAITERS | RW_WRITE_WANTED;
 297         }
 298 
 299         for (;;) {
 300                 if (((old = lp->rw_wwwh) & lock_busy) == 0) {
 301                         if (casip(&lp->rw_wwwh, old, old + lock_value) != old) {
 302                                 if (rw_lock_delay != NULL) {
 303                                         backoff = rw_lock_backoff(backoff);
 304                                         rw_lock_delay(backoff);
 305                                         if (++loop_count == ncpus_online) {
 306                                                 backoff = 0;
 307                                                 loop_count = 0;
 308                                         }
 309                                 }
 310                                 continue;
 311                         }
 312                         break;
 313                 }
 314 
 315                 if (panicstr)
 316                         return;
 317 
 318                 if ((old & RW_DOUBLE_LOCK) == RW_DOUBLE_LOCK) {
 319                         rw_panic("rw_enter: bad rwlock", lp);
 320                         return;
 321                 }
 322 
 323                 if ((old & RW_OWNER) == (uintptr_t)curthread) {
 324                         rw_panic("recursive rw_enter", lp);
 325                         return;
 326                 }
 327 
 328                 ts = turnstile_lookup(lp);
 329 
 330                 do {
 331                         if (((old = lp->rw_wwwh) & lock_busy) == 0)
 332                                 break;
 333                         new = old | lock_wait;
 334                 } while (old != new && casip(&lp->rw_wwwh, old, new) != old);
 335 
 336                 if ((old & lock_busy) == 0) {
 337                         /*
 338                          * The lock appears free now; try the dance again
 339                          */
 340                         turnstile_exit(lp);
 341                         continue;
 342                 }
 343 
 344                 /*
 345                  * We really are going to block.  Bump the stats, and drop
 346                  * kpri if we're a reader.
 347                  */
 348                 ASSERT(lp->rw_wwwh & lock_wait);
 349                 ASSERT(lp->rw_wwwh & RW_LOCKED);
 350 
 351                 sleep_time = -gethrtime();
 352                 if (rw != RW_WRITER) {
 353                         THREAD_KPRI_RELEASE();
 354                         CPU_STATS_ADDQ(CPU, sys, rw_rdfails, 1);
 355                         (void) turnstile_block(ts, TS_READER_Q, lp,
 356                             &rw_sobj_ops, NULL, NULL);
 357                 } else {
 358                         CPU_STATS_ADDQ(CPU, sys, rw_wrfails, 1);
 359                         (void) turnstile_block(ts, TS_WRITER_Q, lp,
 360                             &rw_sobj_ops, NULL, NULL);
 361                 }
 362                 sleep_time += gethrtime();
 363 
 364                 LOCKSTAT_RECORD4(LS_RW_ENTER_BLOCK, lp, sleep_time, rw,
 365                     (old & RW_WRITE_LOCKED) ? 1 : 0,
 366                     old >> RW_HOLD_COUNT_SHIFT);
 367 
 368                 /*
 369                  * We wake up holding the lock (and having kpri if we're
 370                  * a reader) via direct handoff from the previous owner.
 371                  */
 372                 break;
 373         }
 374 
 375         ASSERT(rw_locked(lp, rw));
 376 
 377         membar_enter();
 378 
 379         LOCKSTAT_RECORD(LS_RW_ENTER_ACQUIRE, lp, rw);
 380 }
 381 
 382 /*
 383  * Return the number of readers to wake, or zero if we should wake a writer.
 384  * Called only by exiting/downgrading writers (readers don't wake readers).
 385  */
 386 static int
 387 rw_readers_to_wake(turnstile_t *ts)
 388 {
 389         kthread_t *next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
 390         kthread_t *next_reader = ts->ts_sleepq[TS_READER_Q].sq_first;
 391         pri_t wpri = (next_writer != NULL) ? DISP_PRIO(next_writer) : -1;
 392         int count = 0;
 393 
 394         while (next_reader != NULL) {
 395                 if (DISP_PRIO(next_reader) < wpri)
 396                         break;
 397                 next_reader->t_kpri_req++;
 398                 next_reader = next_reader->t_link;
 399                 count++;
 400         }
 401         return (count);
 402 }
 403 
 404 /*
 405  * Full-service implementation of rw_exit() to handle all the hard cases.
 406  * Called from the assembly version if anything complicated is going on.
 407  * There is no semantic difference between calling rw_exit() and calling
 408  * rw_exit_wakeup() directly.
 409  */
 410 void
 411 rw_exit_wakeup(rwlock_impl_t *lp)
 412 {
 413         turnstile_t *ts;
 414         uintptr_t old, new, lock_value;
 415         kthread_t *next_writer;
 416         int nreaders;
 417         uint_t  backoff = 0;
 418         int loop_count = 0;
 419 
 420         membar_exit();
 421 
 422         old = lp->rw_wwwh;
 423         if (old & RW_WRITE_LOCKED) {
 424                 if ((old & RW_OWNER) != (uintptr_t)curthread) {
 425                         rw_panic("rw_exit: not owner", lp);
 426                         lp->rw_wwwh = 0;
 427                         return;
 428                 }
 429                 lock_value = RW_WRITE_LOCK(curthread);
 430         } else {
 431                 if ((old & RW_LOCKED) == 0) {
 432                         rw_panic("rw_exit: lock not held", lp);
 433                         return;
 434                 }
 435                 lock_value = RW_READ_LOCK;
 436         }
 437 
 438         for (;;) {
 439                 /*
 440                  * If this is *not* the final exit of a lock with waiters,
 441                  * just drop the lock -- there's nothing tricky going on.
 442                  */
 443                 old = lp->rw_wwwh;
 444                 new = old - lock_value;
 445                 if ((new & (RW_LOCKED | RW_HAS_WAITERS)) != RW_HAS_WAITERS) {
 446                         if (casip(&lp->rw_wwwh, old, new) != old) {
 447                                 if (rw_lock_delay != NULL) {
 448                                         backoff = rw_lock_backoff(backoff);
 449                                         rw_lock_delay(backoff);
 450                                         if (++loop_count == ncpus_online) {
 451                                                 backoff = 0;
 452                                                 loop_count = 0;
 453                                         }
 454                                 }
 455                                 continue;
 456                         }
 457                         break;
 458                 }
 459 
 460                 /*
 461                  * This appears to be the final exit of a lock with waiters.
 462                  * If we do not have the lock as writer (that is, if this is
 463                  * the last exit of a reader with waiting writers), we will
 464                  * grab the lock as writer to prevent additional readers.
 465                  * (This is required because a reader that is acquiring the
 466                  * lock via RW_READER_STARVEWRITER will not observe the
 467                  * RW_WRITE_WANTED bit -- and we could therefore be racing
 468                  * with such readers here.)
 469                  */
 470                 if (!(old & RW_WRITE_LOCKED)) {
 471                         new = RW_WRITE_LOCK(curthread) |
 472                             RW_HAS_WAITERS | RW_WRITE_WANTED;
 473 
 474                         if (casip(&lp->rw_wwwh, old, new) != old)
 475                                 continue;
 476                 }
 477 
 478                 /*
 479                  * Perform the final exit of a lock that has waiters.
 480                  */
 481                 ts = turnstile_lookup(lp);
 482 
 483                 next_writer = ts->ts_sleepq[TS_WRITER_Q].sq_first;
 484 
 485                 if ((old & RW_WRITE_LOCKED) &&
 486                     (nreaders = rw_readers_to_wake(ts)) > 0) {
 487                         /*
 488                          * Don't drop the lock -- just set the hold count
 489                          * such that we grant the lock to all readers at once.
 490                          */
 491                         new = nreaders * RW_READ_LOCK;
 492                         if (ts->ts_waiters > nreaders)
 493                                 new |= RW_HAS_WAITERS;
 494                         if (next_writer)
 495                                 new |= RW_WRITE_WANTED;
 496                         lp->rw_wwwh = new;
 497                         membar_enter();
 498                         turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
 499                 } else {
 500                         /*
 501                          * Don't drop the lock -- just transfer ownership
 502                          * directly to next_writer.  Note that there must
 503                          * be at least one waiting writer, because we get
 504                          * here only if (A) the lock is read-locked or
 505                          * (B) there are no waiting readers.  In case (A),
 506                          * since the lock is read-locked there would be no
 507                          * reason for other readers to have blocked unless
 508                          * the RW_WRITE_WANTED bit was set.  In case (B),
 509                          * since there are waiters but no waiting readers,
 510                          * they must all be waiting writers.
 511                          */
 512                         ASSERT(lp->rw_wwwh & RW_WRITE_WANTED);
 513                         new = RW_WRITE_LOCK(next_writer);
 514                         if (ts->ts_waiters > 1)
 515                                 new |= RW_HAS_WAITERS;
 516                         if (next_writer->t_link)
 517                                 new |= RW_WRITE_WANTED;
 518                         lp->rw_wwwh = new;
 519                         membar_enter();
 520                         turnstile_wakeup(ts, TS_WRITER_Q, 1, next_writer);
 521                 }
 522                 break;
 523         }
 524 
 525         if (lock_value == RW_READ_LOCK) {
 526                 THREAD_KPRI_RELEASE();
 527                 LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_READER);
 528         } else {
 529                 LOCKSTAT_RECORD(LS_RW_EXIT_RELEASE, lp, RW_WRITER);
 530         }
 531 }
 532 
 533 int
 534 rw_tryenter(krwlock_t *rwlp, krw_t rw)
 535 {
 536         rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
 537         uintptr_t old;
 538 
 539         if (rw != RW_WRITER) {
 540                 uint_t backoff = 0;
 541                 int loop_count = 0;
 542                 THREAD_KPRI_REQUEST();
 543                 for (;;) {
 544                         if ((old = lp->rw_wwwh) & (rw == RW_READER ?
 545                             RW_WRITE_CLAIMED : RW_WRITE_LOCKED)) {
 546                                 THREAD_KPRI_RELEASE();
 547                                 return (0);
 548                         }
 549                         if (casip(&lp->rw_wwwh, old, old + RW_READ_LOCK) == old)
 550                                 break;
 551                         if (rw_lock_delay != NULL) {
 552                                 backoff = rw_lock_backoff(backoff);
 553                                 rw_lock_delay(backoff);
 554                                 if (++loop_count == ncpus_online) {
 555                                         backoff = 0;
 556                                         loop_count = 0;
 557                                 }
 558                         }
 559                 }
 560                 LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
 561         } else {
 562                 if (casip(&lp->rw_wwwh, 0, RW_WRITE_LOCK(curthread)) != 0)
 563                         return (0);
 564                 LOCKSTAT_RECORD(LS_RW_TRYENTER_ACQUIRE, lp, rw);
 565         }
 566         ASSERT(rw_locked(lp, rw));
 567         membar_enter();
 568         return (1);
 569 }
 570 
 571 void
 572 rw_downgrade(krwlock_t *rwlp)
 573 {
 574         rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
 575 
 576         THREAD_KPRI_REQUEST();
 577         membar_exit();
 578 
 579         if ((lp->rw_wwwh & RW_OWNER) != (uintptr_t)curthread) {
 580                 rw_panic("rw_downgrade: not owner", lp);
 581                 return;
 582         }
 583 
 584         if (atomic_add_ip_nv(&lp->rw_wwwh,
 585             RW_READ_LOCK - RW_WRITE_LOCK(curthread)) & RW_HAS_WAITERS) {
 586                 turnstile_t *ts = turnstile_lookup(lp);
 587                 int nreaders = rw_readers_to_wake(ts);
 588                 if (nreaders > 0) {
 589                         uintptr_t delta = nreaders * RW_READ_LOCK;
 590                         if (ts->ts_waiters == nreaders)
 591                                 delta -= RW_HAS_WAITERS;
 592                         atomic_add_ip(&lp->rw_wwwh, delta);
 593                 }
 594                 turnstile_wakeup(ts, TS_READER_Q, nreaders, NULL);
 595         }
 596         ASSERT(rw_locked(lp, RW_READER));
 597         LOCKSTAT_RECORD0(LS_RW_DOWNGRADE_DOWNGRADE, lp);
 598 }
 599 
 600 int
 601 rw_tryupgrade(krwlock_t *rwlp)
 602 {
 603         rwlock_impl_t *lp = (rwlock_impl_t *)rwlp;
 604         uintptr_t old, new;
 605 
 606         ASSERT(rw_locked(lp, RW_READER));
 607 
 608         do {
 609                 if (((old = lp->rw_wwwh) & ~RW_HAS_WAITERS) != RW_READ_LOCK)
 610                         return (0);
 611                 new = old + RW_WRITE_LOCK(curthread) - RW_READ_LOCK;
 612         } while (casip(&lp->rw_wwwh, old, new) != old);
 613 
 614         membar_enter();
 615         THREAD_KPRI_RELEASE();
 616         LOCKSTAT_RECORD0(LS_RW_TRYUPGRADE_UPGRADE, lp);
 617         ASSERT(rw_locked(lp, RW_WRITER));
 618         return (1);
 619 }
 620 
 621 int
 622 rw_read_held(krwlock_t *rwlp)
 623 {
 624         uintptr_t tmp;
 625 
 626         return (_RW_READ_HELD(rwlp, tmp));
 627 }
 628 
 629 int
 630 rw_write_held(krwlock_t *rwlp)
 631 {
 632         return (_RW_WRITE_HELD(rwlp));
 633 }
 634 
 635 int
 636 rw_lock_held(krwlock_t *rwlp)
 637 {
 638         return (_RW_LOCK_HELD(rwlp));
 639 }
 640 
 641 /*
 642  * Like rw_read_held(), but ASSERTs that the lock is currently held
 643  */
 644 int
 645 rw_read_locked(krwlock_t *rwlp)
 646 {
 647         uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;
 648 
 649         ASSERT(old & RW_LOCKED);
 650         return ((old & RW_LOCKED) && !(old & RW_WRITE_LOCKED));
 651 }
 652 
 653 /*
 654  * Returns non-zero if the lock is either held or desired by a writer
 655  */
 656 int
 657 rw_iswriter(krwlock_t *rwlp)
 658 {
 659         return (_RW_ISWRITER(rwlp));
 660 }
 661 
 662 kthread_t *
 663 rw_owner(krwlock_t *rwlp)
 664 {
 665         uintptr_t old = ((rwlock_impl_t *)rwlp)->rw_wwwh;
 666 
 667         return ((old & RW_WRITE_LOCKED) ? (kthread_t *)(old & RW_OWNER) : NULL);
 668 }