1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  *
  25  * Portions Copyright 2008 Denis Cheng
  26  */
  27 
  28 #include "config.h"
  29 
  30 #include <sys/types.h>
  31 #ifdef HAVE_SYS_ASYNCH_H
  32 #include <sys/asynch.h>
  33 #endif
  34 #include <stddef.h>
  35 #include <sys/ipc.h>
  36 #include <sys/sem.h>
  37 #include <sys/errno.h>
  38 #include <sys/time.h>
  39 #include <inttypes.h>
  40 #include <fcntl.h>
  41 #include <math.h>
  42 #include <dirent.h>
  43 
  44 #ifdef HAVE_UTILITY_H
  45 #include <utility.h>
  46 #endif /* HAVE_UTILITY_H */
  47 
  48 #ifdef HAVE_SYS_ASYNC_H
  49 #include <sys/asynch.h>
  50 #endif /* HAVE_SYS_ASYNC_H */
  51 
  52 #ifndef HAVE_SYSV_SEM
  53 #include <semaphore.h>
  54 #endif /* HAVE_SYSV_SEM */
  55 
  56 #include "filebench.h"
  57 #include "flowop.h"
  58 #include "fileset.h"
  59 #include "fb_random.h"
  60 #include "utils.h"
  61 #include "fsplug.h"
  62 
  63 /*
  64  * These routines implement the flowops from the f language. Each
 * flowop has a name such as "read", and a set of function pointers
  66  * to call for initialization, execution and destruction of the flowop.
  67  * The table flowoplib_funcs[] contains a flowoplib struct for each
  68  * implemented flowop. Most flowops use a generic initialization function
  69  * and all currently use a generic destruction function. All flowop
  70  * functions referenced from the table are in this file, though, of
  71  * course, they often call functions from other files.
  72  *
  73  * The flowop_init() routine uses the flowoplib_funcs[] table to
  74  * create an initial set of "instance 0" flowops, one for each type of
  75  * flowop, from which all other flowops are derived. These "instance 0"
  76  * flowops are initialized with information from the table including
  77  * pointers for their fo_init, fo_func and fo_destroy functions. When
  78  * a flowop definition is encountered in an f language script, the
  79  * "type" of flowop, such as "read" is used to search for the
  80  * "instance 0" flowop named "read", then a new flowop is allocated
  81  * which inherits its function pointers and other initial properties
  82  * from the instance 0 flowop, and is given a new name as specified
  83  * by the "name=" attribute.
  84  */
  85 
  86 static void flowoplib_destruct_noop(flowop_t *flowop);
  87 static int flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop);
  88 static int flowoplib_print(threadflow_t *threadflow, flowop_t *flowop);
  89 static int flowoplib_write(threadflow_t *threadflow, flowop_t *flowop);
  90 static int flowoplib_read(threadflow_t *threadflow, flowop_t *flowop);
  91 static int flowoplib_block_init(flowop_t *flowop);
  92 static int flowoplib_block(threadflow_t *threadflow, flowop_t *flowop);
  93 static int flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop);
  94 static int flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop);
  95 static int flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop);
  96 static int flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop);
  97 static int flowoplib_sempost_init(flowop_t *flowop);
  98 static int flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop);
  99 static int flowoplib_semblock_init(flowop_t *flowop);
 100 static void flowoplib_semblock_destruct(flowop_t *flowop);
 101 static int flowoplib_eventlimit(threadflow_t *, flowop_t *flowop);
 102 static int flowoplib_bwlimit(threadflow_t *, flowop_t *flowop);
 103 static int flowoplib_iopslimit(threadflow_t *, flowop_t *flowop);
 104 static int flowoplib_opslimit(threadflow_t *, flowop_t *flowop);
 105 static int flowoplib_openfile(threadflow_t *, flowop_t *flowop);
 106 static int flowoplib_openfile_common(threadflow_t *, flowop_t *flowop, int fd);
 107 static int flowoplib_createfile(threadflow_t *, flowop_t *flowop);
 108 static int flowoplib_closefile(threadflow_t *, flowop_t *flowop);
 109 static int flowoplib_makedir(threadflow_t *, flowop_t *flowop);
 110 static int flowoplib_removedir(threadflow_t *, flowop_t *flowop);
 111 static int flowoplib_listdir(threadflow_t *, flowop_t *flowop);
 112 static int flowoplib_fsync(threadflow_t *, flowop_t *flowop);
 113 static int flowoplib_readwholefile(threadflow_t *, flowop_t *flowop);
 114 static int flowoplib_writewholefile(threadflow_t *, flowop_t *flowop);
 115 static int flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop);
 116 static int flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop);
 117 static int flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop);
 118 static int flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop);
 119 static int flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop);
 120 static int flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop);
 121 static int flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop);
 122 static int flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop);
 123 static int flowoplib_testrandvar_init(flowop_t *flowop);
 124 static void flowoplib_testrandvar_destruct(flowop_t *flowop);
 125 
 126 static flowop_proto_t flowoplib_funcs[] = {
 127         FLOW_TYPE_IO, FLOW_ATTR_WRITE, "write", flowop_init_generic,
 128         flowoplib_write, flowop_destruct_generic,
 129         FLOW_TYPE_IO, FLOW_ATTR_READ, "read", flowop_init_generic,
 130         flowoplib_read, flowop_destruct_generic,
 131         FLOW_TYPE_SYNC, 0, "block", flowoplib_block_init,
 132         flowoplib_block, flowop_destruct_generic,
 133         FLOW_TYPE_SYNC, 0, "wakeup", flowop_init_generic,
 134         flowoplib_wakeup, flowop_destruct_generic,
 135         FLOW_TYPE_SYNC, 0, "semblock", flowoplib_semblock_init,
 136         flowoplib_semblock, flowoplib_semblock_destruct,
 137         FLOW_TYPE_SYNC, 0, "sempost", flowoplib_sempost_init,
 138         flowoplib_sempost, flowoplib_destruct_noop,
 139         FLOW_TYPE_OTHER, 0, "hog", flowop_init_generic,
 140         flowoplib_hog, flowop_destruct_generic,
 141         FLOW_TYPE_OTHER, 0, "delay", flowop_init_generic,
 142         flowoplib_delay, flowop_destruct_generic,
 143         FLOW_TYPE_OTHER, 0, "eventlimit", flowop_init_generic,
 144         flowoplib_eventlimit, flowop_destruct_generic,
 145         FLOW_TYPE_OTHER, 0, "bwlimit", flowop_init_generic,
 146         flowoplib_bwlimit, flowop_destruct_generic,
 147         FLOW_TYPE_OTHER, 0, "iopslimit", flowop_init_generic,
 148         flowoplib_iopslimit, flowop_destruct_generic,
 149         FLOW_TYPE_OTHER, 0, "opslimit", flowop_init_generic,
 150         flowoplib_opslimit, flowop_destruct_generic,
 151         FLOW_TYPE_OTHER, 0, "finishoncount", flowop_init_generic,
 152         flowoplib_finishoncount, flowop_destruct_generic,
 153         FLOW_TYPE_OTHER, 0, "finishonbytes", flowop_init_generic,
 154         flowoplib_finishonbytes, flowop_destruct_generic,
 155         FLOW_TYPE_IO, 0, "openfile", flowop_init_generic,
 156         flowoplib_openfile, flowop_destruct_generic,
 157         FLOW_TYPE_IO, 0, "createfile", flowop_init_generic,
 158         flowoplib_createfile, flowop_destruct_generic,
 159         FLOW_TYPE_IO, 0, "closefile", flowop_init_generic,
 160         flowoplib_closefile, flowop_destruct_generic,
 161         FLOW_TYPE_IO, 0, "makedir", flowop_init_generic,
 162         flowoplib_makedir, flowop_destruct_generic,
 163         FLOW_TYPE_IO, 0, "removedir", flowop_init_generic,
 164         flowoplib_removedir, flowop_destruct_generic,
 165         FLOW_TYPE_IO, 0, "listdir", flowop_init_generic,
 166         flowoplib_listdir, flowop_destruct_generic,
 167         FLOW_TYPE_IO, 0, "fsync", flowop_init_generic,
 168         flowoplib_fsync, flowop_destruct_generic,
 169         FLOW_TYPE_IO, 0, "fsyncset", flowop_init_generic,
 170         flowoplib_fsyncset, flowop_destruct_generic,
 171         FLOW_TYPE_IO, 0, "statfile", flowop_init_generic,
 172         flowoplib_statfile, flowop_destruct_generic,
 173         FLOW_TYPE_IO, FLOW_ATTR_READ, "readwholefile", flowop_init_generic,
 174         flowoplib_readwholefile, flowop_destruct_generic,
 175         FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfile", flowop_init_generic,
 176         flowoplib_appendfile, flowop_destruct_generic,
 177         FLOW_TYPE_IO, FLOW_ATTR_WRITE, "appendfilerand", flowop_init_generic,
 178         flowoplib_appendfilerand, flowop_destruct_generic,
 179         FLOW_TYPE_IO, 0, "deletefile", flowop_init_generic,
 180         flowoplib_deletefile, flowop_destruct_generic,
 181         FLOW_TYPE_IO, FLOW_ATTR_WRITE, "writewholefile", flowop_init_generic,
 182         flowoplib_writewholefile, flowop_destruct_generic,
 183         FLOW_TYPE_OTHER, 0, "print", flowop_init_generic,
 184         flowoplib_print, flowop_destruct_generic,
 185         /* routine to calculate mean and stddev for output from a randvar */
 186         FLOW_TYPE_OTHER, 0, "testrandvar", flowoplib_testrandvar_init,
 187         flowoplib_testrandvar, flowoplib_testrandvar_destruct
 188 };
 189 
 190 /*
 191  * Loops through the list of flowops defined in this
 192  * module, and creates and initializes a flowop for each one
 193  * by calling flowop_flow_init. As a side effect of calling
 194  * flowop_flow_init, the created flowops are placed on the
 195  * master flowop list. All created flowops are set to
 196  * instance "0".
 197  */
 198 void
 199 flowoplib_flowinit()
 200 {
 201         int nops = sizeof (flowoplib_funcs) / sizeof (flowop_proto_t);
 202 
 203         flowop_flow_init(flowoplib_funcs, nops);
 204 }
 205 
 206 /*
 207  * Special total noop destruct
 208  */
 209 /* ARGSUSED */
 210 static void
 211 flowoplib_destruct_noop(flowop_t *flowop)
 212 {
 213 }
 214 
 215 /*
 216  * Generates a file attribute from flags in the supplied flowop.
 217  * Sets FLOW_ATTR_DIRECTIO and/or FLOW_ATTR_DSYNC as needed.
 218  */
 219 static int
 220 flowoplib_fileattrs(flowop_t *flowop)
 221 {
 222         int attrs = 0;
 223 
 224         if (avd_get_bool(flowop->fo_directio))
 225                 attrs |= FLOW_ATTR_DIRECTIO;
 226 
 227         if (avd_get_bool(flowop->fo_dsync))
 228                 attrs |= FLOW_ATTR_DSYNC;
 229 
 230         return (attrs);
 231 }
 232 
 233 /*
 234  * Obtain a filesetentry for a file. Result placed where filep points.
 235  * Supply with a flowop and a flag to indicate whether an existent or
 236  * non-existent file is required. Returns FILEBENCH_NORSC if all out
 237  * of the appropriate type of directories, FILEBENCH_ERROR if the
 238  * flowop does not point to a fileset, and FILEBENCH_OK otherwise.
 239  */
 240 static int
 241 flowoplib_pickfile(filesetentry_t **filep, flowop_t *flowop, int flags, int tid)
 242 {
 243         fileset_t       *fileset;
 244         int             fileindex;
 245 
 246         if ((fileset = flowop->fo_fileset) == NULL) {
 247                 filebench_log(LOG_ERROR, "flowop NO fileset");
 248                 return (FILEBENCH_ERROR);
 249         }
 250 
 251         if (flowop->fo_fileindex) {
 252                 fileindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
 253                     ((double)(fileset->fs_constentries / 2)));
 254                 fileindex = fileindex % fileset->fs_constentries;
 255                 flags |= FILESET_PICKBYINDEX;
 256         } else {
 257                 fileindex = 0;
 258         }
 259 
 260         if ((*filep = fileset_pick(fileset, FILESET_PICKFILE | flags,
 261             tid, fileindex)) == NULL) {
 262                 filebench_log(LOG_DEBUG_SCRIPT,
 263                     "flowop %s failed to pick file from fileset %s",
 264                     flowop->fo_name,
 265                     avd_get_str(fileset->fs_name));
 266                 return (FILEBENCH_NORSC);
 267         }
 268 
 269         return (FILEBENCH_OK);
 270 }
 271 
 272 /*
 273  * Obtain a filesetentry for a leaf directory. Result placed where dirp
 274  * points. Supply with flowop and a flag to indicate whether an existent
 275  * or non-existent leaf directory is required. Returns FILEBENCH_NORSC
 276  * if all out of the appropriate type of directories, FILEBENCH_ERROR
 277  * if the flowop does not point to a fileset, and FILEBENCH_OK otherwise.
 278  */
 279 static int
 280 flowoplib_pickleafdir(filesetentry_t **dirp, flowop_t *flowop, int flags)
 281 {
 282         fileset_t       *fileset;
 283         int             dirindex;
 284 
 285         if ((fileset = flowop->fo_fileset) == NULL) {
 286                 filebench_log(LOG_ERROR, "flowop NO fileset");
 287                 return (FILEBENCH_ERROR);
 288         }
 289 
 290         if (flowop->fo_fileindex) {
 291                 dirindex = (int)(avd_get_dbl(flowop->fo_fileindex) *
 292                     ((double)(fileset->fs_constleafdirs / 2)));
 293                 dirindex = dirindex % fileset->fs_constleafdirs;
 294                 flags |= FILESET_PICKBYINDEX;
 295         } else {
 296                 dirindex = 0;
 297         }
 298 
 299         if ((*dirp = fileset_pick(fileset,
 300             FILESET_PICKLEAFDIR | flags, 0, dirindex)) == NULL) {
 301                 filebench_log(LOG_DEBUG_SCRIPT,
 302                     "flowop %s failed to pick directory from fileset %s",
 303                     flowop->fo_name,
 304                     avd_get_str(fileset->fs_name));
 305                 return (FILEBENCH_NORSC);
 306         }
 307 
 308         return (FILEBENCH_OK);
 309 }
 310 
 311 /*
 312  * Searches for a file descriptor. Tries the flowop's
 313  * fo_fdnumber first and returns with it if it has been
 314  * explicitly set (greater than 0). It next checks to
 315  * see if a rotating file descriptor policy is in effect,
 316  * and if not returns the fdnumber regardless of what
 317  * it is. (note that if it is 0, it just selects to the
 318  * default file descriptor in the threadflow's tf_fd
 319  * array). If the rotating fd policy is in effect, it
 320  * cycles from the end of the tf_fd array to one location
 321  * beyond the maximum needed by the number of entries in
 322  * the associated fileset on each invocation, then starts
 323  * over from the end.
 324  *
 325  * The routine returns an index into the threadflow's
 326  * tf_fd table where the actual file descriptor will be
 327  * found. Note: the calling routine must not call this
 328  * routine if the flowop does not have a fileset, and the
 329  * flowop's fo_fdnumber is zero and fo_rotatefd is
 330  * asserted, or an addressing fault may occur.
 331  */
 332 static int
 333 flowoplib_fdnum(threadflow_t *threadflow, flowop_t *flowop)
 334 {
 335         fbint_t entries;
 336         int fdnumber = flowop->fo_fdnumber;
 337 
 338         /* If the script sets the fd explicitly */
 339         if (fdnumber > 0)
 340                 return (fdnumber);
 341 
 342         /* If the flowop defaults to persistent fd */
 343         if (!avd_get_bool(flowop->fo_rotatefd))
 344                 return (fdnumber);
 345 
 346         if (flowop->fo_fileset == NULL) {
 347                 filebench_log(LOG_ERROR, "flowop NULL file");
 348                 return (FILEBENCH_ERROR);
 349         }
 350 
 351         entries = flowop->fo_fileset->fs_constentries;
 352 
 353         /* Rotate the fd on each flowop invocation */
 354         if (entries > (THREADFLOW_MAXFD / 2)) {
 355                 filebench_log(LOG_ERROR, "Out of file descriptors in flowop %s"
 356                     " (too many files : %llu",
 357                     flowop->fo_name, (u_longlong_t)entries);
 358                 return (FILEBENCH_ERROR);
 359         }
 360 
 361         /* First time around */
 362         if (threadflow->tf_fdrotor == 0)
 363                 threadflow->tf_fdrotor = THREADFLOW_MAXFD;
 364 
 365         /* One fd for every file in the set */
 366         if (entries == (THREADFLOW_MAXFD - threadflow->tf_fdrotor))
 367                 threadflow->tf_fdrotor = THREADFLOW_MAXFD;
 368 
 369 
 370         threadflow->tf_fdrotor--;
 371         filebench_log(LOG_DEBUG_IMPL, "selected fd = %d",
 372             threadflow->tf_fdrotor);
 373         return (threadflow->tf_fdrotor);
 374 }
 375 
 376 /*
 377  * Determines the file descriptor to use, and attempts to open
 378  * the file if it is not already open. Also determines the wss
 379  * value. Returns FILEBENCH_ERROR on errors, FILESET_NORSC if
 380  * if flowop_openfile_common couldn't obtain an appropriate file
 381  * from a the fileset, and FILEBENCH_OK otherwise.
 382  */
 383 static int
 384 flowoplib_filesetup(threadflow_t *threadflow, flowop_t *flowop,
 385     fbint_t *wssp, fb_fdesc_t **fdescp)
 386 {
 387         int fd = flowoplib_fdnum(threadflow, flowop);
 388 
 389         if (fd == -1)
 390                 return (FILEBENCH_ERROR);
 391 
 392         /* check for conflicting fdnumber and file name */
 393         if ((fd > 0) && (threadflow->tf_fse[fd] != NULL)) {
 394                 char *fd_based_name;
 395 
 396                 fd_based_name =
 397                     avd_get_str(threadflow->tf_fse[fd]->fse_fileset->fs_name);
 398 
 399                 if (flowop->fo_filename != NULL) {
 400                         char *fo_based_name;
 401 
 402                         fo_based_name = avd_get_str(flowop->fo_filename);
 403                         if (strcmp(fd_based_name, fo_based_name) != 0) {
 404                                 filebench_log(LOG_ERROR, "Name of fd refer"
 405                                     "enced fileset name (%s) CONFLICTS with"
 406                                     " flowop supplied fileset name (%s)",
 407                                     fd_based_name, fo_based_name);
 408                                 filebench_shutdown(1);
 409                                 return (FILEBENCH_ERROR);
 410                         }
 411                 }
 412         }
 413 
 414         if (threadflow->tf_fd[fd].fd_ptr == NULL) {
 415                 int ret;
 416 
 417                 if ((ret = flowoplib_openfile_common(
 418                     threadflow, flowop, fd)) != FILEBENCH_OK)
 419                         return (ret);
 420 
 421                 if (threadflow->tf_fse[fd]) {
 422                         filebench_log(LOG_DEBUG_IMPL, "opened file %s",
 423                             threadflow->tf_fse[fd]->fse_path);
 424                 } else {
 425                         filebench_log(LOG_DEBUG_IMPL,
 426                             "opened device %s/%s",
 427                             avd_get_str(flowop->fo_fileset->fs_path),
 428                             avd_get_str(flowop->fo_fileset->fs_name));
 429                 }
 430         }
 431 
 432         *fdescp = &(threadflow->tf_fd[fd]);
 433 
 434         if ((*wssp = flowop->fo_constwss) == 0) {
 435                 if (threadflow->tf_fse[fd])
 436                         *wssp = threadflow->tf_fse[fd]->fse_size;
 437                 else
 438                         *wssp = avd_get_int(flowop->fo_fileset->fs_size);
 439         }
 440 
 441         return (FILEBENCH_OK);
 442 }
 443 
 444 /*
 445  * Determines the io buffer or random offset into tf_mem for
 446  * the IO operation. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
 447  */
 448 static int
 449 flowoplib_iobufsetup(threadflow_t *threadflow, flowop_t *flowop,
 450     caddr_t *iobufp, fbint_t iosize)
 451 {
 452         long memsize;
 453         size_t memoffset;
 454 
 455         if (iosize == 0) {
 456                 filebench_log(LOG_ERROR, "zero iosize for thread %s",
 457                     flowop->fo_name);
 458                 return (FILEBENCH_ERROR);
 459         }
 460 
 461         if ((memsize = threadflow->tf_constmemsize) != 0) {
 462 
 463                 /* use tf_mem for I/O with random offset */
 464                 if (filebench_randomno(&memoffset,
 465                     memsize, iosize, NULL) == -1) {
 466                         filebench_log(LOG_ERROR,
 467                             "tf_memsize smaller than IO size for thread %s",
 468                             flowop->fo_name);
 469                         return (FILEBENCH_ERROR);
 470                 }
 471                 *iobufp = threadflow->tf_mem + memoffset;
 472 
 473         } else {
 474                 /* use private I/O buffer */
 475                 if ((flowop->fo_buf != NULL) &&
 476                     (flowop->fo_buf_size < iosize)) {
 477                         /* too small, so free up and re-allocate */
 478                         free(flowop->fo_buf);
 479                         flowop->fo_buf = NULL;
 480                 }
 481 
 482                 /*
 483                  * Allocate memory for the  buffer. The memory is freed
 484                  * by flowop_destruct_generic() or by this routine if more
 485                  * memory is needed for the buffer.
 486                  */
 487                 if ((flowop->fo_buf == NULL) && ((flowop->fo_buf
 488                     = (char *)malloc(iosize)) == NULL))
 489                         return (FILEBENCH_ERROR);
 490 
 491                 flowop->fo_buf_size = iosize;
 492                 *iobufp = flowop->fo_buf;
 493         }
 494         return (FILEBENCH_OK);
 495 }
 496 
 497 /*
 498  * Determines the file descriptor to use, opens it if necessary, the
 499  * io buffer or random offset into tf_mem for IO operation and the wss
 500  * value. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
 501  */
 502 int
 503 flowoplib_iosetup(threadflow_t *threadflow, flowop_t *flowop,
 504     fbint_t *wssp, caddr_t *iobufp, fb_fdesc_t **filedescp, fbint_t iosize)
 505 {
 506         int ret;
 507 
 508         if ((ret = flowoplib_filesetup(threadflow, flowop, wssp, filedescp)) !=
 509             FILEBENCH_OK)
 510                 return (ret);
 511 
 512         if ((ret = flowoplib_iobufsetup(threadflow, flowop, iobufp, iosize)) !=
 513             FILEBENCH_OK)
 514                 return (ret);
 515 
 516         return (FILEBENCH_OK);
 517 }
 518 
 519 /*
 520  * Emulate posix read / pread. If the flowop has a fileset,
 521  * a file descriptor number index is fetched, otherwise a
 522  * supplied fileobj file is used. In either case the specified
 523  * file will be opened if not already open. If the flowop has
 524  * neither a fileset or fileobj, an error is logged and FILEBENCH_ERROR
 525  * returned.
 526  *
 527  * The actual read is done to a random offset in the
 528  * threadflow's thread memory (tf_mem), with a size set by
 529  * fo_iosize and at either a random disk offset within the
 530  * working set size, or at the next sequential location. If
 531  * any errors are encountered, FILEBENCH_ERROR is returned,
 532  * if no appropriate file can be obtained from the fileset then
 533  * FILEBENCH_NORSC is returned, otherise FILEBENCH_OK is returned.
 534  */
 535 static int
 536 flowoplib_read(threadflow_t *threadflow, flowop_t *flowop)
 537 {
 538         caddr_t iobuf;
 539         fbint_t wss;
 540         fbint_t iosize;
 541         fb_fdesc_t *fdesc;
 542         int ret;
 543 
 544 
 545         iosize = avd_get_int(flowop->fo_iosize);
 546         if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
 547             &fdesc, iosize)) != FILEBENCH_OK)
 548                 return (ret);
 549 
 550         if (avd_get_bool(flowop->fo_random)) {
 551                 uint64_t fileoffset;
 552 
 553                 if (filebench_randomno64(&fileoffset,
 554                     wss, iosize, NULL) == -1) {
 555                         filebench_log(LOG_ERROR,
 556                             "file size smaller than IO size for thread %s",
 557                             flowop->fo_name);
 558                         return (FILEBENCH_ERROR);
 559                 }
 560 
 561                 (void) flowop_beginop(threadflow, flowop);
 562                 if ((ret = FB_PREAD(fdesc, iobuf,
 563                     iosize, (off64_t)fileoffset)) == -1) {
 564                         (void) flowop_endop(threadflow, flowop, 0);
 565                         filebench_log(LOG_ERROR,
 566                             "read file %s failed, offset %llu "
 567                             "io buffer %zd: %s",
 568                             avd_get_str(flowop->fo_fileset->fs_name),
 569                             (u_longlong_t)fileoffset, iobuf, strerror(errno));
 570                         flowop_endop(threadflow, flowop, 0);
 571                         return (FILEBENCH_ERROR);
 572                 }
 573                 (void) flowop_endop(threadflow, flowop, ret);
 574 
 575                 if ((ret == 0))
 576                         (void) FB_LSEEK(fdesc, 0, SEEK_SET);
 577 
 578         } else {
 579                 (void) flowop_beginop(threadflow, flowop);
 580                 if ((ret = FB_READ(fdesc, iobuf, iosize)) == -1) {
 581                         (void) flowop_endop(threadflow, flowop, 0);
 582                         filebench_log(LOG_ERROR,
 583                             "read file %s failed, io buffer %zd: %s",
 584                             avd_get_str(flowop->fo_fileset->fs_name),
 585                             iobuf, strerror(errno));
 586                         (void) flowop_endop(threadflow, flowop, 0);
 587                         return (FILEBENCH_ERROR);
 588                 }
 589                 (void) flowop_endop(threadflow, flowop, ret);
 590 
 591                 if ((ret == 0))
 592                         (void) FB_LSEEK(fdesc, 0, SEEK_SET);
 593         }
 594 
 595         return (FILEBENCH_OK);
 596 }
 597 
 598 /*
 599  * Initializes a "flowop_block" flowop. Specifically, it
 600  * initializes the flowop's fo_cv and unlocks the fo_lock.
 601  */
 602 static int
 603 flowoplib_block_init(flowop_t *flowop)
 604 {
 605         filebench_log(LOG_DEBUG_IMPL, "flow %s-%d block init address %zx",
 606             flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
 607         (void) pthread_cond_init(&flowop->fo_cv, ipc_condattr());
 608         (void) ipc_mutex_unlock(&flowop->fo_lock);
 609 
 610         return (FILEBENCH_OK);
 611 }
 612 
 613 /*
 614  * Blocks the threadflow until woken up by flowoplib_wakeup.
 615  * The routine blocks on the flowop's fo_cv condition variable.
 616  */
 617 static int
 618 flowoplib_block(threadflow_t *threadflow, flowop_t *flowop)
 619 {
 620         filebench_log(LOG_DEBUG_IMPL, "flow %s-%d blocking at address %zx",
 621             flowop->fo_name, flowop->fo_instance, &flowop->fo_cv);
 622         (void) ipc_mutex_lock(&flowop->fo_lock);
 623 
 624         flowop_beginop(threadflow, flowop);
 625         (void) pthread_cond_wait(&flowop->fo_cv, &flowop->fo_lock);
 626         flowop_endop(threadflow, flowop, 0);
 627 
 628         filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
 629             flowop->fo_name, flowop->fo_instance);
 630 
 631         (void) ipc_mutex_unlock(&flowop->fo_lock);
 632 
 633         return (FILEBENCH_OK);
 634 }
 635 
 636 /*
 637  * Wakes up one or more target blocking flowops.
 638  * Sends broadcasts on the fo_cv condition variables of all
 639  * flowops on the target list, except those that are
 640  * FLOW_MASTER flowops. The target list consists of all
 641  * flowops whose name matches this flowop's "fo_targetname"
 642  * attribute. The target list is generated on the first
 643  * invocation, and the run will be shutdown if no targets
 644  * are found. Otherwise the routine always returns FILEBENCH_OK.
 645  */
 646 static int
 647 flowoplib_wakeup(threadflow_t *threadflow, flowop_t *flowop)
 648 {
 649         flowop_t *target;
 650 
 651         /* if this is the first wakeup, create the wakeup list */
 652         if (flowop->fo_targets == NULL) {
 653                 flowop_t *result = flowop_find(flowop->fo_targetname);
 654 
 655                 flowop->fo_targets = result;
 656                 if (result == NULL) {
 657                         filebench_log(LOG_ERROR,
 658                             "wakeup: could not find op %s for thread %s",
 659                             flowop->fo_targetname,
 660                             threadflow->tf_name);
 661                         filebench_shutdown(1);
 662                 }
 663                 while (result) {
 664                         result->fo_targetnext =
 665                             result->fo_resultnext;
 666                         result = result->fo_resultnext;
 667                 }
 668         }
 669 
 670         target = flowop->fo_targets;
 671 
 672         /* wakeup the targets */
 673         while (target) {
 674                 if (target->fo_instance == FLOW_MASTER) {
 675                         target = target->fo_targetnext;
 676                         continue;
 677                 }
 678                 filebench_log(LOG_DEBUG_IMPL,
 679                     "wakeup flow %s-%d at address %zx",
 680                     target->fo_name,
 681                     target->fo_instance,
 682                     &target->fo_cv);
 683 
 684                 flowop_beginop(threadflow, flowop);
 685                 (void) ipc_mutex_lock(&target->fo_lock);
 686                 (void) pthread_cond_broadcast(&target->fo_cv);
 687                 (void) ipc_mutex_unlock(&target->fo_lock);
 688                 flowop_endop(threadflow, flowop, 0);
 689 
 690                 target = target->fo_targetnext;
 691         }
 692 
 693         return (FILEBENCH_OK);
 694 }
 695 
 696 /*
 697  * "think time" routines. the "hog" routine consumes cpu cycles as
 698  * it "thinks", while the "delay" flowop simply calls sleep() to delay
 699  * for a given number of seconds without consuming cpu cycles.
 700  */
 701 
 702 
 703 /*
 704  * Consumes CPU cycles and memory bandwidth by looping for
 705  * flowop->fo_value times. With each loop sets memory location
 706  * threadflow->tf_mem to 1.
 707  */
 708 static int
 709 flowoplib_hog(threadflow_t *threadflow, flowop_t *flowop)
 710 {
 711         uint64_t value = avd_get_int(flowop->fo_value);
 712         int i;
 713 
 714         filebench_log(LOG_DEBUG_IMPL, "hog enter");
 715         flowop_beginop(threadflow, flowop);
 716         if (threadflow->tf_mem != NULL) {
 717                 for (i = 0; i < value; i++)
 718                         *(threadflow->tf_mem) = 1;
 719         }
 720         flowop_endop(threadflow, flowop, 0);
 721         filebench_log(LOG_DEBUG_IMPL, "hog exit");
 722         return (FILEBENCH_OK);
 723 }
 724 
 725 
 726 /*
 727  * Delays for fo_value seconds.
 728  */
 729 static int
 730 flowoplib_delay(threadflow_t *threadflow, flowop_t *flowop)
 731 {
 732         int value = avd_get_int(flowop->fo_value);
 733 
 734         flowop_beginop(threadflow, flowop);
 735         (void) sleep(value);
 736         flowop_endop(threadflow, flowop, 0);
 737         return (FILEBENCH_OK);
 738 }
 739 
