File: transport.c
/* -*- linux-c -*-
 * Transport Functions
 * Copyright (C) 2013 Red Hat Inc.
 *
 * This file is part of systemtap, and is free software.  You can
 * redistribute it and/or modify it under the terms of the GNU General
 * Public License (GPL); either version 2, or (at your option) any
 * later version.
 */

#ifndef _STAPDYN_TRANSPORT_C_
#define _STAPDYN_TRANSPORT_C_

#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <spawn.h>
#include <sys/syscall.h>
#include <errno.h>
#include <string.h>
#include <search.h>
#include <signal.h>

#include "transport.h"

////////////////////////////////////////
//
// GENERAL TRANSPORT OVERVIEW
//
// Each context structure has a '_stp_transport_context_data'
// structure (described in more detail later) in it, which contains
// that context's print and log (warning/error) buffers. There is a
// session-wide double-buffered queue (stored in the
// '_stp_transport_session_data' structure) where each probe can send
// print/control messages to a fairly simple consumer thread (see
// _stp_dyninst_transport_thread_func() for details). The consumer
// thread swaps the read/write queues, then handles each request.
//
// Note that data copying is kept to a minimum. A probe adds data to a
// print/log buffer stored in shared memory, then the consumer thread
// outputs the data from that same buffer.
//
//
// QUEUE OVERVIEW
//
// See the session-wide queue's definition in transport.h. It is
// composed of the '_stp_transport_queue_item', '_stp_transport_queue'
// and '_stp_transport_session_data' structures.
//
// The queue is double-buffered and stored in shared memory. Because
// it is session-wide, and multiple threads can be trying to add data
// to it simultaneously, the 'queue_mutex' is used to serialize
// access. Probes write to the write queue. When the consumer thread
// realizes data is available, it swaps the read/write queues (by
// changing the 'write_queue' value) and then processes each
// '_stp_transport_queue_item' on the read queue.
//
// If the queue is full, probes will wait on the 'queue_space_avail'
// condition variable for more space. The consumer thread sets
// 'queue_space_avail' when it swaps the read/write queues.
//
// The consumer thread waits on the 'queue_data_avail' condition
// variable to know when more items are available. When probes add
// items to the queue (using __stp_dyninst_transport_queue_add()),
// 'queue_data_avail' gets set.
//
//
// LOG BUFFER OVERVIEW
//
// See the context-specific log buffer's (struct
// _stp_transport_context_data) definition in transport.h.
//
// The log buffer, used for warning/error messages, is stored in
// shared memory. Each context structure has its own log buffer. Each
// log buffer logically contains '_STP_LOG_BUF_ENTRIES' buffers of
// length 'STP_LOG_BUF_LEN'. In other words, the log buffer allocation
// is done in chunks of size 'STP_LOG_BUF_LEN'. The log buffer is
// circular, and the indices use an extra most significant bit to
// indicate wrapping.
//
// Only the consumer thread removes items from the log buffer.
//
// If the log buffer is full, probes will wait on the
// 'log_space_avail' condition variable for more space. The consumer
// thread sets 'log_space_avail' after finishing with a particular log
// buffer chunk.
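//
// As a worked illustration of the extra-msb index scheme (using a
// made-up value of _STP_LOG_BUF_ENTRIES == 8 purely for example):
// 'log_start' and 'log_end' then range over 0..15, the entry number
// is always the low 3 bits (index & 7, see _STP_D_T_LOG_NORM()), and
// incrementing past 15 wraps back to 0. The buffer is empty when
// 'log_start' == 'log_end' and full when the two indices differ only
// in the extra bit, i.e. 'log_end' == ('log_start' ^ 8) (see
// _stp_dyninst_transport_log_buffer_full()). The extra bit is what
// distinguishes a completely full buffer from a completely empty one
// without sacrificing an entry.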
//
// Note that the read index 'log_start' is only written to by the
// consumer thread and that the write index 'log_end' is only written
// to by the probes (with a locked context).
//
//
// PRINT BUFFER OVERVIEW
//
// See the context-specific print buffer definition (struct
// _stp_transport_context_data) in transport.h.
//
// The print buffer is stored in shared memory. Each context structure
// has its own print buffer. The print buffer really isn't a true
// circular buffer, it is more like a "semi-circular" buffer. If a
// reservation request won't fit after the write offset, we go ahead
// and wrap around to the beginning (if available), leaving an unused
// gap at the end of the buffer. This is done so that reservation
// requests are never broken up. Like a circular buffer, the offsets
// use an extra most significant bit to indicate wrapping.
//
// Only the consumer thread (normally) removes items from the print
// buffer. It is possible to 'unreserve' bytes using
// _stp_dyninst_transport_unreserve_bytes() if the bytes haven't been
// flushed.
//
// If the print buffer doesn't have enough bytes available, probes
// will flush any reserved bytes earlier than normal, then wait on the
// 'print_space_avail' condition variable for more space to become
// available. The consumer thread sets 'print_space_avail' after
// finishing with a particular print buffer segment.
//
// Note that the read index 'read_offset' is only written to by the
// consumer thread and that the write index 'write_offset' (and the
// number of bytes to write, 'write_bytes') is only written to by the
// probes (with a locked context).
//
////////////////////////////////////////

static pthread_t _stp_transport_thread;
static int _stp_transport_thread_started = 0;

#ifndef STP_DYNINST_TIMEOUT_SECS
#define STP_DYNINST_TIMEOUT_SECS 5
#endif

// When we're converting a circular buffer offset/index into a pointer
// value, we need the "normalized" value (i.e. one without the extra
// msb possibly set).
#define _STP_D_T_LOG_NORM(x)	((x) & (_STP_LOG_BUF_ENTRIES - 1))
#define _STP_D_T_PRINT_NORM(x)	((x) & (_STP_DYNINST_BUFFER_SIZE - 1))

// Define a macro to generically add circular buffer
// offsets/indices.
#define __STP_D_T_ADD(offset, increment, buffer_size) \
	(((offset) + (increment)) & (2 * (buffer_size) - 1))

// Using __STP_D_T_ADD(), define a specific macro for each circular
// buffer.
#define _STP_D_T_LOG_INC(offset) \
	__STP_D_T_ADD((offset), 1, _STP_LOG_BUF_ENTRIES)
#define _STP_D_T_PRINT_ADD(offset, increment) \
	__STP_D_T_ADD((offset), (increment), _STP_DYNINST_BUFFER_SIZE)

// Return a pointer to the session's current write queue.
#define _STP_D_T_WRITE_QUEUE(sess_data) \
	(&((sess_data)->queues[(sess_data)->write_queue]))

// Limit remembered strings in __stp_d_t_eliminate_duplicate_warnings
#define MAX_STORED_WARNINGS 1024

// If the transport has an error or debug message to print, it can't very well
// recurse on itself, so we just print to the local stderr and hope...
static void _stp_transport_err (const char *fmt, ...)
	__attribute ((format (printf, 1, 2)));
static void _stp_transport_err (const char *fmt, ...)
{
	va_list args;

	va_start(args, fmt);
	vfprintf (stderr, fmt, args);
	va_end(args);
}

#ifdef DEBUG_TRANS
#define _stp_transport_debug(fmt, ...) \
	_stp_transport_err("%s:%d - " fmt, __FUNCTION__, __LINE__, ##__VA_ARGS__)
#else
#define _stp_transport_debug(fmt, ...) do { } while(0)
#endif
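
// For orientation, a sketch of the producer/consumer handshake on the
// session queue (the authoritative details are in
// __stp_dyninst_transport_queue_add() just below and in
// _stp_dyninst_transport_thread_func()):
//
//	probe (producer)			consumer thread
//	----------------			---------------
//	lock queue_mutex			lock queue_mutex
//	while write queue is full:		while write queue is empty:
//		wait on queue_space_avail		wait on queue_data_avail
//	append item to write queue		flip 'write_queue' (swap queues)
//	signal queue_data_avail			broadcast queue_space_avail
//	unlock queue_mutex			unlock queue_mutex
//						process old write queue unlocked
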
static void
__stp_dyninst_transport_queue_add(unsigned type, int data_index,
				  size_t offset, size_t bytes)
{
	struct _stp_transport_session_data *sess_data = stp_transport_data();

	if (sess_data == NULL)
		return;

	pthread_mutex_lock(&(sess_data->queue_mutex));

	// While the write queue is full, wait.
	while (_STP_D_T_WRITE_QUEUE(sess_data)->items
	       == (STP_DYNINST_QUEUE_ITEMS - 1)) {
		pthread_cond_wait(&(sess_data->queue_space_avail),
				  &(sess_data->queue_mutex));
	}

	struct _stp_transport_queue *q = _STP_D_T_WRITE_QUEUE(sess_data);
	struct _stp_transport_queue_item *item = &(q->queue[q->items]);
	q->items++;
	item->type = type;
	item->data_index = data_index;
	item->offset = offset;
	item->bytes = bytes;
	pthread_cond_signal(&(sess_data->queue_data_avail));
	pthread_mutex_unlock(&(sess_data->queue_mutex));
}

/* Handle duplicate warning elimination. Returns 0 if we've seen this
 * warning (and it should be eliminated), 1 otherwise. */
static int
__stp_d_t_eliminate_duplicate_warnings(char *data, size_t bytes)
{
	static void *seen = 0;
	static unsigned seen_count = 0;
	char *dupstr = strndup (data, bytes);
	char *retval;
	int rc = 1;

	if (! dupstr) {
		/* OOM, should not happen. */
		return 1;
	}

	retval = tfind (dupstr, &seen,
			(int (*)(const void*, const void*))strcmp);
	if (! retval) {			/* new message */
		/* We set a maximum for stored warning messages, to
		 * prevent a misbehaving script/environment from
		 * emitting countless _stp_warn()s and overflowing
		 * staprun's memory. */
		if (seen_count++ == MAX_STORED_WARNINGS) {
			_stp_transport_err("WARNING deduplication table full\n");
			free (dupstr);
		}
		else if (seen_count > MAX_STORED_WARNINGS) {
			/* Be quiet in the future, but stop counting
			 * to preclude overflow. */
			free (dupstr);
			seen_count = MAX_STORED_WARNINGS + 1;
		}
		else if (seen_count < MAX_STORED_WARNINGS) {
			/* NB: don't free dupstr; it's going into the tree. */
			retval = tsearch (dupstr, &seen,
					  (int (*)(const void*, const void*))strcmp);
			if (retval == 0) {
				/* OOM, should not happen. Next time
				 * we should get the 'full'
				 * message. */
				free (dupstr);
				seen_count = MAX_STORED_WARNINGS;
			}
		}
	}
	else {				/* old message */
		free (dupstr);
		rc = 0;
	}
	return rc;
}

static void
__stp_d_t_run_command(char *command)
{
	/*
	 * FIXME: We'll need to make sure the output from system goes
	 * to the correct file descriptor. We may need some posix file
	 * actions to pass to posix_spawnp().
	 */
	char *spawn_argv[4] = { "sh", "-c", command, NULL };
	int rc = posix_spawnp(NULL, "sh", NULL, NULL, spawn_argv, NULL);
	if (rc != 0) {
		_stp_transport_err("ERROR: %s : %s\n", command, strerror(rc));
	}
	/* Notice we're not waiting on the resulting process to finish. */
}

static void
__stp_d_t_request_exit(void)
{
	/*
	 * We want stapdyn to trigger this module's exit code from outside. It
	 * knows to do this on receipt of signals, so we must kill ourselves.
	 * The signal handler will forward that to the main thread.
	 *
	 * NB: If the target process was created rather than attached, SIGTERM
	 * waits for it to exit. SIGQUIT always exits immediately. It's
	 * somewhat debatable which is most appropriate here...
	 */
	pthread_kill(pthread_self(), SIGTERM);
}

static ssize_t
_stp_write_retry(int fd, const void *buf, size_t count)
{
	size_t remaining = count;
	while (remaining > 0) {
		ssize_t ret = write(fd, buf, remaining);
		if (ret >= 0) {
			buf += ret;
			remaining -= ret;
		} else if (errno != EINTR) {
			return ret;
		}
	}
	return count;
}

static int stap_strfloctime(char *buf, size_t max, const char *fmt, time_t t)
{
	struct tm tm;
	size_t ret;

	if (buf == NULL || fmt == NULL || max <= 1)
		return -EINVAL;
	localtime_r(&t, &tm);
	/* NB: this following invocation means that stapdyn modules
	   can't be checked with -Wformat-nonliteral. See
	   compile_dyninst() in buildrun.cxx for the flags chosen.
	   strftime parsing does not have security implications AFAIK,
	   but gcc still wants to check them. */
	ret = strftime(buf, max, fmt, &tm);
	if (ret == 0)
		return -EINVAL;
	return (int)ret;
}

static void *
_stp_dyninst_transport_thread_func(void *arg __attribute((unused)))
{
	int stopping = 0;
	int out_fd, err_fd;
	struct _stp_transport_session_data *sess_data = stp_transport_data();

	if (sess_data == NULL)
		return NULL;

	if (strlen(stp_session_attributes()->outfile_name)) {
		char buf[PATH_MAX];
		int rc;

		rc = stap_strfloctime(buf, PATH_MAX,
				      stp_session_attributes()->outfile_name,
				      time(NULL));
		if (rc < 0) {
			_stp_transport_err("Invalid FILE name format\n");
			return NULL;
		}
		out_fd = open (buf, O_CREAT|O_TRUNC|O_WRONLY|O_CLOEXEC, 0666);
		if (out_fd < 0) {
			_stp_transport_err("ERROR: Couldn't open output file %s: %s\n",
					   buf, strerror(errno));
			return NULL;
		}
	}
	else
		out_fd = STDOUT_FILENO;
	err_fd = STDERR_FILENO;
	if (out_fd < 0 || err_fd < 0)
		return NULL;

	while (! stopping) {
		struct _stp_transport_queue *q;
		struct _stp_transport_queue_item *item;
		struct context *c;
		struct _stp_transport_context_data *data;
		void *read_ptr;

		pthread_mutex_lock(&(sess_data->queue_mutex));

		// While there are no queue entries, wait.
		q = _STP_D_T_WRITE_QUEUE(sess_data);
		while (q->items == 0) {
			// Mutex is locked. It is automatically
			// unlocked while we are waiting.
			pthread_cond_wait(&(sess_data->queue_data_avail),
					  &(sess_data->queue_mutex));
			// Mutex is locked again.
		}

		// We've got data. Swap the queues and let any waiters
		// know there is more space available.
		sess_data->write_queue ^= 1;
		pthread_cond_broadcast(&(sess_data->queue_space_avail));
		pthread_mutex_unlock(&(sess_data->queue_mutex));

		// Note that we're processing the read queue with no
		// locking. This is possible since no other thread
		// will be accessing it until we're finished with it
		// (and we make it the write queue).

		// Process the queue twice. First handle the OOB data types.
		for (size_t i = 0; i < q->items; i++) {
			int write_data = 1;

			item = &(q->queue[i]);
			if (! (item->type & STP_DYN_OOB_DATA_MASK))
				continue;

			c = stp_session_context(item->data_index);
			data = &c->transport_data;
			read_ptr = data->log_buf + item->offset;

			switch (item->type) {
			case STP_DYN_OOB_DATA:
				_stp_transport_debug(
					"STP_DYN_OOB_DATA (%ld bytes at offset %ld)\n",
					item->bytes, item->offset);

				/* Note that "WARNING:" should not be
				 * translated, since it is part of the
				 * module cmd protocol. */
				if (strncmp(read_ptr, "WARNING:", 7) == 0) {
					if (stp_session_attributes()->suppress_warnings) {
						write_data = 0;
					}
					/* If we're not verbose, eliminate
					 * duplicate warning messages. */
					else if (stp_session_attributes()->log_level == 0) {
						write_data = __stp_d_t_eliminate_duplicate_warnings(read_ptr, item->bytes);
					}
				}
				/* "ERROR:" also should not be translated. */
				else if (strncmp(read_ptr, "ERROR:", 5) == 0) {
					if (_stp_exit_status == 0)
						_stp_exit_status = 1;
				}
				if (! write_data) {
					break;
				}

				if (_stp_write_retry(err_fd, read_ptr,
						     item->bytes) < 0)
					_stp_transport_err(
						"couldn't write %ld bytes OOB data: %s\n",
						(long)item->bytes,
						strerror(errno));
				break;

			case STP_DYN_SYSTEM:
				_stp_transport_debug("STP_DYN_SYSTEM (%.*s) %d bytes\n",
						     (int)item->bytes,
						     (char *)read_ptr,
						     (int)item->bytes);
				/*
				 * Note that the null character is
				 * already included in the system
				 * string.
				 */
				__stp_d_t_run_command(read_ptr);
				break;

			default:
				_stp_transport_err(
					"Error - unknown OOB item type %d\n",
					item->type);
				break;
			}

			// Signal there is a log buffer available to
			// any waiters.
			data->log_start = _STP_D_T_LOG_INC(data->log_start);
			pthread_mutex_lock(&(data->log_mutex));
			pthread_cond_signal(&(data->log_space_avail));
			pthread_mutex_unlock(&(data->log_mutex));
		}

		// Handle the non-OOB data.
		for (size_t i = 0; i < q->items; i++) {
			item = &(q->queue[i]);
			switch (item->type) {
			case STP_DYN_NORMAL_DATA:
				_stp_transport_debug("STP_DYN_NORMAL_DATA"
						     " (%ld bytes at offset %ld)\n",
						     item->bytes, item->offset);
				c = stp_session_context(item->data_index);
				data = &c->transport_data;
				read_ptr = (data->print_buf
					    + _STP_D_T_PRINT_NORM(item->offset));
				if (_stp_write_retry(out_fd, read_ptr,
						     item->bytes) < 0)
					_stp_transport_err(
						"couldn't write %ld bytes data: %s\n",
						(long)item->bytes,
						strerror(errno));

				pthread_mutex_lock(&(data->print_mutex));

				// Now we need to update the read
				// offset, using the data_index we
				// received. Note that this happens
				// whether or not that context is
				// locked, but the print_mutex is
				// held.
				data->read_offset = _STP_D_T_PRINT_ADD(item->offset,
								       item->bytes);

				// Signal more bytes available to any waiters.
				pthread_cond_signal(&(data->print_space_avail));
				pthread_mutex_unlock(&(data->print_mutex));
				_stp_transport_debug(
					"STP_DYN_NORMAL_DATA flushed,"
					" read_offset %ld, write_offset %ld\n",
					data->read_offset, data->write_offset);
				break;

			case STP_DYN_EXIT:
				_stp_transport_debug("STP_DYN_EXIT\n");
				stopping = 1;
				break;

			case STP_DYN_REQUEST_EXIT:
				_stp_transport_debug("STP_DYN_REQUEST_EXIT\n");
				__stp_d_t_request_exit();
				break;

			default:
				if (! (item->type & STP_DYN_OOB_DATA_MASK)) {
					_stp_transport_err(
						"Error - unknown item type %d\n",
						item->type);
				}
				break;
			}
		}

		// We're now finished with the read queue. Clear it
		// out.
		q->items = 0;
	}
	return NULL;
}

static int _stp_ctl_send(int type, void *data, unsigned len)
{
	_stp_transport_debug("type 0x%x data %p len %d\n", type, data, len);

	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return EINVAL;

	// Currently, we're only handling 'STP_SYSTEM' control
	// messages, converting them to STP_DYN_SYSTEM messages.
	if (type != STP_SYSTEM)
		return 0;

	char *buffer = _stp_dyninst_transport_log_buffer();
	if (buffer == NULL)
		return 0;
	memcpy(buffer, data, len);
	size_t offset = buffer - c->transport_data.log_buf;
	__stp_dyninst_transport_queue_add(STP_DYN_SYSTEM, c->data_index,
					  offset, len);
	return len;
}

static void _stp_dyninst_transport_signal_exit(void)
{
	__stp_dyninst_transport_queue_add(STP_DYN_EXIT, 0, 0, 0);
}

static void _stp_dyninst_transport_request_exit(void)
{
	__stp_dyninst_transport_queue_add(STP_DYN_REQUEST_EXIT, 0, 0, 0);
}

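// To summarize the two exit paths above: _stp_dyninst_transport_signal_exit()
// queues STP_DYN_EXIT, which simply makes the consumer thread stop (it is
// used by _stp_dyninst_transport_shutdown() below), while
// _stp_dyninst_transport_request_exit() queues STP_DYN_REQUEST_EXIT, which
// makes the consumer thread call __stp_d_t_request_exit() and raise SIGTERM
// so that stapdyn drives the module's normal shutdown from the outside.
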
static int _stp_dyninst_transport_session_init(void)
{
	int rc;

	// Set up the transport session data.
	struct _stp_transport_session_data *sess_data = stp_transport_data();
	if (sess_data != NULL) {
		rc = stp_pthread_mutex_init_shared(&(sess_data->queue_mutex));
		if (rc != 0) {
			_stp_error("transport queue mutex initialization"
				   " failed");
			return rc;
		}
		rc = stp_pthread_cond_init_shared(&(sess_data->queue_space_avail));
		if (rc != 0) {
			_stp_error("transport queue space avail cond variable"
				   " initialization failed");
			return rc;
		}
		rc = stp_pthread_cond_init_shared(&(sess_data->queue_data_avail));
		if (rc != 0) {
			_stp_error("transport queue empty cond variable"
				   " initialization failed");
			return rc;
		}
	}

	// Set up each context's transport data.
	int i;
	for_each_possible_cpu(i) {
		struct context *c;
		struct _stp_transport_context_data *data;

		c = stp_session_context(i);
		if (c == NULL)
			continue;
		data = &c->transport_data;
		rc = stp_pthread_mutex_init_shared(&(data->print_mutex));
		if (rc != 0) {
			_stp_error("transport mutex initialization failed");
			return rc;
		}
		rc = stp_pthread_cond_init_shared(&(data->print_space_avail));
		if (rc != 0) {
			_stp_error("transport cond variable initialization failed");
			return rc;
		}
		rc = stp_pthread_mutex_init_shared(&(data->log_mutex));
		if (rc != 0) {
			_stp_error("transport log mutex initialization failed");
			return rc;
		}
		rc = stp_pthread_cond_init_shared(&(data->log_space_avail));
		if (rc != 0) {
			_stp_error("transport log cond variable initialization failed");
			return rc;
		}
	}
	return 0;
}

static int _stp_dyninst_transport_session_start(void)
{
	int rc;

	// Start the thread.
	rc = pthread_create(&_stp_transport_thread, NULL,
			    &_stp_dyninst_transport_thread_func, NULL);
	if (rc != 0) {
		_stp_error("transport thread creation failed (%d)", rc);
		return rc;
	}
	_stp_transport_thread_started = 1;
	return 0;
}

static int
_stp_dyninst_transport_write_oob_data(char *buffer, size_t bytes)
{
	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return EINVAL;

	size_t offset = buffer - c->transport_data.log_buf;
	__stp_dyninst_transport_queue_add(STP_DYN_OOB_DATA, c->data_index,
					  offset, bytes);
	return 0;
}

static int _stp_dyninst_transport_write(void)
{
	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return 0;

	struct _stp_transport_context_data *data = &c->transport_data;
	size_t bytes = data->write_bytes;

	if (bytes == 0)
		return 0;

	// This should be thread-safe without using any additional
	// locking. This probe is the only one using this context and
	// the transport thread (the consumer) only writes to
	// 'read_offset'. Any concurrently-running probe will be using
	// a different context.

	_stp_transport_debug(
		"read_offset %ld, write_offset %ld, write_bytes %ld\n",
		data->read_offset, data->write_offset, data->write_bytes);

	// Notice we're not normalizing 'write_offset'. The consumer
	// thread needs "raw" offsets.
	size_t saved_write_offset = data->write_offset;

	data->write_bytes = 0;
	// Note that if we're writing all remaining bytes in the
	// buffer, the offset can wrap (but only to either "high" or
	// "low" 0).
	data->write_offset = _STP_D_T_PRINT_ADD(data->write_offset, bytes);
	__stp_dyninst_transport_queue_add(STP_DYN_NORMAL_DATA, c->data_index,
					  saved_write_offset, bytes);
	return 0;
}

static void _stp_dyninst_transport_shutdown(void)
{
	// If we started the thread, tear everything down.
	if (_stp_transport_thread_started != 1) {
		return;
	}

	// Signal the thread to stop.
	_stp_dyninst_transport_signal_exit();

	// Wait for the thread to quit...
	pthread_join(_stp_transport_thread, NULL);
	_stp_transport_thread_started = 0;

	// Tear down the transport session data.
	struct _stp_transport_session_data *sess_data = stp_transport_data();
	if (sess_data != NULL) {
		pthread_mutex_destroy(&(sess_data->queue_mutex));
		pthread_cond_destroy(&(sess_data->queue_space_avail));
		pthread_cond_destroy(&(sess_data->queue_data_avail));
	}

	// Tear down each context's transport data.
	int i;
	for_each_possible_cpu(i) {
		struct context *c;
		struct _stp_transport_context_data *data;

		c = stp_session_context(i);
		if (c == NULL)
			continue;
		data = &c->transport_data;
		pthread_mutex_destroy(&(data->print_mutex));
		pthread_cond_destroy(&(data->print_space_avail));
		pthread_mutex_destroy(&(data->log_mutex));
		pthread_cond_destroy(&(data->log_space_avail));
	}
}

static int
_stp_dyninst_transport_log_buffer_full(struct _stp_transport_context_data *data)
{
	// This inverts the most significant bit of 'log_start' before
	// comparison.
	return (data->log_end == (data->log_start ^ _STP_LOG_BUF_ENTRIES));
}

static char *_stp_dyninst_transport_log_buffer(void)
{
	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return NULL;

	// Note that the context structure is locked, so only one
	// probe at a time can be operating on it.
	struct _stp_transport_context_data *data = &c->transport_data;

	// If there isn't an available log buffer, wait.
	if (_stp_dyninst_transport_log_buffer_full(data)) {
		pthread_mutex_lock(&(data->log_mutex));
		while (_stp_dyninst_transport_log_buffer_full(data)) {
			pthread_cond_wait(&(data->log_space_avail),
					  &(data->log_mutex));
		}
		pthread_mutex_unlock(&(data->log_mutex));
	}

	// Note that we're taking 'log_end' and normalizing it to start
	// at 0 to get the proper entry number. We then multiply it by
	// STP_LOG_BUF_LEN to find the proper buffer offset.
	//
	// Every "allocation" here is done in STP_LOG_BUF_LEN-sized
	// chunks.
	char *ptr = &data->log_buf[_STP_D_T_LOG_NORM(data->log_end)
				   * STP_LOG_BUF_LEN];

	// Increment 'log_end'.
	data->log_end = _STP_D_T_LOG_INC(data->log_end);
	return ptr;
}

static size_t
__stp_d_t_space_before(struct _stp_transport_context_data *data,
		       size_t read_offset)
{
	// If the offsets have differing most significant bits, then
	// the write offset has wrapped, so there isn't any available
	// space before the write offset.
	if ((read_offset & _STP_DYNINST_BUFFER_SIZE)
	    != (data->write_offset & _STP_DYNINST_BUFFER_SIZE)) {
		return 0;
	}
	return (_STP_D_T_PRINT_NORM(read_offset));
}

static size_t
__stp_d_t_space_after(struct _stp_transport_context_data *data,
		      size_t read_offset)
{
	// We have to worry about wraparound here, in the case of a
	// full buffer.
	size_t write_end_offset = _STP_D_T_PRINT_ADD(data->write_offset,
						     data->write_bytes);

	// If the offsets have differing most significant bits, then
	// the write offset has wrapped, so the only available space
	// after the write offset is between the (normalized) write
	// offset and the (normalized) read offset.
	if ((read_offset & _STP_DYNINST_BUFFER_SIZE)
	    != (write_end_offset & _STP_DYNINST_BUFFER_SIZE)) {
		return (_STP_D_T_PRINT_NORM(read_offset)
			- _STP_D_T_PRINT_NORM(write_end_offset));
	}
	return (_STP_DYNINST_BUFFER_SIZE
		- _STP_D_T_PRINT_NORM(write_end_offset));
}

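// A worked example of the two helpers above (using a made-up
// _STP_DYNINST_BUFFER_SIZE of 64 purely for illustration, so offsets
// range over 0..127 with bit 6 acting as the wrap bit):
//
//   - Unwrapped case: read_offset == 16, write_offset == 40,
//     write_bytes == 8. write_end_offset == 48 and both offsets are
//     in the same half, so space_after == 64 - 48 == 16 (bytes
//     48..63) and space_before == 16 (bytes 0..15, in front of the
//     in-flight data occupying 16..47).
//
//   - Wrapped case: read_offset == 48, write_offset == 72 (normalized
//     8 with the wrap bit set), write_bytes == 4. write_end_offset ==
//     76, whose wrap bit differs from read_offset's, so space_before
//     == 0 and space_after == 48 - 12 == 36 (bytes 12..47, between
//     the wrapped write end and the unread data).
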
static void *_stp_dyninst_transport_reserve_bytes(int numbytes)
{
	void *ret;

	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL) {
		_stp_transport_debug("NULL context!\n");
		return NULL;
	}

	struct _stp_transport_context_data *data = &c->transport_data;
	size_t space_before, space_after, read_offset;

recheck:
	pthread_mutex_lock(&(data->print_mutex));

	// If the buffer is empty, reset everything to the
	// beginning. This cuts down on fragmentation.
	if (data->write_bytes == 0
	    && data->read_offset == data->write_offset
	    && data->read_offset != 0) {
		data->read_offset = 0;
		data->write_offset = 0;
	}

	// We cache the read_offset value to get a consistent view of
	// the buffer (between the calls to get the space before/after).
	read_offset = data->read_offset;
	pthread_mutex_unlock(&(data->print_mutex));

	space_before = __stp_d_t_space_before(data, read_offset);
	space_after = __stp_d_t_space_after(data, read_offset);

	// If we don't have enough space, try to get more space by
	// flushing and/or waiting.
	if (space_before < numbytes && space_after < numbytes) {
		// First, lock the mutex.
		pthread_mutex_lock(&(data->print_mutex));

		// There is a race condition here. We've checked for
		// available free space, then locked the mutex. It is
		// possible for more free space to have become
		// available between the time we checked and the time
		// we locked the mutex. Recheck the available free
		// space.
		read_offset = data->read_offset;
		space_before = __stp_d_t_space_before(data, read_offset);
		space_after = __stp_d_t_space_after(data, read_offset);

		// If we still don't have enough space and we have
		// data we haven't flushed, go ahead and flush to free
		// up space.
		if (space_before < numbytes && space_after < numbytes
		    && data->write_bytes != 0) {
			// Flush the buffer. We have to do this while
			// the mutex is locked, so that we can't miss
			// the condition change. (If we did flush
			// without the mutex locked, it would be
			// possible for the consumer thread to signal
			// the condition variable before we were
			// waiting on it.)
			_stp_dyninst_transport_write();

			// Mutex is locked. It is automatically
			// unlocked while we are waiting.
			pthread_cond_wait(&(data->print_space_avail),
					  &(data->print_mutex));
			// Mutex is locked again.

			// Recheck available free space.
			read_offset = data->read_offset;
			space_before = __stp_d_t_space_before(data, read_offset);
			space_after = __stp_d_t_space_after(data, read_offset);
		}

		// If we don't have enough bytes available, do a timed
		// wait for more bytes to become available. This might
		// fail if there isn't anything in the queue for this
		// context structure.
		if (space_before < numbytes && space_after < numbytes) {
			_stp_transport_debug(
				"waiting for more space, numbytes %d,"
				" before %ld, after %ld\n",
				numbytes, space_before, space_after);

			// Set up a timeout for
			// STP_DYNINST_TIMEOUT_SECS seconds into the
			// future.
			struct timespec ts;
			clock_gettime(CLOCK_REALTIME, &ts);
			ts.tv_sec += STP_DYNINST_TIMEOUT_SECS;

			// Mutex is locked. It is automatically
			// unlocked while we are waiting.
			pthread_cond_timedwait(&(data->print_space_avail),
					       &(data->print_mutex), &ts);
			// When pthread_cond_timedwait() returns, the
			// mutex has been (re)locked.

			// Now see if we've got more bytes available.
			read_offset = data->read_offset;
			space_before = __stp_d_t_space_before(data, read_offset);
			space_after = __stp_d_t_space_after(data, read_offset);
		}

		// We're finished with the mutex.
		pthread_mutex_unlock(&(data->print_mutex));

		// If we *still* don't have enough space available,
		// quit. We've done all we can do.
		if (space_before < numbytes && space_after < numbytes) {
			_stp_transport_debug(
				"not enough space available,"
				" numbytes %d, before %ld, after %ld,"
				" read_offset %ld, write_offset %ld\n",
				numbytes, space_before, space_after,
				read_offset, data->write_offset);
			return NULL;
		}
	}

	// OK, now we have enough space, either before or after the
	// current write offset.
	//
	// We prefer using the space after the current write, which
	// will help keep writes contiguous.
	if (space_after >= numbytes) {
		ret = (data->print_buf
		       + _STP_D_T_PRINT_NORM(data->write_offset)
		       + data->write_bytes);
		data->write_bytes += numbytes;
		_stp_transport_debug(
			"reserve %d bytes after, bytes available"
			" (%ld, %ld) read_offset %ld, write_offset %ld,"
			" write_bytes %ld\n",
			numbytes, space_before, space_after,
			data->read_offset, data->write_offset,
			data->write_bytes);
		return ret;
	}

	// OK, now we know we need to use the space before the write
	// offset. If we've got existing bytes that haven't been
	// flushed, flush them now.
	if (data->write_bytes != 0) {
		_stp_dyninst_transport_write();

		// Flushing the buffer updates the write_offset, which
		// could have caused it to wrap. Start all over.
		_stp_transport_debug(
			"rechecking available bytes after a flush...\n");
		goto recheck;
	}

	// Wrap the offset around by inverting the most significant
	// bit, then clearing out the lower bits.
	data->write_offset = ((data->write_offset ^ _STP_DYNINST_BUFFER_SIZE)
			      & _STP_DYNINST_BUFFER_SIZE);
	ret = data->print_buf;
	data->write_bytes += numbytes;
	_stp_transport_debug(
		"reserve %d bytes before, bytes available"
		" (%ld, %ld) read_offset %ld, write_offset %ld,"
		" write_bytes %ld\n",
		numbytes, space_before, space_after,
		data->read_offset, data->write_offset,
		data->write_bytes);
	return ret;
}

static void _stp_dyninst_transport_unreserve_bytes(int numbytes)
{
	// This thread should already have a context structure.
	struct context* c = _stp_runtime_get_context();
	if (c == NULL)
		return;

	struct _stp_transport_context_data *data = &c->transport_data;

	if (unlikely(numbytes <= 0 || numbytes > data->write_bytes))
		return;

	data->write_bytes -= numbytes;
}

#endif /* _STAPDYN_TRANSPORT_C_ */
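
// Illustrative producer-side sketch (not code from this file): a
// print routine that already holds its probe context would use the
// reservation API above roughly as follows, where 'msg' and 'len' are
// hypothetical names:
//
//	void *dest = _stp_dyninst_transport_reserve_bytes(len);
//	if (dest != NULL) {
//		memcpy(dest, msg, len);
//		/* hand the reserved bytes to the consumer thread */
//		_stp_dyninst_transport_write();
//	}
//
// Bytes that were reserved but not yet flushed can be returned with
// _stp_dyninst_transport_unreserve_bytes(len).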