1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 * 25 * Portions Copyright 2008 Denis Cheng 26 */ 27 28 #include "config.h" 29 #include <pthread.h> 30 #ifdef HAVE_LWPS 31 #include <sys/lwp.h> 32 #endif 33 #include <signal.h> 34 35 #include "filebench.h" 36 #include "threadflow.h" 37 #include "flowop.h" 38 #include "ipc.h" 39 40 static threadflow_t *threadflow_define_common(procflow_t *procflow, 41 char *name, threadflow_t *inherit, int instance); 42 43 /* 44 * Threadflows are filebench entities which manage operating system 45 * threads. Each worker threadflow spawns a separate filebench thread, 46 * with attributes inherited from a FLOW_MASTER threadflow created during 47 * f model language parsing. This section contains routines to define, 48 * create, control, and delete threadflows. 49 * 50 * Each thread defined in the f model creates a FLOW_MASTER 51 * threadflow which encapsulates the defined attributes and flowops of 52 * the f language thread, including the number of instances to create. 53 * At runtime, a worker threadflow instance with an associated filebench 54 * thread is created, which runs until told to quit or is specifically 55 * deleted. 56 */ 57 58 59 /* 60 * Prints information about threadflow syntax. 61 */ 62 void 63 threadflow_usage(void) 64 { 65 (void) fprintf(stderr, " thread name=<name>[,instances=<count>]\n"); 66 (void) fprintf(stderr, "\n"); 67 (void) fprintf(stderr, " {\n"); 68 (void) fprintf(stderr, " flowop ...\n"); 69 (void) fprintf(stderr, " flowop ...\n"); 70 (void) fprintf(stderr, " flowop ...\n"); 71 (void) fprintf(stderr, " }\n"); 72 (void) fprintf(stderr, "\n"); 73 } 74 75 /* 76 * Creates a thread for the supplied threadflow. If interprocess 77 * shared memory is desired, then increments the amount of shared 78 * memory needed by the amount specified in the threadflow's 79 * tf_memsize parameter. The thread starts in routine 80 * flowop_start() with a poineter to the threadflow supplied 81 * as the argument. 82 */ 83 static int 84 threadflow_createthread(threadflow_t *threadflow) 85 { 86 fbint_t memsize; 87 memsize = avd_get_int(threadflow->tf_memsize); 88 threadflow->tf_constmemsize = memsize; 89 90 filebench_log(LOG_DEBUG_SCRIPT, "Creating thread %s, memory = %ld", 91 threadflow->tf_name, memsize); 92 93 if (threadflow->tf_attrs & THREADFLOW_USEISM) 94 filebench_shm->shm_required += memsize; 95 96 if (pthread_create(&threadflow->tf_tid, NULL, 97 (void *(*)(void*))flowop_start, threadflow) != 0) { 98 filebench_log(LOG_ERROR, "thread create failed"); 99 filebench_shutdown(1); 100 return (FILEBENCH_ERROR); 101 } 102 103 return (FILEBENCH_OK); 104 } 105 106 /* 107 * Creates threads for the threadflows associated with a procflow. 108 * The routine iterates through the list of threadflows in the 109 * supplied procflow's pf_threads list. For each threadflow on 110 * the list, it defines tf_instances number of cloned 111 * threadflows, and then calls threadflow_createthread() for 112 * each to create and start the actual operating system thread. 113 * Note that each of the newly defined threadflows will be linked 114 * into the procflows threadflow list, but at the head of the 115 * list, so they will not become part of the supplied set. After 116 * all the threads have been created, threadflow_init enters 117 * a join loop for all the threads in the newly defined 118 * threadflows. Once all the created threads have exited, 119 * threadflow_init will return 0. If errors are encountered, it 120 * will return a non zero value. 121 */ 122 int 123 threadflow_init(procflow_t *procflow) 124 { 125 threadflow_t *threadflow = procflow->pf_threads; 126 int ret = 0; 127 128 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 129 130 while (threadflow) { 131 threadflow_t *newthread; 132 int instances; 133 int i; 134 135 instances = avd_get_int(threadflow->tf_instances); 136 filebench_log(LOG_VERBOSE, 137 "Starting %d %s threads", 138 instances, threadflow->tf_name); 139 140 for (i = 1; i < instances; i++) { 141 /* Create threads */ 142 newthread = 143 threadflow_define_common(procflow, 144 threadflow->tf_name, threadflow, i + 1); 145 if (newthread == NULL) 146 return (-1); 147 ret |= threadflow_createthread(newthread); 148 } 149 150 newthread = threadflow_define_common(procflow, 151 threadflow->tf_name, 152 threadflow, 1); 153 154 if (newthread == NULL) 155 return (-1); 156 157 /* Create each thread */ 158 ret |= threadflow_createthread(newthread); 159 160 threadflow = threadflow->tf_next; 161 } 162 163 threadflow = procflow->pf_threads; 164 165 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 166 167 while (threadflow) { 168 /* wait for all threads to finish */ 169 if (threadflow->tf_tid) { 170 void *status; 171 172 if (pthread_join(threadflow->tf_tid, &status) == 0) 173 ret += *(int *)status; 174 } 175 threadflow = threadflow->tf_next; 176 } 177 178 procflow->pf_running = 0; 179 180 return (ret); 181 } 182 183 /* 184 * Tells the threadflow's thread to stop and optionally signals 185 * its associated process to end the thread. 186 */ 187 static void 188 threadflow_kill(threadflow_t *threadflow) 189 { 190 int wait_cnt = 2; 191 192 /* Tell thread to finish */ 193 threadflow->tf_abort = 1; 194 195 /* wait a bit for threadflow to stop */ 196 while (wait_cnt && threadflow->tf_running) { 197 (void) sleep(1); 198 wait_cnt--; 199 } 200 201 if (threadflow->tf_running) { 202 threadflow->tf_running = FALSE; 203 (void) pthread_kill(threadflow->tf_tid, SIGKILL); 204 } 205 } 206 207 /* 208 * Deletes the specified threadflow from the specified threadflow 209 * list after first terminating the threadflow's thread, deleting 210 * the threadflow's flowops, and finally freeing the threadflow 211 * entity. It also subtracts the threadflow's shared memory 212 * requirements from the total amount required, shm_required. If 213 * the specified threadflow is found, returns 0, otherwise 214 * returns -1. 215 */ 216 static int 217 threadflow_delete(threadflow_t **threadlist, threadflow_t *threadflow) 218 { 219 threadflow_t *entry = *threadlist; 220 221 filebench_log(LOG_DEBUG_IMPL, "Deleting thread: (%s-%d)", 222 threadflow->tf_name, 223 threadflow->tf_instance); 224 225 if (threadflow->tf_attrs & THREADFLOW_USEISM) 226 filebench_shm->shm_required -= threadflow->tf_constmemsize; 227 228 if (threadflow == *threadlist) { 229 /* First on list */ 230 filebench_log(LOG_DEBUG_IMPL, "Deleted thread: (%s-%d)", 231 threadflow->tf_name, 232 threadflow->tf_instance); 233 234 threadflow_kill(threadflow); 235 flowop_delete_all(&threadflow->tf_thrd_fops); 236 *threadlist = threadflow->tf_next; 237 (void) pthread_mutex_destroy(&threadflow->tf_lock); 238 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 239 return (0); 240 } 241 242 while (entry->tf_next) { 243 filebench_log(LOG_DEBUG_IMPL, 244 "Delete thread: (%s-%d) == (%s-%d)", 245 entry->tf_next->tf_name, 246 entry->tf_next->tf_instance, 247 threadflow->tf_name, 248 threadflow->tf_instance); 249 250 if (threadflow == entry->tf_next) { 251 /* Delete */ 252 filebench_log(LOG_DEBUG_IMPL, 253 "Deleted thread: (%s-%d)", 254 entry->tf_next->tf_name, 255 entry->tf_next->tf_instance); 256 threadflow_kill(entry->tf_next); 257 flowop_delete_all(&entry->tf_next->tf_thrd_fops); 258 (void) pthread_mutex_destroy(&threadflow->tf_lock); 259 ipc_free(FILEBENCH_THREADFLOW, (char *)threadflow); 260 entry->tf_next = entry->tf_next->tf_next; 261 return (0); 262 } 263 entry = entry->tf_next; 264 } 265 266 return (-1); 267 } 268 269 /* 270 * Given a pointer to the thread list of a procflow, cycles 271 * through all the threadflows on the list, deleting each one 272 * except the FLOW_MASTER. 273 */ 274 void 275 threadflow_delete_all(threadflow_t **threadlist) 276 { 277 threadflow_t *threadflow; 278 279 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 280 281 threadflow = *threadlist; 282 filebench_log(LOG_DEBUG_IMPL, "Deleting all threads"); 283 284 while (threadflow) { 285 if (threadflow->tf_instance && 286 (threadflow->tf_instance == FLOW_MASTER)) { 287 threadflow = threadflow->tf_next; 288 continue; 289 } 290 (void) threadflow_delete(threadlist, threadflow); 291 threadflow = threadflow->tf_next; 292 } 293 294 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 295 } 296 297 /* 298 * Waits till all threadflows are started, or a timeout occurs. 299 * Checks through the list of threadflows, waiting up to 10 300 * seconds for each one to set its tf_running flag to 1. If not 301 * set after 10 seconds, continues on to the next threadflow 302 * anyway. 303 */ 304 void 305 threadflow_allstarted(pid_t pid, threadflow_t *threadflow) 306 { 307 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 308 309 while (threadflow) { 310 int waits; 311 312 if ((threadflow->tf_instance == 0) || 313 (threadflow->tf_instance == FLOW_MASTER)) { 314 threadflow = threadflow->tf_next; 315 continue; 316 } 317 318 filebench_log(LOG_DEBUG_IMPL, "Checking pid %d thread %s-%d", 319 pid, 320 threadflow->tf_name, 321 threadflow->tf_instance); 322 323 waits = 10; 324 while (waits && (threadflow->tf_running == 0) && 325 (filebench_shm->shm_f_abort == 0)) { 326 (void) ipc_mutex_unlock( 327 &filebench_shm->shm_threadflow_lock); 328 if (waits < 3) 329 filebench_log(LOG_INFO, 330 "Waiting for pid %d thread %s-%d", 331 pid, 332 threadflow->tf_name, 333 threadflow->tf_instance); 334 335 (void) sleep(1); 336 (void) ipc_mutex_lock( 337 &filebench_shm->shm_threadflow_lock); 338 waits--; 339 } 340 341 threadflow = threadflow->tf_next; 342 } 343 344 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 345 } 346 347 /* 348 * Create an in-memory thread object linked to a parent procflow. 349 * A threadflow entity is allocated from shared memory and 350 * initialized from the "inherit" threadflow if supplied, 351 * otherwise to zeros. The threadflow is assigned a unique 352 * thread id, the supplied instance number, the supplied name 353 * and added to the procflow's pf_thread list. If no name is 354 * supplied or the threadflow can't be allocated, NULL is 355 * returned Otherwise a pointer to the newly allocated threadflow 356 * is returned. 357 * 358 * The filebench_shm->shm_threadflow_lock must be held by the caller. 359 */ 360 static threadflow_t * 361 threadflow_define_common(procflow_t *procflow, char *name, 362 threadflow_t *inherit, int instance) 363 { 364 threadflow_t *threadflow; 365 threadflow_t **threadlistp = &procflow->pf_threads; 366 367 if (name == NULL) 368 return (NULL); 369 370 threadflow = (threadflow_t *)ipc_malloc(FILEBENCH_THREADFLOW); 371 372 if (threadflow == NULL) 373 return (NULL); 374 375 if (inherit) 376 (void) memcpy(threadflow, inherit, sizeof (threadflow_t)); 377 else 378 (void) memset(threadflow, 0, sizeof (threadflow_t)); 379 380 threadflow->tf_utid = ++filebench_shm->shm_utid; 381 382 threadflow->tf_instance = instance; 383 (void) strcpy(threadflow->tf_name, name); 384 threadflow->tf_process = procflow; 385 (void) pthread_mutex_init(&threadflow->tf_lock, 386 ipc_mutexattr(IPC_MUTEX_NORMAL)); 387 388 filebench_log(LOG_DEBUG_IMPL, "Defining thread %s-%d", 389 name, instance); 390 391 /* Add threadflow to list */ 392 if (*threadlistp == NULL) { 393 *threadlistp = threadflow; 394 threadflow->tf_next = NULL; 395 } else { 396 threadflow->tf_next = *threadlistp; 397 *threadlistp = threadflow; 398 } 399 400 return (threadflow); 401 } 402 403 /* 404 * Create an in memory FLOW_MASTER thread object as described 405 * by the syntax. Acquire the filebench_shm->shm_threadflow_lock and 406 * call threadflow_define_common() to create a threadflow entity. 407 * Set the number of instances to create at runtime, 408 * tf_instances, to "instances". Return the threadflow pointer 409 * returned by the threadflow_define_common call. 410 */ 411 threadflow_t * 412 threadflow_define(procflow_t *procflow, char *name, 413 threadflow_t *inherit, avd_t instances) 414 { 415 threadflow_t *threadflow; 416 417 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 418 419 if ((threadflow = threadflow_define_common(procflow, name, 420 inherit, FLOW_MASTER)) == NULL) 421 return (NULL); 422 423 threadflow->tf_instances = instances; 424 425 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 426 427 return (threadflow); 428 } 429 430 431 /* 432 * Searches the provided threadflow list for the named threadflow. 433 * A pointer to the threadflow is returned, or NULL if threadflow 434 * is not found. 435 */ 436 threadflow_t * 437 threadflow_find(threadflow_t *threadlist, char *name) 438 { 439 threadflow_t *threadflow = threadlist; 440 441 (void) ipc_mutex_lock(&filebench_shm->shm_threadflow_lock); 442 443 while (threadflow) { 444 if (strcmp(name, threadflow->tf_name) == 0) { 445 446 (void) ipc_mutex_unlock( 447 &filebench_shm->shm_threadflow_lock); 448 449 return (threadflow); 450 } 451 threadflow = threadflow->tf_next; 452 } 453 454 (void) ipc_mutex_unlock(&filebench_shm->shm_threadflow_lock); 455 456 457 return (NULL); 458 }