/*
 * Rate limiting routines. This is the event consuming half of the
 * event system. Each of the four following routines will limit the rate
 * to one unit of either calls, issued I/O operations, issued filebench
 * operations, or I/O bandwidth. Since there is only one event generator,
 * the events will be divided among multiple instances of an event
 * consumer, and further divided among different consumers if more than
 * one has been defined. There is no mechanism to enforce equal sharing
 * of events.
 */
 750 
 751 /*
 752  * Completes one invocation per posted event. If eventgen_q
 753  * has an event count greater than zero, one will be removed
 754  * (count decremented), otherwise the calling thread will
 755  * block until another event has been posted. Always returns 0
 756  */
 757 static int
 758 flowoplib_eventlimit(threadflow_t *threadflow, flowop_t *flowop)
 759 {
 760         /* Immediately bail if not set/enabled */
 761         if (!filebench_shm->shm_eventgen_enabled)
 762                 return (FILEBENCH_OK);
 763 
 764         if (flowop->fo_initted == 0) {
 765                 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
 766                     flowop, threadflow->tf_name, threadflow->tf_instance);
 767                 flowop->fo_initted = 1;
 768         }
 769 
 770         flowop_beginop(threadflow, flowop);
 771         while (filebench_shm->shm_eventgen_enabled) {
 772                 (void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
 773                 if (filebench_shm->shm_eventgen_q > 0) {
 774                         filebench_shm->shm_eventgen_q--;
 775                         (void) ipc_mutex_unlock(
 776                             &filebench_shm->shm_eventgen_lock);
 777                         break;
 778                 }
 779                 (void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
 780                     &filebench_shm->shm_eventgen_lock);
 781                 (void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
 782         }
 783         flowop_endop(threadflow, flowop, 0);
 784         return (FILEBENCH_OK);
 785 }
 786 
 787 static int
 788 flowoplib_event_find_target(threadflow_t *threadflow, flowop_t *flowop)
 789 {
 790         if (flowop->fo_targetname[0] != '\0') {
 791 
 792                 /* Try to use statistics from specific flowop */
 793                 flowop->fo_targets =
 794                     flowop_find_from_list(flowop->fo_targetname,
 795                     threadflow->tf_thrd_fops);
 796                 if (flowop->fo_targets == NULL) {
 797                         filebench_log(LOG_ERROR,
 798                             "limit target: could not find flowop %s",
 799                             flowop->fo_targetname);
 800                         filebench_shutdown(1);
 801                         return (FILEBENCH_ERROR);
 802                 }
 803         } else {
 804                 /* use total workload statistics */
 805                 flowop->fo_targets = NULL;
 806         }
 807         return (FILEBENCH_OK);
 808 }
 809 
 810 /*
 811  * Blocks the calling thread if the number of issued I/O
 812  * operations exceeds the number of posted events, thus
 813  * limiting the average I/O operation rate to the rate
 814  * specified by eventgen_hz. Always returns FILEBENCH_OK.
 815  */
 816 static int
 817 flowoplib_iopslimit(threadflow_t *threadflow, flowop_t *flowop)
 818 {
 819         uint64_t iops;
 820         uint64_t delta;
 821         uint64_t events;
 822 
 823         /* Immediately bail if not set/enabled */
 824         if (!filebench_shm->shm_eventgen_enabled)
 825                 return (FILEBENCH_OK);
 826 
 827         if (flowop->fo_initted == 0) {
 828                 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
 829                     flowop, threadflow->tf_name, threadflow->tf_instance);
 830                 flowop->fo_initted = 1;
 831 
 832                 if (flowoplib_event_find_target(threadflow, flowop)
 833                     == FILEBENCH_ERROR)
 834                         return (FILEBENCH_ERROR);
 835 
 836                 if (flowop->fo_targets && ((flowop->fo_targets->fo_attrs &
 837                     (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
 838                         filebench_log(LOG_ERROR,
 839                             "WARNING: Flowop %s does no IO",
 840                             flowop->fo_targets->fo_name);
 841                         filebench_shutdown(1);
 842                         return (FILEBENCH_ERROR);
 843                 }
 844         }
 845 
 846         if (flowop->fo_targets) {
 847                 /*
 848                  * Note that fs_count is already the sum of fs_rcount
 849                  * and fs_wcount if looking at a single flowop.
 850                  */
 851                 iops = flowop->fo_targets->fo_stats.fs_count;
 852         } else {
 853                 (void) ipc_mutex_lock(&controlstats_lock);
 854                 iops = (controlstats.fs_rcount +
 855                     controlstats.fs_wcount);
 856                 (void) ipc_mutex_unlock(&controlstats_lock);
 857         }
 858 
 859         /* Is this the first time around */
 860         if (flowop->fo_tputlast == 0) {
 861                 flowop->fo_tputlast = iops;
 862                 return (FILEBENCH_OK);
 863         }
 864 
 865         delta = iops - flowop->fo_tputlast;
 866         flowop->fo_tputbucket -= delta;
 867         flowop->fo_tputlast = iops;
 868 
 869         /* No need to block if the q isn't empty */
 870         if (flowop->fo_tputbucket >= 0LL) {
 871                 flowop_endop(threadflow, flowop, 0);
 872                 return (FILEBENCH_OK);
 873         }
 874 
 875         iops = flowop->fo_tputbucket * -1;
 876         events = iops;
 877 
 878         flowop_beginop(threadflow, flowop);
 879         while (filebench_shm->shm_eventgen_enabled) {
 880 
 881                 (void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
 882                 if (filebench_shm->shm_eventgen_q >= events) {
 883                         filebench_shm->shm_eventgen_q -= events;
 884                         (void) ipc_mutex_unlock(
 885                             &filebench_shm->shm_eventgen_lock);
 886                         flowop->fo_tputbucket += events;
 887                         break;
 888                 }
 889                 (void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
 890                     &filebench_shm->shm_eventgen_lock);
 891                 (void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
 892         }
 893         flowop_endop(threadflow, flowop, 0);
 894 
 895         return (FILEBENCH_OK);
 896 }
 897 
 898 /*
 899  * Blocks the calling thread if the number of issued filebench
 900  * operations exceeds the number of posted events, thus limiting
 901  * the average filebench operation rate to the rate specified by
 902  * eventgen_hz. Always returns FILEBENCH_OK.
 903  */
 904 static int
 905 flowoplib_opslimit(threadflow_t *threadflow, flowop_t *flowop)
 906 {
 907         uint64_t ops;
 908         uint64_t delta;
 909         uint64_t events;
 910 
 911         /* Immediately bail if not set/enabled */
 912         if (!filebench_shm->shm_eventgen_enabled)
 913                 return (FILEBENCH_OK);
 914 
 915         if (flowop->fo_initted == 0) {
 916                 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
 917                     flowop, threadflow->tf_name, threadflow->tf_instance);
 918                 flowop->fo_initted = 1;
 919 
 920                 if (flowoplib_event_find_target(threadflow, flowop)
 921                     == FILEBENCH_ERROR)
 922                         return (FILEBENCH_ERROR);
 923         }
 924 
 925         if (flowop->fo_targets) {
 926                 ops = flowop->fo_targets->fo_stats.fs_count;
 927         } else {
 928                 (void) ipc_mutex_lock(&controlstats_lock);
 929                 ops = controlstats.fs_count;
 930                 (void) ipc_mutex_unlock(&controlstats_lock);
 931         }
 932 
 933         /* Is this the first time around */
 934         if (flowop->fo_tputlast == 0) {
 935                 flowop->fo_tputlast = ops;
 936                 return (FILEBENCH_OK);
 937         }
 938 
 939         delta = ops - flowop->fo_tputlast;
 940         flowop->fo_tputbucket -= delta;
 941         flowop->fo_tputlast = ops;
 942 
 943         /* No need to block if the q isn't empty */
 944         if (flowop->fo_tputbucket >= 0LL) {
 945                 flowop_endop(threadflow, flowop, 0);
 946                 return (FILEBENCH_OK);
 947         }
 948 
 949         ops = flowop->fo_tputbucket * -1;
 950         events = ops;
 951 
 952         flowop_beginop(threadflow, flowop);
 953         while (filebench_shm->shm_eventgen_enabled) {
 954                 (void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
 955                 if (filebench_shm->shm_eventgen_q >= events) {
 956                         filebench_shm->shm_eventgen_q -= events;
 957                         (void) ipc_mutex_unlock(
 958                             &filebench_shm->shm_eventgen_lock);
 959                         flowop->fo_tputbucket += events;
 960                         break;
 961                 }
 962                 (void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
 963                     &filebench_shm->shm_eventgen_lock);
 964                 (void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
 965         }
 966         flowop_endop(threadflow, flowop, 0);
 967 
 968         return (FILEBENCH_OK);
 969 }
 970 
 971 
 972 /*
 973  * Blocks the calling thread if the number of bytes of I/O
 974  * issued exceeds one megabyte times the number of posted
 975  * events, thus limiting the average I/O byte rate to one
 976  * megabyte times the event rate as set by eventgen_hz.
 977  * Always retuns FILEBENCH_OK.
 978  */
 979 static int
 980 flowoplib_bwlimit(threadflow_t *threadflow, flowop_t *flowop)
 981 {
 982         uint64_t bytes;
 983         uint64_t delta;
 984         uint64_t events;
 985 
 986         /* Immediately bail if not set/enabled */
 987         if (!filebench_shm->shm_eventgen_enabled)
 988                 return (FILEBENCH_OK);
 989 
 990         if (flowop->fo_initted == 0) {
 991                 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
 992                     flowop, threadflow->tf_name, threadflow->tf_instance);
 993                 flowop->fo_initted = 1;
 994 
 995                 if (flowoplib_event_find_target(threadflow, flowop)
 996                     == FILEBENCH_ERROR)
 997                         return (FILEBENCH_ERROR);
 998 
 999                 if ((flowop->fo_targets) &&
1000                     ((flowop->fo_targets->fo_attrs &
1001                     (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1002                         filebench_log(LOG_ERROR,
1003                             "WARNING: Flowop %s does no Reads or Writes",
1004                             flowop->fo_targets->fo_name);
1005                         filebench_shutdown(1);
1006                         return (FILEBENCH_ERROR);
1007                 }
1008         }
1009 
1010         if (flowop->fo_targets) {
1011                 /*
1012                  * Note that fs_bytes is already the sum of fs_rbytes
1013                  * and fs_wbytes if looking at a single flowop.
1014                  */
1015                 bytes = flowop->fo_targets->fo_stats.fs_bytes;
1016         } else {
1017                 (void) ipc_mutex_lock(&controlstats_lock);
1018                 bytes = (controlstats.fs_rbytes +
1019                     controlstats.fs_wbytes);
1020                 (void) ipc_mutex_unlock(&controlstats_lock);
1021         }
1022 
1023         /* Is this the first time around? */
1024         if (flowop->fo_tputlast == 0) {
1025                 flowop->fo_tputlast = bytes;
1026                 return (FILEBENCH_OK);
1027         }
1028 
1029         delta = bytes - flowop->fo_tputlast;
1030         flowop->fo_tputbucket -= delta;
1031         flowop->fo_tputlast = bytes;
1032 
1033         /* No need to block if the q isn't empty */
1034         if (flowop->fo_tputbucket >= 0LL) {
1035                 flowop_endop(threadflow, flowop, 0);
1036                 return (FILEBENCH_OK);
1037         }
1038 
1039         bytes = flowop->fo_tputbucket * -1;
1040         events = (bytes / MB) + 1;
1041 
1042         filebench_log(LOG_DEBUG_IMPL, "%llu bytes, %llu events",
1043             (u_longlong_t)bytes, (u_longlong_t)events);
1044 
1045         flowop_beginop(threadflow, flowop);
1046         while (filebench_shm->shm_eventgen_enabled) {
1047                 (void) ipc_mutex_lock(&filebench_shm->shm_eventgen_lock);
1048                 if (filebench_shm->shm_eventgen_q >= events) {
1049                         filebench_shm->shm_eventgen_q -= events;
1050                         (void) ipc_mutex_unlock(
1051                             &filebench_shm->shm_eventgen_lock);
1052                         flowop->fo_tputbucket += (events * MB);
1053                         break;
1054                 }
1055                 (void) pthread_cond_wait(&filebench_shm->shm_eventgen_cv,
1056                     &filebench_shm->shm_eventgen_lock);
1057                 (void) ipc_mutex_unlock(&filebench_shm->shm_eventgen_lock);
1058         }
1059         flowop_endop(threadflow, flowop, 0);
1060 
1061         return (FILEBENCH_OK);
1062 }
1063 
1064 /*
1065  * These flowops terminate a benchmark run when either the specified
1066  * number of bytes of I/O (flowoplib_finishonbytes) or the specified
1067  * number of I/O operations (flowoplib_finishoncount) have been generated.
1068  */
1069 
1070 
1071 /*
1072  * Stop filebench run when specified number of I/O bytes have been
1073  * transferred. Compares controlstats.fs_bytes with flowop->value,
1074  * and if greater returns 1, stopping the run, if not, returns 0
1075  * to continue running.
1076  */
1077 static int
1078 flowoplib_finishonbytes(threadflow_t *threadflow, flowop_t *flowop)
1079 {
1080         uint64_t bytes_io;              /* Bytes of I/O delivered so far */
1081         uint64_t byte_lim = flowop->fo_constvalue;  /* Total Bytes desired */
1082                                                     /* Uses constant value */
1083 
1084         if (flowop->fo_initted == 0) {
1085                 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1086                     flowop, threadflow->tf_name, threadflow->tf_instance);
1087                 flowop->fo_initted = 1;
1088 
1089                 if (flowoplib_event_find_target(threadflow, flowop)
1090                     == FILEBENCH_ERROR)
1091                         return (FILEBENCH_ERROR);
1092 
1093                 if ((flowop->fo_targets) &&
1094                     ((flowop->fo_targets->fo_attrs &
1095                     (FLOW_ATTR_READ | FLOW_ATTR_WRITE)) == 0)) {
1096                         filebench_log(LOG_ERROR,
1097                             "WARNING: Flowop %s does no Reads or Writes",
1098                             flowop->fo_targets->fo_name);
1099                         filebench_shutdown(1);
1100                         return (FILEBENCH_ERROR);
1101                 }
1102         }
1103 
1104         if (flowop->fo_targets) {
1105                 bytes_io = flowop->fo_targets->fo_stats.fs_bytes;
1106         } else {
1107                 (void) ipc_mutex_lock(&controlstats_lock);
1108                 bytes_io = controlstats.fs_bytes;
1109                 (void) ipc_mutex_unlock(&controlstats_lock);
1110         }
1111 
1112         flowop_beginop(threadflow, flowop);
1113         if (bytes_io > byte_lim) {
1114                 flowop_endop(threadflow, flowop, 0);
1115                 return (FILEBENCH_DONE);
1116         }
1117         flowop_endop(threadflow, flowop, 0);
1118 
1119         return (FILEBENCH_OK);
1120 }
1121 
1122 /*
1123  * Stop filebench run when specified number of I/O operations have
1124  * been performed. Compares controlstats.fs_count with *flowop->value,
1125  * and if greater returns 1, stopping the run, if not, returns FILEBENCH_OK
1126  * to continue running.
1127  */
1128 static int
1129 flowoplib_finishoncount(threadflow_t *threadflow, flowop_t *flowop)
1130 {
1131         uint64_t ops;
1132         uint64_t count = flowop->fo_constvalue; /* use constant value */
1133 
1134         if (flowop->fo_initted == 0) {
1135                 filebench_log(LOG_DEBUG_IMPL, "rate %zx %s-%d locking",
1136                     flowop, threadflow->tf_name, threadflow->tf_instance);
1137                 flowop->fo_initted = 1;
1138 
1139                 if (flowoplib_event_find_target(threadflow, flowop)
1140                     == FILEBENCH_ERROR)
1141                         return (FILEBENCH_ERROR);
1142         }
1143 
1144         if (flowop->fo_targets) {
1145                 ops = flowop->fo_targets->fo_stats.fs_count;
1146         } else {
1147                 (void) ipc_mutex_lock(&controlstats_lock);
1148                 ops = controlstats.fs_count;
1149                 (void) ipc_mutex_unlock(&controlstats_lock);
1150         }
1151 
1152         flowop_beginop(threadflow, flowop);
1153         if (ops >= count) {
1154                 flowop_endop(threadflow, flowop, 0);
1155                 return (FILEBENCH_DONE);
1156         }
1157         flowop_endop(threadflow, flowop, 0);
1158 
1159         return (FILEBENCH_OK);
1160 }
1161 
1162 /*
1163  * Semaphore synchronization using either System V semaphores or
1164  * posix semaphores. If System V semaphores are available, they will be
1165  * used, otherwise posix semaphores will be used.
1166  */
1167 
1168 
1169 /*
1170  * Initializes the filebench "block on semaphore" flowop.
1171  * If System V semaphores are implemented, the routine
1172  * initializes the System V semaphore subsystem if it hasn't
1173  * already been initialized, also allocates a pair of semids
1174  * and initializes the highwater System V semaphore.
1175  * If no System V semaphores, then does nothing special.
1176  * Returns FILEBENCH_ERROR if it cannot acquire a set of System V semphores
1177  * or if the initial post to the semaphore set fails. Returns FILEBENCH_OK
1178  * on success.
1179  */
1180 static int
1181 flowoplib_semblock_init(flowop_t *flowop)
1182 {
1183 
1184 #ifdef HAVE_SYSV_SEM
1185         int sys_semid;
1186         struct sembuf sbuf[2];
1187         int highwater;
1188 
1189         ipc_seminit();
1190 
1191         flowop->fo_semid_lw = ipc_semidalloc();
1192         flowop->fo_semid_hw = ipc_semidalloc();
1193 
1194         filebench_log(LOG_DEBUG_IMPL, "flow %s-%d semblock init semid=%x",
1195             flowop->fo_name, flowop->fo_instance, flowop->fo_semid_lw);
1196 
1197         sys_semid = filebench_shm->shm_sys_semid;
1198 
1199         if ((highwater = flowop->fo_semid_hw) == 0)
1200                 highwater = flowop->fo_constvalue; /* use constant value */
1201 
1202         filebench_log(LOG_DEBUG_IMPL, "setting highwater to : %d", highwater);
1203 
1204         sbuf[0].sem_num = (short)highwater;
1205         sbuf[0].sem_op = avd_get_int(flowop->fo_highwater);
1206         sbuf[0].sem_flg = 0;
1207         if ((semop(sys_semid, &sbuf[0], 1) == -1) && errno) {
1208                 filebench_log(LOG_ERROR, "semblock init post failed: %s (%d,"
1209                     "%d)", strerror(errno), sbuf[0].sem_num, sbuf[0].sem_op);
1210                 return (FILEBENCH_ERROR);
1211         }
1212 #else
1213         filebench_log(LOG_DEBUG_IMPL,
1214             "flow %s-%d semblock init with posix semaphore",
1215             flowop->fo_name, flowop->fo_instance);
1216 
1217         sem_init(&flowop->fo_sem, 1, 0);
1218 #endif  /* HAVE_SYSV_SEM */
1219 
1220         if (!(avd_get_bool(flowop->fo_blocking)))
1221                 (void) ipc_mutex_unlock(&flowop->fo_lock);
1222 
1223         return (FILEBENCH_OK);
1224 }
1225 
1226 /*
1227  * Releases the semids for the System V semaphore allocated
1228  * to this flowop. If not using System V semaphores, then
1229  * it is effectively just a no-op.
1230  */
1231 static void
1232 flowoplib_semblock_destruct(flowop_t *flowop)
1233 {
1234 #ifdef HAVE_SYSV_SEM
1235         ipc_semidfree(flowop->fo_semid_lw);
1236         ipc_semidfree(flowop->fo_semid_hw);
1237 #else
1238         sem_destroy(&flowop->fo_sem);
1239 #endif /* HAVE_SYSV_SEM */
1240 }
1241 
1242 /*
1243  * Attempts to pass a System V or posix semaphore as appropriate,
1244  * and blocks if necessary. Returns FILEBENCH_ERROR if a set of System V
1245  * semphores is not available or cannot be acquired, or if the initial
1246  * post to the semaphore set fails. Returns FILEBENCH_OK on success.
1247  */
1248 static int
1249 flowoplib_semblock(threadflow_t *threadflow, flowop_t *flowop)
1250 {
1251 
1252 #ifdef HAVE_SYSV_SEM
1253         struct sembuf sbuf[2];
1254         int value = avd_get_int(flowop->fo_value);
1255         int sys_semid;
1256         struct timespec timeout;
1257 
1258         sys_semid = filebench_shm->shm_sys_semid;
1259 
1260         filebench_log(LOG_DEBUG_IMPL,
1261             "flow %s-%d sem blocking on id %x num %x value %d",
1262             flowop->fo_name, flowop->fo_instance, sys_semid,
1263             flowop->fo_semid_hw, value);
1264 
1265         /* Post, decrement the increment the hw queue */
1266         sbuf[0].sem_num = flowop->fo_semid_hw;
1267         sbuf[0].sem_op = (short)value;
1268         sbuf[0].sem_flg = 0;
1269         sbuf[1].sem_num = flowop->fo_semid_lw;
1270         sbuf[1].sem_op = value * -1;
1271         sbuf[1].sem_flg = 0;
1272         timeout.tv_sec = 600;
1273         timeout.tv_nsec = 0;
1274 
1275         if (avd_get_bool(flowop->fo_blocking))
1276                 (void) ipc_mutex_unlock(&flowop->fo_lock);
1277 
1278         flowop_beginop(threadflow, flowop);
1279 
1280 #ifdef HAVE_SEMTIMEDOP
1281         (void) semtimedop(sys_semid, &sbuf[0], 1, &timeout);
1282         (void) semtimedop(sys_semid, &sbuf[1], 1, &timeout);
1283 #else
1284         (void) semop(sys_semid, &sbuf[0], 1);
1285         (void) semop(sys_semid, &sbuf[1], 1);
1286 #endif /* HAVE_SEMTIMEDOP */
1287 
1288         if (avd_get_bool(flowop->fo_blocking))
1289                 (void) ipc_mutex_lock(&flowop->fo_lock);
1290 
1291         flowop_endop(threadflow, flowop, 0);
1292 
1293 #else
1294         int value = avd_get_int(flowop->fo_value);
1295         int i;
1296 
1297         filebench_log(LOG_DEBUG_IMPL,
1298             "flow %s-%d sem blocking on posix semaphore",
1299             flowop->fo_name, flowop->fo_instance);
1300 
1301         /* Decrement sem by value */
1302         for (i = 0; i < value; i++) {
1303                 if (sem_wait(&flowop->fo_sem) == -1) {
1304                         filebench_log(LOG_ERROR, "semop wait failed");
1305                         return (FILEBENCH_ERROR);
1306                 }
1307         }
1308 
1309         filebench_log(LOG_DEBUG_IMPL, "flow %s-%d sem unblocking",
1310             flowop->fo_name, flowop->fo_instance);
1311 #endif /* HAVE_SYSV_SEM */
1312 
1313         return (FILEBENCH_OK);
1314 }
1315 
1316 /*
1317  * Calls ipc_seminit(). Always returns FILEBENCH_OK.
1318  */
1319 /* ARGSUSED */
1320 static int
1321 flowoplib_sempost_init(flowop_t *flowop)
1322 {
1323 #ifdef HAVE_SYSV_SEM
1324         ipc_seminit();
1325 #endif /* HAVE_SYSV_SEM */
1326         return (FILEBENCH_OK);
1327 }
1328 
1329 /*
1330  * Post to a System V or posix semaphore as appropriate.
1331  * On the first call for a given flowop instance, this routine
1332  * will use the fo_targetname attribute to locate all semblock
1333  * flowops that are expecting posts from this flowop. All
1334  * target flowops on this list will have a post operation done
1335  * to their semaphores on each call.
1336  */
1337 static int
1338 flowoplib_sempost(threadflow_t *threadflow, flowop_t *flowop)
1339 {
1340         flowop_t *target;
1341 
1342         filebench_log(LOG_DEBUG_IMPL,
1343             "sempost flow %s-%d",
1344             flowop->fo_name,
1345             flowop->fo_instance);
1346 
1347         /* if this is the first post, create the post list */
1348         if (flowop->fo_targets == NULL) {
1349                 flowop_t *result = flowop_find(flowop->fo_targetname);
1350 
1351                 flowop->fo_targets = result;
1352 
1353                 if (result == NULL) {
1354                         filebench_log(LOG_ERROR,
1355                             "sempost: could not find op %s for thread %s",
1356                             flowop->fo_targetname,
1357                             threadflow->tf_name);
1358                         filebench_shutdown(1);
1359                 }
1360 
1361                 while (result) {
1362                         result->fo_targetnext =
1363                             result->fo_resultnext;
1364                         result = result->fo_resultnext;
1365                 }
1366         }
1367 
1368         target = flowop->fo_targets;
1369 
1370         flowop_beginop(threadflow, flowop);
1371         /* post to the targets */
1372         while (target) {
1373 #ifdef HAVE_SYSV_SEM
1374                 struct sembuf sbuf[2];
1375                 int sys_semid;
1376                 int blocking;
1377 #else
1378                 int i;
1379 #endif /* HAVE_SYSV_SEM */
1380                 struct timespec timeout;
1381                 int value = (int)avd_get_int(flowop->fo_value);
1382 
1383                 if (target->fo_instance == FLOW_MASTER) {
1384                         target = target->fo_targetnext;
1385                         continue;
1386                 }
1387 
1388 #ifdef HAVE_SYSV_SEM
1389 
1390                 filebench_log(LOG_DEBUG_IMPL,
1391                     "sempost flow %s-%d num %x",
1392                     target->fo_name,
1393                     target->fo_instance,
1394                     target->fo_semid_lw);
1395 
1396                 sys_semid = filebench_shm->shm_sys_semid;
1397                 sbuf[0].sem_num = target->fo_semid_lw;
1398                 sbuf[0].sem_op = (short)value;
1399                 sbuf[0].sem_flg = 0;
1400                 sbuf[1].sem_num = target->fo_semid_hw;
1401                 sbuf[1].sem_op = value * -1;
1402                 sbuf[1].sem_flg = 0;
1403                 timeout.tv_sec = 600;
1404                 timeout.tv_nsec = 0;
1405 
1406                 if (avd_get_bool(flowop->fo_blocking))
1407                         blocking = 1;
1408                 else
1409                         blocking = 0;
1410 
1411 #ifdef HAVE_SEMTIMEDOP
1412                 if ((semtimedop(sys_semid, &sbuf[0], blocking + 1,
1413                     &timeout) == -1) && (errno && (errno != EAGAIN))) {
1414 #else
1415                 if ((semop(sys_semid, &sbuf[0], blocking + 1) == -1) &&
1416                     (errno && (errno != EAGAIN))) {
1417 #endif /* HAVE_SEMTIMEDOP */
1418                         filebench_log(LOG_ERROR, "semop post failed: %s",
1419                             strerror(errno));
1420                         return (FILEBENCH_ERROR);
1421                 }
1422 
1423                 filebench_log(LOG_DEBUG_IMPL,
1424                     "flow %s-%d finished posting",
1425                     target->fo_name, target->fo_instance);
1426 #else
1427                 filebench_log(LOG_DEBUG_IMPL,
1428                     "sempost flow %s-%d to posix semaphore",
1429                     target->fo_name,
1430                     target->fo_instance);
1431 
1432                 /* Increment sem by value */
1433                 for (i = 0; i < value; i++) {
1434                         if (sem_post(&target->fo_sem) == -1) {
1435                                 filebench_log(LOG_ERROR, "semop post failed");
1436                                 return (FILEBENCH_ERROR);
1437                         }
1438                 }
1439 
1440                 filebench_log(LOG_DEBUG_IMPL, "flow %s-%d unblocking",
1441                     target->fo_name, target->fo_instance);
1442 #endif /* HAVE_SYSV_SEM */
1443 
1444                 target = target->fo_targetnext;
1445         }
1446         flowop_endop(threadflow, flowop, 0);
1447 
1448         return (FILEBENCH_OK);
1449 }
1450 
1451 
/*
 * Section for exercising create / open / close / delete operations
 * on files within a fileset. For proper operation, the flowop attribute
 * "fd", which sets the fo_fdnumber field in the flowop, must be used
 * so that the same file is opened and later closed. "fd" is an index
 * into a pair of arrays maintained by threadflows, one of which
 * contains the operating system assigned file descriptors and the other
 * a pointer to the filesetentry whose file the file descriptor
 * references. An openfile flowop defined without fd being set will use
 * the default (0) fd or, if specified, rotate through fd indices, but
 * createfile and closefile must use the default or a specified fd.
 * Meanwhile deletefile picks an arbitrary file to delete, regardless
 * of fd attribute.
 */
1466 
1467 /*
1468  * Emulates (and actually does) file open. Obtains a file descriptor
1469  * index, then calls flowoplib_openfile_common() to open. Returns
1470  * FILEBENCH_ERROR if no file descriptor is found, and returns the
1471  * status from flowoplib_openfile_common otherwise (FILEBENCH_ERROR,
1472  * FILEBENCH_NORSC, FILEBENCH_OK).
1473  */
1474 static int
1475 flowoplib_openfile(threadflow_t *threadflow, flowop_t *flowop)
1476 {
1477         int fd = flowoplib_fdnum(threadflow, flowop);
1478 
1479         if (fd == -1)
1480                 return (FILEBENCH_ERROR);
1481 
1482         return (flowoplib_openfile_common(threadflow, flowop, fd));
1483 }
1484 
1485 /*
1486  * Common file opening code for filesets. Uses the supplied
1487  * file descriptor index to determine the tf_fd entry to use.
1488  * If the entry is empty (0) and the fileset exists, fileset
1489  * pick is called to select a fileset entry to use. The file
1490  * specified in the filesetentry is opened, and the returned
1491  * operating system file descriptor and a pointer to the
1492  * filesetentry are stored in tf_fd[fd] and tf_fse[fd],
1493  * respectively. Returns FILEBENCH_ERROR on error,
1494  * FILEBENCH_NORSC if no suitable filesetentry can be found,
1495  * and FILEBENCH_OK on success.
1496  */
1497 static int
1498 flowoplib_openfile_common(threadflow_t *threadflow, flowop_t *flowop, int fd)
1499 {
1500         filesetentry_t *file;
1501         char *fileset_name;
1502         int tid = 0;
1503         int openflag = 0;
1504         int err;
1505 
1506         if (flowop->fo_fileset == NULL) {
1507                 filebench_log(LOG_ERROR, "flowop NULL file");
1508                 return (FILEBENCH_ERROR);
1509         }
1510 
1511         if ((fileset_name =
1512             avd_get_str(flowop->fo_fileset->fs_name)) == NULL) {
1513                 filebench_log(LOG_ERROR,
1514                     "flowop %s: fileset has no name", flowop->fo_name);
1515                 return (FILEBENCH_ERROR);
1516         }
1517 
1518         /*
1519          * set the open flag for read only or read/write, as appropriate.
1520          */
1521         if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE)
1522                 openflag = O_RDONLY;
1523         else
1524                 openflag = O_RDWR;
1525 
1526         /*
1527          * If the flowop doesn't default to persistent fd
1528          * then get unique thread ID for use by fileset_pick
1529          */
1530         if (avd_get_bool(flowop->fo_rotatefd))
1531                 tid = threadflow->tf_utid;
1532 
1533         if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1534                 filebench_log(LOG_ERROR,
1535                     "flowop %s attempted to open without closing on fd %d",
1536                     flowop->fo_name, fd);
1537                 return (FILEBENCH_ERROR);
1538         }
1539 
1540 #ifdef HAVE_RAW_SUPPORT
1541         if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1542                 int open_attrs = 0;
1543                 char name[MAXPATHLEN];
1544 
1545                 (void) fb_strlcpy(name,
1546                     avd_get_str(flowop->fo_fileset->fs_path), MAXPATHLEN);
1547                 (void) fb_strlcat(name, "/", MAXPATHLEN);
1548                 (void) fb_strlcat(name, fileset_name, MAXPATHLEN);
1549 
1550                 if (avd_get_bool(flowop->fo_dsync)) {
1551 #ifdef sun
1552                         open_attrs |= O_DSYNC;
1553 #else
1554                         open_attrs |= O_FSYNC;
1555 #endif
1556                 }
1557 
1558                 filebench_log(LOG_DEBUG_SCRIPT,
1559                     "open raw device %s flags %d = %d", name, open_attrs, fd);
1560 
1561                 if (FB_OPEN(&(threadflow->tf_fd[fd]), name,
1562                     openflag | open_attrs, 0666) == FILEBENCH_ERROR) {
1563                         filebench_log(LOG_ERROR,
1564                             "Failed to open raw device %s: %s",
1565                             name, strerror(errno));
1566                         return (FILEBENCH_ERROR);
1567                 }
1568 
1569                 /* if running on Solaris, use un-buffered io */
1570 #ifdef sun
1571                 (void) directio(threadflow->tf_fd[fd].fd_num, DIRECTIO_ON);
1572 #endif
1573 
1574                 threadflow->tf_fse[fd] = NULL;
1575 
1576                 return (FILEBENCH_OK);
1577         }
1578 #endif /* HAVE_RAW_SUPPORT */
1579 
1580         if ((err = flowoplib_pickfile(&file, flowop,
1581             FILESET_PICKEXISTS, tid)) != FILEBENCH_OK) {
1582                 filebench_log(LOG_DEBUG_SCRIPT,
1583                     "flowop %s failed to pick file from %s on fd %d",
1584                     flowop->fo_name, fileset_name, fd);
1585                 return (err);
1586         }
1587 
1588         threadflow->tf_fse[fd] = file;
1589 
1590         flowop_beginop(threadflow, flowop);
1591         err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1592             file, openflag, 0666, flowoplib_fileattrs(flowop));
1593         flowop_endop(threadflow, flowop, 0);
1594 
1595         if (err == FILEBENCH_ERROR) {
1596                 filebench_log(LOG_ERROR, "flowop %s failed to open file %s",
1597                     flowop->fo_name, file->fse_path);
1598                 return (FILEBENCH_ERROR);
1599         }
1600 
1601         filebench_log(LOG_DEBUG_SCRIPT,
1602             "flowop %s: opened %s fd[%d] = %d",
1603             flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1604 
1605         return (FILEBENCH_OK);
1606 }
1607 
1608 /*
1609  * Emulate create of a file. Uses the flowop's fdnumber to select
1610  * tf_fd and tf_fse array locations to put the created file's file
1611  * descriptor and filesetentry respectively. Uses flowoplib_pickfile()
1612  * to select a specific filesetentry whose file does not currently
1613  * exist for the file create operation. Then calls
1614  * fileset_openfile() with the O_CREATE flag set to create the
1615  * file. Returns FILEBENCH_ERROR if the array index specified by fdnumber is
1616  * already in use, the flowop has no associated fileset, or
1617  * the create call fails. Returns 1 if a filesetentry with a
1618  * nonexistent file cannot be found. Returns FILEBENCH_OK on success.
1619  */
1620 static int
1621 flowoplib_createfile(threadflow_t *threadflow, flowop_t *flowop)
1622 {
1623         filesetentry_t *file;
1624         int fd = flowop->fo_fdnumber;
1625         int err;
1626 
1627         if (threadflow->tf_fd[fd].fd_ptr != NULL) {
1628                 filebench_log(LOG_ERROR,
1629                     "flowop %s attempted to create without closing on fd %d",
1630                     flowop->fo_name, fd);
1631                 return (FILEBENCH_ERROR);
1632         }
1633 
1634         if (flowop->fo_fileset == NULL) {
1635                 filebench_log(LOG_ERROR, "flowop NULL file");
1636                 return (FILEBENCH_ERROR);
1637         }
1638 
1639         if (avd_get_bool(flowop->fo_fileset->fs_readonly) == TRUE) {
1640                 filebench_log(LOG_ERROR, "Can not CREATE the READONLY file %s",
1641                     avd_get_str(flowop->fo_fileset->fs_name));
1642                 return (FILEBENCH_ERROR);
1643         }
1644 
1645 
1646 #ifdef HAVE_RAW_SUPPORT
1647         /* can't be used with raw devices */
1648         if (flowop->fo_fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1649                 filebench_log(LOG_ERROR,
1650                     "flowop %s attempted to a createfile on RAW device",
1651                     flowop->fo_name);
1652                 return (FILEBENCH_ERROR);
1653         }
1654 #endif /* HAVE_RAW_SUPPORT */
1655 
1656         if ((err = flowoplib_pickfile(&file, flowop,
1657             FILESET_PICKNOEXIST, 0)) != FILEBENCH_OK) {
1658                 filebench_log(LOG_DEBUG_SCRIPT,
1659                     "flowop %s failed to pick file from fileset %s",
1660                     flowop->fo_name,
1661                     avd_get_str(flowop->fo_fileset->fs_name));
1662                 return (err);
1663         }
1664 
1665         threadflow->tf_fse[fd] = file;
1666 
1667         flowop_beginop(threadflow, flowop);
1668         err = fileset_openfile(&threadflow->tf_fd[fd], flowop->fo_fileset,
1669             file, O_RDWR | O_CREAT, 0666, flowoplib_fileattrs(flowop));
1670         flowop_endop(threadflow, flowop, 0);
1671 
1672         if (err == FILEBENCH_ERROR) {
1673                 filebench_log(LOG_ERROR, "failed to create file %s",
1674                     flowop->fo_name);
1675                 return (FILEBENCH_ERROR);
1676         }
1677 
1678         filebench_log(LOG_DEBUG_SCRIPT,
1679             "flowop %s: created %s fd[%d] = %d",
1680             flowop->fo_name, file->fse_path, fd, threadflow->tf_fd[fd]);
1681 
1682         return (FILEBENCH_OK);
1683 }
1684 
1685 /*
1686  * Emulates delete of a file. If a valid fd is provided, it uses the
1687  * filesetentry stored at that fd location to select the file to be
1688  * deleted, otherwise it picks an arbitrary filesetentry
1689  * whose file exists. It then uses unlink() to delete it and Clears
1690  * the FSE_EXISTS flag for the filesetentry. Returns FILEBENCH_ERROR if the
1691  * flowop has no associated fileset. Returns FILEBENCH_NORSC if an appropriate
1692  * filesetentry cannot be found, and FILEBENCH_OK on success.
1693  */
1694 static int
1695 flowoplib_deletefile(threadflow_t *threadflow, flowop_t *flowop)
1696 {
1697         filesetentry_t *file;
1698         fileset_t *fileset;
1699         char path[MAXPATHLEN];
1700         char *pathtmp;
1701         int fd = flowop->fo_fdnumber;
1702 
1703         /* if fd specified, use it to access file */
1704         if ((fd > 0) && ((file = threadflow->tf_fse[fd]) != NULL)) {
1705 
1706                 /* indicate that the file will be deleted */
1707                 threadflow->tf_fse[fd] = NULL;
1708 
1709                 /* if here, we still have a valid file pointer */
1710                 fileset = file->fse_fileset;
1711         } else {
1712 
1713                 /* Otherwise, pick arbitrary file */
1714                 file = NULL;
1715                 fileset = flowop->fo_fileset;
1716         }
1717 
1718 
1719         if (fileset == NULL) {
1720                 filebench_log(LOG_ERROR, "flowop NULL file");
1721                 return (FILEBENCH_ERROR);
1722         }
1723 
1724 #ifdef HAVE_RAW_SUPPORT
1725         /* can't be used with raw devices */
1726         if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
1727                 filebench_log(LOG_ERROR,
1728                     "flowop %s attempted a deletefile on RAW device",
1729                     flowop->fo_name);
1730                 return (FILEBENCH_ERROR);
1731         }
1732 #endif /* HAVE_RAW_SUPPORT */
1733 
1734         if (file == NULL) {
1735                 int err;
1736 
1737                 /* pick arbitrary, existing (allocated) file */
1738                 if ((err = flowoplib_pickfile(&file, flowop,
1739                     FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
1740                         filebench_log(LOG_DEBUG_SCRIPT,
1741                             "flowop %s failed to pick file", flowop->fo_name);
1742                         return (err);
1743                 }
1744         } else {
1745                 /* delete specific file. wait for it to be non-busy */
1746                 (void) ipc_mutex_lock(&fileset->fs_pick_lock);
1747                 while (file->fse_flags & FSE_BUSY) {
1748                         file->fse_flags |= FSE_THRD_WAITNG;
1749                         (void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1750                             &fileset->fs_pick_lock);
1751                 }
1752 
1753                 /* File now available, grab it for deletion */
1754                 file->fse_flags |= FSE_BUSY;
1755                 fileset->fs_idle_files--;
1756                 (void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1757         }
1758 
1759         /* don't delete if anyone (other than me) has file open */
1760         if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
1761                 if (file->fse_open_cnt > 1) {
1762                         filebench_log(LOG_DEBUG_SCRIPT,
1763                             "flowop %s can't delete file opened by other"
1764                             " threads at fd = %d", flowop->fo_name, fd);
1765                         fileset_unbusy(file, FALSE, FALSE, 0);
1766                         return (FILEBENCH_OK);
1767                 } else {
1768                         filebench_log(LOG_DEBUG_SCRIPT,
1769                             "flowop %s deleting still open file at fd = %d",
1770                             flowop->fo_name, fd);
1771                 }
1772         } else if (file->fse_open_cnt > 0) {
1773                 filebench_log(LOG_DEBUG_SCRIPT,
1774                     "flowop %s can't delete file opened by other"
1775                     " threads at fd = %d, open count = %d",
1776                     flowop->fo_name, fd, file->fse_open_cnt);
1777                 fileset_unbusy(file, FALSE, FALSE, 0);
1778                 return (FILEBENCH_OK);
1779         }
1780 
1781         (void) fb_strlcpy(path, avd_get_str(fileset->fs_path), MAXPATHLEN);
1782         (void) fb_strlcat(path, "/", MAXPATHLEN);
1783         (void) fb_strlcat(path, avd_get_str(fileset->fs_name), MAXPATHLEN);
1784         pathtmp = fileset_resolvepath(file);
1785         (void) fb_strlcat(path, pathtmp, MAXPATHLEN);
1786         free(pathtmp);
1787 
1788         /* delete the selected file */
1789         flowop_beginop(threadflow, flowop);
1790         (void) FB_UNLINK(path);
1791         flowop_endop(threadflow, flowop, 0);
1792 
1793         /* indicate that it is no longer busy and no longer exists */
1794         fileset_unbusy(file, TRUE, FALSE, -file->fse_open_cnt);
1795 
1796         filebench_log(LOG_DEBUG_SCRIPT, "deleted file %s", file->fse_path);
1797 
1798         return (FILEBENCH_OK);
1799 }
1800 
1801 /*
1802  * Emulates fsync of a file. Obtains the file descriptor index
1803  * from the flowop, obtains the actual file descriptor from
1804  * the threadflow's table, checks to be sure it is still an
1805  * open file, then does an fsync operation on it. Returns FILEBENCH_ERROR
1806  * if the file no longer is open, FILEBENCH_OK otherwise.
1807  */
1808 static int
1809 flowoplib_fsync(threadflow_t *threadflow, flowop_t *flowop)
1810 {
1811         filesetentry_t *file;
1812         int fd = flowop->fo_fdnumber;
1813 
1814         if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1815                 filebench_log(LOG_ERROR,
1816                     "flowop %s attempted to fsync a closed fd %d",
1817                     flowop->fo_name, fd);
1818                 return (FILEBENCH_ERROR);
1819         }
1820 
1821         file = threadflow->tf_fse[fd];
1822 
1823         if ((file == NULL) ||
1824             (file->fse_fileset->fs_attrs & FILESET_IS_RAW_DEV)) {
1825                 filebench_log(LOG_ERROR,
1826                     "flowop %s attempted to a fsync a RAW device",
1827                     flowop->fo_name);
1828                 return (FILEBENCH_ERROR);
1829         }
1830 
1831         /* Measure time to fsync */
1832         flowop_beginop(threadflow, flowop);
1833         (void) FB_FSYNC(&threadflow->tf_fd[fd]);
1834         flowop_endop(threadflow, flowop, 0);
1835 
1836         filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s", file->fse_path);
1837 
1838         return (FILEBENCH_OK);
1839 }
1840 
1841 /*
1842  * Emulate fsync of an entire fileset. Search through the
1843  * threadflow's file descriptor array, doing fsync() on each
1844  * open file that belongs to the flowop's fileset. Always
1845  * returns FILEBENCH_OK.
1846  */
1847 static int
1848 flowoplib_fsyncset(threadflow_t *threadflow, flowop_t *flowop)
1849 {
1850         int fd;
1851 
1852         for (fd = 0; fd < THREADFLOW_MAXFD; fd++) {
1853                 filesetentry_t *file;
1854 
1855                 /* Match the file set to fsync */
1856                 if ((threadflow->tf_fse[fd] == NULL) ||
1857                     (flowop->fo_fileset != threadflow->tf_fse[fd]->fse_fileset))
1858                         continue;
1859 
1860                 /* Measure time to fsync */
1861                 flowop_beginop(threadflow, flowop);
1862                 (void) FB_FSYNC(&threadflow->tf_fd[fd]);
1863                 flowop_endop(threadflow, flowop, 0);
1864 
1865                 file = threadflow->tf_fse[fd];
1866 
1867                 filebench_log(LOG_DEBUG_SCRIPT, "fsync file %s",
1868                     file->fse_path);
1869         }
1870 
1871         return (FILEBENCH_OK);
1872 }
1873 
1874 /*
1875  * Emulate close of a file.  Obtains the file descriptor index
1876  * from the flowop, obtains the actual file descriptor from the
1877  * threadflow's table, checks to be sure it is still an open
1878  * file, then does a close operation on it. Then sets the
1879  * threadflow file descriptor table entry to 0, and the file set
1880  * entry pointer to NULL. Returns FILEBENCH_ERROR if the file was not open,
1881  * FILEBENCH_OK otherwise.
1882  */
1883 static int
1884 flowoplib_closefile(threadflow_t *threadflow, flowop_t *flowop)
1885 {
1886         filesetentry_t *file;
1887         fileset_t *fileset;
1888         int fd = flowop->fo_fdnumber;
1889 
1890         if (threadflow->tf_fd[fd].fd_ptr == NULL) {
1891                 filebench_log(LOG_ERROR,
1892                     "flowop %s attempted to close an already closed fd %d",
1893                     flowop->fo_name, fd);
1894                 return (FILEBENCH_ERROR);
1895         }
1896 
1897         file = threadflow->tf_fse[fd];
1898         fileset = file->fse_fileset;
1899 
1900         /* Wait for it to be non-busy */
1901         (void) ipc_mutex_lock(&fileset->fs_pick_lock);
1902         while (file->fse_flags & FSE_BUSY) {
1903                 file->fse_flags |= FSE_THRD_WAITNG;
1904                 (void) pthread_cond_wait(&fileset->fs_thrd_wait_cv,
1905                     &fileset->fs_pick_lock);
1906         }
1907 
1908         /* File now available, grab it for closing */
1909         file->fse_flags |= FSE_BUSY;
1910 
1911         /* if last open, set declare idle */
1912         if (file->fse_open_cnt == 1)
1913                 fileset->fs_idle_files--;
1914 
1915         (void) ipc_mutex_unlock(&fileset->fs_pick_lock);
1916 
1917         /* Measure time to close */
1918         flowop_beginop(threadflow, flowop);
1919         (void) FB_CLOSE(&threadflow->tf_fd[fd]);
1920         flowop_endop(threadflow, flowop, 0);
1921 
1922         fileset_unbusy(file, FALSE, FALSE, -1);
1923 
1924         threadflow->tf_fd[fd].fd_ptr = NULL;
1925 
1926         filebench_log(LOG_DEBUG_SCRIPT, "closed file %s", file->fse_path);
1927 
1928         return (FILEBENCH_OK);
1929 }
1930 
1931 /*
1932  * Obtain the full pathname of the directory described by the filesetentry
1933  * indicated by "dir", and copy it into the character array pointed to by
1934  * path. Returns FILEBENCH_ERROR on errors, FILEBENCH_OK otherwise.
1935  */
1936 static int
1937 flowoplib_getdirpath(filesetentry_t *dir, char *path)
1938 {
1939         char            *fileset_path;
1940         char            *fileset_name;
1941         char            *part_path;
1942 
1943         if ((fileset_path = avd_get_str(dir->fse_fileset->fs_path)) == NULL) {
1944                 filebench_log(LOG_ERROR, "Fileset path not set");
1945                 return (FILEBENCH_ERROR);
1946         }
1947 
1948         if ((fileset_name = avd_get_str(dir->fse_fileset->fs_name)) == NULL) {
1949                 filebench_log(LOG_ERROR, "Fileset name not set");
1950                 return (FILEBENCH_ERROR);
1951         }
1952 
1953         (void) fb_strlcpy(path, fileset_path, MAXPATHLEN);
1954         (void) fb_strlcat(path, "/", MAXPATHLEN);
1955         (void) fb_strlcat(path, fileset_name, MAXPATHLEN);
1956 
1957         if ((part_path = fileset_resolvepath(dir)) == NULL)
1958                 return (FILEBENCH_ERROR);
1959 
1960         (void) fb_strlcat(path, part_path, MAXPATHLEN);
1961         free(part_path);
1962 
1963         return (FILEBENCH_OK);
1964 }
1965 
1966 /*
1967  * Use mkdir to create a directory.  Obtains the fileset name from the
1968  * flowop, selects a non-existent leaf directory and obtains its full
1969  * path, then uses mkdir to create it on the storage subsystem (make it
1970  * existent). Returns FILEBENCH_NORSC is there are no more non-existent
1971  * directories in the fileset, FILEBENCH_ERROR on other errors, and
1972  * FILEBENCH_OK on success.
1973  */
1974 static int
1975 flowoplib_makedir(threadflow_t *threadflow, flowop_t *flowop)
1976 {
1977         filesetentry_t  *dir;
1978         int             ret;
1979         char            full_path[MAXPATHLEN];
1980 
1981         if ((ret = flowoplib_pickleafdir(&dir, flowop,
1982             FILESET_PICKNOEXIST)) != FILEBENCH_OK)
1983                 return (ret);
1984 
1985         if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
1986                 return (ret);
1987 
1988         flowop_beginop(threadflow, flowop);
1989         (void) FB_MKDIR(full_path, 0755);
1990         flowop_endop(threadflow, flowop, 0);
1991 
1992         /* indicate that it is no longer busy and now exists */
1993         fileset_unbusy(dir, TRUE, TRUE, 0);
1994 
1995         return (FILEBENCH_OK);
1996 }
1997 
1998 /*
1999  * Use rmdir to delete a directory.  Obtains the fileset name from the
2000  * flowop, selects an existent leaf directory and obtains its full path,
2001  * then uses rmdir to remove it from the storage subsystem (make it
2002  * non-existent). Returns FILEBENCH_NORSC is there are no more existent
2003  * directories in the fileset, FILEBENCH_ERROR on other errors, and
2004  * FILEBENCH_OK on success.
2005  */
2006 static int
2007 flowoplib_removedir(threadflow_t *threadflow, flowop_t *flowop)
2008 {
2009         filesetentry_t *dir;
2010         int             ret;
2011         char            full_path[MAXPATHLEN];
2012 
2013         if ((ret = flowoplib_pickleafdir(&dir, flowop,
2014             FILESET_PICKEXISTS)) != FILEBENCH_OK)
2015                 return (ret);
2016 
2017         if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2018                 return (ret);
2019 
2020         flowop_beginop(threadflow, flowop);
2021         (void) FB_RMDIR(full_path);
2022         flowop_endop(threadflow, flowop, 0);
2023 
2024         /* indicate that it is no longer busy and no longer exists */
2025         fileset_unbusy(dir, TRUE, FALSE, 0);
2026 
2027         return (FILEBENCH_OK);
2028 }
2029 
2030 /*
2031  * Use opendir(), multiple readdir() calls, and closedir() to list the
2032  * contents of a directory.  Obtains the fileset name from the
2033  * flowop, selects a normal subdirectory (which always exist) and obtains
2034  * its full path, then uses opendir() to get a DIR handle to it from the
2035  * file system, a readdir() loop to access each directory entry, and
2036  * finally cleans up with a closedir(). The latency reported is the total
2037  * for all this activity, and it also reports the total number of bytes
2038  * in the entries as the amount "read". Returns FILEBENCH_ERROR on errors,
2039  * and FILEBENCH_OK on success.
2040  */
2041 static int
2042 flowoplib_listdir(threadflow_t *threadflow, flowop_t *flowop)
2043 {
2044         fileset_t       *fileset;
2045         filesetentry_t  *dir;
2046         DIR             *dir_handle;
2047         struct dirent   *direntp;
2048         int             dir_bytes = 0;
2049         int             ret;
2050         char            full_path[MAXPATHLEN];
2051 
2052         if ((fileset = flowop->fo_fileset) == NULL) {
2053                 filebench_log(LOG_ERROR, "flowop NO fileset");
2054                 return (FILEBENCH_ERROR);
2055         }
2056 
2057         if ((dir = fileset_pick(fileset, FILESET_PICKDIR, 0, 0)) == NULL) {
2058                 filebench_log(LOG_DEBUG_SCRIPT,
2059                     "flowop %s failed to pick directory from fileset %s",
2060                     flowop->fo_name,
2061                     avd_get_str(fileset->fs_name));
2062                 return (FILEBENCH_ERROR);
2063         }
2064 
2065         if ((ret = flowoplib_getdirpath(dir, full_path)) != FILEBENCH_OK)
2066                 return (ret);
2067 
2068         flowop_beginop(threadflow, flowop);
2069 
2070         /* open the directory */
2071         if ((dir_handle = FB_OPENDIR(full_path)) == NULL) {
2072                 filebench_log(LOG_ERROR,
2073                     "flowop %s failed to open directory in fileset %s\n",
2074                     flowop->fo_name, avd_get_str(fileset->fs_name));
2075                 return (FILEBENCH_ERROR);
2076         }
2077 
2078         /* read through the directory entries */
2079         while ((direntp = FB_READDIR(dir_handle)) != NULL) {
2080                 dir_bytes += (strlen(direntp->d_name) +
2081                     sizeof (struct dirent) - 1);
2082         }
2083 
2084         /* close the directory */
2085         (void) FB_CLOSEDIR(dir_handle);
2086 
2087         flowop_endop(threadflow, flowop, dir_bytes);
2088 
2089         /* indicate that it is no longer busy */
2090         fileset_unbusy(dir, FALSE, FALSE, 0);
2091 
2092         return (FILEBENCH_OK);
2093 }
2094 
2095 /*
2096  * Emulate stat of a file. Picks an arbitrary filesetentry with
2097  * an existing file from the flowop's fileset, then performs a
2098  * stat() operation on it. Returns FILEBENCH_ERROR if the flowop has no
2099  * associated fileset. Returns FILEBENCH_NORSC if an appropriate filesetentry
2100  * cannot be found, and FILEBENCH_OK on success.
2101  */
2102 static int
2103 flowoplib_statfile(threadflow_t *threadflow, flowop_t *flowop)
2104 {
2105         filesetentry_t *file;
2106         fileset_t *fileset;
2107         struct stat64 statbuf;
2108         int fd = flowop->fo_fdnumber;
2109 
2110         /* if fd specified and the file is open, use it to access file */
2111         if ((fd > 0) && (threadflow->tf_fd[fd].fd_num > 0)) {
2112 
2113                 /* check whether file handle still valid */
2114                 if ((file = threadflow->tf_fse[fd]) == NULL) {
2115                         filebench_log(LOG_DEBUG_SCRIPT,
2116                             "flowop %s trying to stat NULL file at fd = %d",
2117                             flowop->fo_name, fd);
2118                         return (FILEBENCH_ERROR);
2119                 }
2120 
2121                 /* if here, we still have a valid file pointer */
2122                 fileset = file->fse_fileset;
2123         } else {
2124                 /* Otherwise, pick arbitrary file */
2125                 file = NULL;
2126                 fileset = flowop->fo_fileset;
2127         }
2128 
2129         if (fileset == NULL) {
2130                 filebench_log(LOG_ERROR,
2131                     "statfile with no fileset specified");
2132                 return (FILEBENCH_ERROR);
2133         }
2134 
2135 #ifdef HAVE_RAW_SUPPORT
2136         /* can't be used with raw devices */
2137         if (fileset->fs_attrs & FILESET_IS_RAW_DEV) {
2138                 filebench_log(LOG_ERROR,
2139                     "flowop %s attempted do a statfile on a RAW device",
2140                     flowop->fo_name);
2141                 return (FILEBENCH_ERROR);
2142         }
2143 #endif /* HAVE_RAW_SUPPORT */
2144 
2145         if (file == NULL) {
2146                 char path[MAXPATHLEN];
2147                 char *pathtmp;
2148                 int err;
2149 
2150                 /* pick arbitrary, existing (allocated) file */
2151                 if ((err = flowoplib_pickfile(&file, flowop,
2152                     FILESET_PICKEXISTS, 0)) != FILEBENCH_OK) {
2153                         filebench_log(LOG_DEBUG_SCRIPT,
2154                             "Statfile flowop %s failed to pick file",
2155                             flowop->fo_name);
2156                         return (err);
2157                 }
2158 
2159                 /* resolve path and do a stat on file */
2160                 (void) fb_strlcpy(path, avd_get_str(fileset->fs_path),
2161                     MAXPATHLEN);
2162                 (void) fb_strlcat(path, "/", MAXPATHLEN);
2163                 (void) fb_strlcat(path, avd_get_str(fileset->fs_name),
2164                     MAXPATHLEN);
2165                 pathtmp = fileset_resolvepath(file);
2166                 (void) fb_strlcat(path, pathtmp, MAXPATHLEN);
2167                 free(pathtmp);
2168 
2169                 /* stat the file */
2170                 flowop_beginop(threadflow, flowop);
2171                 if (FB_STAT(path, &statbuf) == -1)
2172                         filebench_log(LOG_ERROR,
2173                             "statfile flowop %s failed", flowop->fo_name);
2174                 flowop_endop(threadflow, flowop, 0);
2175 
2176                 fileset_unbusy(file, FALSE, FALSE, 0);
2177         } else {
2178                 /* stat specific file */
2179                 flowop_beginop(threadflow, flowop);
2180                 if (FB_FSTAT(&threadflow->tf_fd[fd], &statbuf) == -1)
2181                         filebench_log(LOG_ERROR,
2182                             "statfile flowop %s failed", flowop->fo_name);
2183                 flowop_endop(threadflow, flowop, 0);
2184 
2185         }
2186 
2187         return (FILEBENCH_OK);
2188 }
2189 
2190 
2191 /*
2192  * Additional reads and writes. Read and write whole files, write
2193  * and append to files. Some of these work with both fileobjs and
2194  * filesets, others only with filesets. The flowoplib_write routine
2195  * writes from thread memory, while the others read or write using
2196  * fo_buf memory. Note that both flowoplib_read() and
2197  * flowoplib_aiowrite() use thread memory as well.
2198  */
2199 
2200 
2201 /*
2202  * Emulate a read of a whole file. The file must be open with
2203  * file descriptor and filesetentry stored at the locations indexed
2204  * by the flowop's fdnumber. It then seeks to the beginning of the
2205  * associated file, and reads fs_iosize bytes at a time until the end
2206  * of the file. Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if
2207  * out of files, and FILEBENCH_OK on success.
2208  */
2209 static int
2210 flowoplib_readwholefile(threadflow_t *threadflow, flowop_t *flowop)
2211 {
2212         caddr_t iobuf;
2213         off64_t bytes = 0;
2214         fb_fdesc_t *fdesc;
2215         uint64_t wss;
2216         fbint_t iosize;
2217         int ret;
2218         char zerordbuf;
2219 
2220         /* get the file to use */
2221         if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2222             &fdesc)) != FILEBENCH_OK)
2223                 return (ret);
2224 
2225         /* an I/O size of zero means read entire working set with one I/O */
2226         if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2227                 iosize = wss;
2228 
2229         /*
2230          * The file may actually be 0 bytes long, in which case skip
2231          * the buffer set up call (which would fail) and substitute
2232          * a small buffer, which won't really be used.
2233          */
2234         if (iosize == 0) {
2235                 iobuf = (caddr_t)&zerordbuf;
2236                 filebench_log(LOG_DEBUG_SCRIPT,
2237                     "flowop %s read zero length file", flowop->fo_name);
2238         } else {
2239                 if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2240                     iosize) != 0)
2241                         return (FILEBENCH_ERROR);
2242         }
2243 
2244         /* Measure time to read bytes */
2245         flowop_beginop(threadflow, flowop);
2246         (void) FB_LSEEK(fdesc, 0, SEEK_SET);
2247         while ((ret = FB_READ(fdesc, iobuf, iosize)) > 0)
2248                 bytes += ret;
2249 
2250         flowop_endop(threadflow, flowop, bytes);
2251 
2252         if (ret < 0) {
2253                 filebench_log(LOG_ERROR,
2254                     "readwhole fail Failed to read whole file: %s",
2255                     strerror(errno));
2256                 return (FILEBENCH_ERROR);
2257         }
2258 
2259         return (FILEBENCH_OK);
2260 }
2261 
2262 /*
2263  * Emulate a write to a file of size fo_iosize.  Will write
2264  * to a file from a fileset if the flowop's fo_fileset field
2265  * specifies one or its fdnumber is non zero. Otherwise it
2266  * will write to a fileobj file, if one exists. If the file
2267  * is not currently open, the routine will attempt to open
2268  * it. The flowop's fo_wss parameter will be used to set the
2269  * maximum file size if it is non-zero, otherwise the
2270  * filesetentry's  fse_size will be used. A random memory
2271  * buffer offset is calculated, and, if fo_random is TRUE,
2272  * a random file offset is used for the write. Otherwise the
2273  * write is to the next sequential location. Returns
2274  * FILEBENCH_ERROR on errors, FILEBENCH_NORSC if iosetup can't
2275  * obtain a file, or FILEBENCH_OK on success.
2276  */
2277 static int
2278 flowoplib_write(threadflow_t *threadflow, flowop_t *flowop)
2279 {
2280         caddr_t iobuf;
2281         fbint_t wss;
2282         fbint_t iosize;
2283         fb_fdesc_t *fdesc;
2284         int ret;
2285 
2286         iosize = avd_get_int(flowop->fo_iosize);
2287         if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2288             &fdesc, iosize)) != FILEBENCH_OK)
2289                 return (ret);
2290 
2291         if (avd_get_bool(flowop->fo_random)) {
2292                 uint64_t fileoffset;
2293 
2294                 if (filebench_randomno64(&fileoffset,
2295                     wss, iosize, NULL) == -1) {
2296                         filebench_log(LOG_ERROR,
2297                             "file size smaller than IO size for thread %s",
2298                             flowop->fo_name);
2299                         return (FILEBENCH_ERROR);
2300                 }
2301                 flowop_beginop(threadflow, flowop);
2302                 if (FB_PWRITE(fdesc, iobuf,
2303                     iosize, (off64_t)fileoffset) == -1) {
2304                         filebench_log(LOG_ERROR, "write failed, "
2305                             "offset %llu io buffer %zd: %s",
2306                             (u_longlong_t)fileoffset, iobuf, strerror(errno));
2307                         flowop_endop(threadflow, flowop, 0);
2308                         return (FILEBENCH_ERROR);
2309                 }
2310                 flowop_endop(threadflow, flowop, iosize);
2311         } else {
2312                 flowop_beginop(threadflow, flowop);
2313                 if (FB_WRITE(fdesc, iobuf, iosize) == -1) {
2314                         filebench_log(LOG_ERROR,
2315                             "write failed, io buffer %zd: %s",
2316                             iobuf, strerror(errno));
2317                         flowop_endop(threadflow, flowop, 0);
2318                         return (FILEBENCH_ERROR);
2319                 }
2320                 flowop_endop(threadflow, flowop, iosize);
2321         }
2322 
2323         return (FILEBENCH_OK);
2324 }
2325 
2326 /*
2327  * Emulate a write of a whole file.  The size of the file
2328  * is taken from a filesetentry identified by fo_srcfdnumber or
2329  * from the working set size, while the file descriptor used is
2330  * identified by fo_fdnumber. Does multiple writes of fo_iosize
2331  * length length until full file has been written. Returns FILEBENCH_ERROR on
2332  * error, FILEBENCH_NORSC if out of files, FILEBENCH_OK on success.
2333  */
2334 static int
2335 flowoplib_writewholefile(threadflow_t *threadflow, flowop_t *flowop)
2336 {
2337         caddr_t iobuf;
2338         filesetentry_t *file;
2339         int wsize;
2340         off64_t seek;
2341         off64_t bytes = 0;
2342         uint64_t wss;
2343         fbint_t iosize;
2344         fb_fdesc_t *fdesc;
2345         int srcfd = flowop->fo_srcfdnumber;
2346         int ret;
2347         char zerowrtbuf;
2348 
2349         /* get the file to use */
2350         if ((ret = flowoplib_filesetup(threadflow, flowop, &wss,
2351             &fdesc)) != FILEBENCH_OK)
2352                 return (ret);
2353 
2354         /* an I/O size of zero means write entire working set with one I/O */
2355         if ((iosize = avd_get_int(flowop->fo_iosize)) == 0)
2356                 iosize = wss;
2357 
2358         /*
2359          * The file may actually be 0 bytes long, in which case skip
2360          * the buffer set up call (which would fail) and substitute
2361          * a small buffer, which won't really be used.
2362          */
2363         if (iosize == 0) {
2364                 iobuf = (caddr_t)&zerowrtbuf;
2365                 filebench_log(LOG_DEBUG_SCRIPT,
2366                     "flowop %s wrote zero length file", flowop->fo_name);
2367         } else {
2368                 if (flowoplib_iobufsetup(threadflow, flowop, &iobuf,
2369                     iosize) != 0)
2370                         return (FILEBENCH_ERROR);
2371         }
2372 
2373         file = threadflow->tf_fse[srcfd];
2374         if ((srcfd != 0) && (file == NULL)) {
2375                 filebench_log(LOG_ERROR, "flowop %s: NULL src file",
2376                     flowop->fo_name);
2377                 return (FILEBENCH_ERROR);
2378         }
2379 
2380         if (file)
2381                 wss = file->fse_size;
2382 
2383         wsize = (int)MIN(wss, iosize);
2384 
2385         /* Measure time to write bytes */
2386         flowop_beginop(threadflow, flowop);
2387         for (seek = 0; seek < wss; seek += wsize) {
2388                 ret = FB_WRITE(fdesc, iobuf, wsize);
2389                 if (ret != wsize) {
2390                         filebench_log(LOG_ERROR,
2391                             "Failed to write %d bytes on fd %d: %s",
2392                             wsize, fdesc->fd_num, strerror(errno));
2393                         flowop_endop(threadflow, flowop, 0);
2394                         return (FILEBENCH_ERROR);
2395                 }
2396                 wsize = (int)MIN(wss - seek, iosize);
2397                 bytes += ret;
2398         }
2399         flowop_endop(threadflow, flowop, bytes);
2400 
2401         return (FILEBENCH_OK);
2402 }
2403 
2404 
2405 /*
2406  * Emulate a fixed size append to a file. Will append data to
2407  * a file chosen from a fileset if the flowop's fo_fileset
2408  * field specifies one or if its fdnumber is non zero.
2409  * Otherwise it will write to a fileobj file, if one exists.
2410  * The flowop's fo_wss parameter will be used to set the
2411  * maximum file size if it is non-zero, otherwise the
2412  * filesetentry's fse_size will be used. A random memory
2413  * buffer offset is calculated, then a logical seek to the
2414  * end of file is done followed by a write of fo_iosize
2415  * bytes. Writes are actually done from fo_buf, rather than
2416  * tf_mem as is done with flowoplib_write(), and no check
2417  * is made to see if fo_iosize exceeds the size of fo_buf.
2418  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2419  * files in the fileset, FILEBENCH_OK on success.
2420  */
2421 static int
2422 flowoplib_appendfile(threadflow_t *threadflow, flowop_t *flowop)
2423 {
2424         caddr_t iobuf;
2425         fb_fdesc_t *fdesc;
2426         fbint_t wss;
2427         fbint_t iosize;
2428         int ret;
2429 
2430         iosize = avd_get_int(flowop->fo_iosize);
2431         if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2432             &fdesc, iosize)) != FILEBENCH_OK)
2433                 return (ret);
2434 
2435         /* XXX wss is not being used */
2436 
2437         /* Measure time to write bytes */
2438         flowop_beginop(threadflow, flowop);
2439         (void) FB_LSEEK(fdesc, 0, SEEK_END);
2440         ret = FB_WRITE(fdesc, iobuf, iosize);
2441         if (ret != iosize) {
2442                 filebench_log(LOG_ERROR,
2443                     "Failed to write %llu bytes on fd %d: %s",
2444                     (u_longlong_t)iosize, fdesc->fd_num, strerror(errno));
2445                 flowop_endop(threadflow, flowop, ret);
2446                 return (FILEBENCH_ERROR);
2447         }
2448         flowop_endop(threadflow, flowop, ret);
2449 
2450         return (FILEBENCH_OK);
2451 }
2452 
2453 /*
2454  * Emulate a random size append to a file. Will append data
2455  * to a file chosen from a fileset if the flowop's fo_fileset
2456  * field specifies one or if its fdnumber is non zero. Otherwise
2457  * it will write to a fileobj file, if one exists. The flowop's
2458  * fo_wss parameter will be used to set the maximum file size
2459  * if it is non-zero, otherwise the filesetentry's fse_size
2460  * will be used.  A random transfer size (but at most fo_iosize
2461  * bytes) and a random memory offset are calculated. A logical
2462  * seek to the end of file is done, then writes of up to
2463  * FILE_ALLOC_BLOCK in size are done until the full transfer
2464  * size has been written. Writes are actually done from fo_buf,
2465  * rather than tf_mem as is done with flowoplib_write().
2466  * Returns FILEBENCH_ERROR on error, FILEBENCH_NORSC if out of
2467  * files in the fileset, FILEBENCH_OK on success.
2468  */
2469 static int
2470 flowoplib_appendfilerand(threadflow_t *threadflow, flowop_t *flowop)
2471 {
2472         caddr_t iobuf;
2473         uint64_t appendsize;
2474         fb_fdesc_t *fdesc;
2475         fbint_t wss;
2476         fbint_t iosize;
2477         int ret = 0;
2478 
2479         if ((iosize = avd_get_int(flowop->fo_iosize)) == 0) {
2480                 filebench_log(LOG_ERROR, "zero iosize for flowop %s",
2481                     flowop->fo_name);
2482                 return (FILEBENCH_ERROR);
2483         }
2484 
2485         if (filebench_randomno64(&appendsize, iosize, 1LL, NULL) != 0)
2486                 return (FILEBENCH_ERROR);
2487 
2488         /* skip if attempting zero length append */
2489         if (appendsize == 0) {
2490                 flowop_beginop(threadflow, flowop);
2491                 flowop_endop(threadflow, flowop, 0LL);
2492                 return (FILEBENCH_OK);
2493         }
2494 
2495         if ((ret = flowoplib_iosetup(threadflow, flowop, &wss, &iobuf,
2496             &fdesc, appendsize)) != FILEBENCH_OK)
2497                 return (ret);
2498 
2499         /* XXX wss is not being used */
2500 
2501         /* Measure time to write bytes */
2502         flowop_beginop(threadflow, flowop);
2503 
2504         (void) FB_LSEEK(fdesc, 0, SEEK_END);
2505         ret = FB_WRITE(fdesc, iobuf, appendsize);
2506         if (ret != appendsize) {
2507                 filebench_log(LOG_ERROR,
2508                     "Failed to write %llu bytes on fd %d: %s",
2509                     (u_longlong_t)appendsize, fdesc->fd_num, strerror(errno));
2510                 flowop_endop(threadflow, flowop, 0);
2511                 return (FILEBENCH_ERROR);
2512         }
2513 
2514         flowop_endop(threadflow, flowop, appendsize);
2515 
2516         return (FILEBENCH_OK);
2517 }
2518 
/*
 * Running totals kept for one testrandvar flowop.
 */
struct testrandvar_priv {
        uint64_t sample_count;  /* number of values sampled so far */
        double val_sum;         /* sum of the sampled values */
        double sqr_sum;         /* sum of the squares of the values */
};

typedef struct testrandvar_priv testrandvar_priv_t;
2524 
2525 /*
2526  * flowop to calculate various statistics from the number stream
2527  * produced by a random variable. This allows verification that the
2528  * random distribution used to define the random variable is producing
2529  * the expected distribution of random numbers.
2530  */
2531 /* ARGSUSED */
2532 static int
2533 flowoplib_testrandvar(threadflow_t *threadflow, flowop_t *flowop)
2534 {
2535         testrandvar_priv_t      *mystats;
2536         double                  value;
2537 
2538         if ((mystats = (testrandvar_priv_t *)flowop->fo_private) == NULL) {
2539                 filebench_log(LOG_ERROR, "testrandvar not initialized\n");
2540                 filebench_shutdown(1);
2541                 return (-1);
2542         }
2543 
2544         value = avd_get_dbl(flowop->fo_value);
2545 
2546         mystats->sample_count++;
2547         mystats->val_sum += value;
2548         mystats->sqr_sum += (value * value);
2549 
2550         return (0);
2551 }
2552 
2553 /*
2554  * Initialize the private data area used to accumulate the statistics
2555  */
2556 static int
2557 flowoplib_testrandvar_init(flowop_t *flowop)
2558 {
2559         testrandvar_priv_t      *mystats;
2560 
2561         if ((mystats = (testrandvar_priv_t *)
2562             malloc(sizeof (testrandvar_priv_t))) == NULL) {
2563                 filebench_log(LOG_ERROR, "could not initialize testrandvar");
2564                 filebench_shutdown(1);
2565                 return (-1);
2566         }
2567 
2568         mystats->sample_count = 0;
2569         mystats->val_sum = 0;
2570         mystats->sqr_sum = 0;
2571         flowop->fo_private = (void *)mystats;
2572 
2573         (void) ipc_mutex_unlock(&flowop->fo_lock);
2574         return (0);
2575 }
2576 
2577 /*
2578  * Print out the accumulated statistics, and free the private storage
2579  */
2580 static void
2581 flowoplib_testrandvar_destruct(flowop_t *flowop)
2582 {
2583         testrandvar_priv_t      *mystats;
2584         double mean, std_dev, dbl_count;
2585 
2586         (void) ipc_mutex_lock(&flowop->fo_lock);
2587         if ((mystats = (testrandvar_priv_t *)
2588             flowop->fo_private) == NULL) {
2589                 (void) ipc_mutex_unlock(&flowop->fo_lock);
2590                 return;
2591         }
2592 
2593         flowop->fo_private = NULL;
2594         (void) ipc_mutex_unlock(&flowop->fo_lock);
2595 
2596         dbl_count = (double)mystats->sample_count;
2597         mean = mystats->val_sum / dbl_count;
2598         std_dev = sqrt((mystats->sqr_sum / dbl_count) - (mean * mean)) / mean;
2599 
2600         filebench_log(LOG_VERBOSE,
2601             "testrandvar: ops = %llu, mean = %8.2lf, stddev = %8.2lf",
2602             (u_longlong_t)mystats->sample_count, mean, std_dev);
2603         free(mystats);
2604 }
2605 
2606 /*
2607  * prints message to the console from within a thread
2608  */
2609 static int
2610 flowoplib_print(threadflow_t *threadflow, flowop_t *flowop)
2611 {
2612         procflow_t *procflow;
2613 
2614         procflow = threadflow->tf_process;
2615         filebench_log(LOG_INFO,
2616             "Message from process (%s,%d), thread (%s,%d): %s",
2617             procflow->pf_name, procflow->pf_instance,
2618             threadflow->tf_name, threadflow->tf_instance,
2619             avd_get_str(flowop->fo_value));
2620 
2621         return (FILEBENCH_OK);
2622 }
2623 
/*
 * Prints usage information for flowop operations to stderr.
 * The text is kept in a table, one entry per output line, and
 * written out in order.
 */
void
flowoplib_usage(void)
{
        static const char *const usage_text[] = {
                "flowop [openfile|createfile] name=<name>,fileset=<fname>\n",
                "                       [,fd=<file desc num>]\n",
                "\n",
                "flowop closefile name=<name>,fd=<file desc num>\n",
                "\n",
                "flowop deletefile name=<name>\n",
                "                       [,fileset=<fname>]\n",
                "                       [,fd=<file desc num>]\n",
                "\n",
                "flowop statfile name=<name>\n",
                "                       [,fileset=<fname>]\n",
                "                       [,fd=<file desc num>]\n",
                "\n",
                "flowop fsync name=<name>,fd=<file desc num>\n",
                "\n",
                "flowop fsyncset name=<name>,fileset=<fname>\n",
                "\n",
                "flowop [write|read|aiowrite] name=<name>, \n",
                "                       filename|fileset=<fname>,\n",
                "                       iosize=<size>\n",
                "                       [,directio]\n",
                "                       [,dsync]\n",
                "                       [,iters=<count>]\n",
                "                       [,random]\n",
                "                       [,opennext]\n",
                "                       [,workingset=<size>]\n",
                "flowop [appendfile|appendfilerand] name=<name>, \n",
                "                       filename|fileset=<fname>,\n",
                "                       iosize=<size>\n",
                "                       [,dsync]\n",
                "                       [,iters=<count>]\n",
                "                       [,workingset=<size>]\n",
                "flowop [readwholefile|writewholefile] name=<name>, \n",
                "                       filename|fileset=<fname>,\n",
                "                       iosize=<size>\n",
                "                       [,dsync]\n",
                "                       [,iters=<count>]\n",
                "\n",
                "flowop aiowait name=<name>,target=<aiowrite-flowop>\n",
                "\n",
                "flowop sempost name=<name>,target=<semblock-flowop>,\n",
                "                       value=<increment-to-post>\n",
                "\n",
                "flowop semblock name=<name>,value=<decrement-to-receive>,\n",
                "                       highwater=<inbound-queue-max>\n",
                "\n",
                "flowop block name=<name>\n",
                "\n",
                "flowop wakeup name=<name>,target=<block-flowop>\n",
                "\n",
                "flowop hog name=<name>,value=<number-of-mem-ops>\n",
                "flowop delay name=<name>,value=<number-of-seconds>\n",
                "\n",
                "flowop eventlimit name=<name>\n",
                "flowop bwlimit name=<name>,value=<mb/s>\n",
                "flowop iopslimit name=<name>,value=<iop/s>\n",
                "flowop finishoncount name=<name>,value=<ops/s>\n",
                "flowop finishonbytes name=<name>,value=<bytes>\n",
                "\n",
                "\n"
        };
        size_t i;

        for (i = 0; i < sizeof (usage_text) / sizeof (usage_text[0]); i++)
                (void) fputs(usage_text[i], stderr);
}