00001 /* 00002 Copyright 2005-2011 Intel Corporation. All Rights Reserved. 00003 00004 The source code contained or described herein and all documents related 00005 to the source code ("Material") are owned by Intel Corporation or its 00006 suppliers or licensors. Title to the Material remains with Intel 00007 Corporation or its suppliers and licensors. The Material is protected 00008 by worldwide copyright laws and treaty provisions. No part of the 00009 Material may be used, copied, reproduced, modified, published, uploaded, 00010 posted, transmitted, distributed, or disclosed in any way without 00011 Intel's prior express written permission. 00012 00013 No license under any patent, copyright, trade secret or other 00014 intellectual property right is granted to or conferred upon you by 00015 disclosure or delivery of the Materials, either expressly, by 00016 implication, inducement, estoppel or otherwise. Any license under such 00017 intellectual property rights must be express and approved by Intel in 00018 writing. 00019 */ 00020 00021 #ifndef __TBB_aggregator_internal_H 00022 #define __TBB_aggregator_internal_H 00023 00024 #include "atomic.h" 00025 #include "tbb_profiling.h" 00026 00027 namespace tbb { 00028 namespace interface6 { 00029 namespace internal { 00030 00031 using namespace tbb::internal; 00032 00034 template <typename Derived> 00035 class aggregated_operation { 00036 public: 00037 uintptr_t status; 00038 Derived *next; 00039 aggregated_operation() : status(0), next(NULL) {} 00040 }; 00041 00043 00048 template < typename handler_type, typename operation_type > 00049 class aggregator { 00050 public: 00051 aggregator() : handler_busy(false) { pending_operations = NULL; } 00052 ~aggregator() {} 00053 00054 void initialize_handler(handler_type h) { handle_operations = h; } 00055 00057 00059 void execute(operation_type *op) { 00060 operation_type *res; 00061 00062 // ITT note: &(op->status) tag is used to cover accesses to this op node. This 00063 // thread has created the operation, and now releases it so that the handler 00064 // thread may handle the associated operation w/o triggering a race condition; 00065 // thus this tag will be acquired just before the operation is handled in the 00066 // handle_operations functor. 00067 call_itt_notify(releasing, &(op->status)); 00068 // insert the operation in the queue 00069 do { 00070 // ITT may flag the following line as a race; it is a false positive: 00071 // This is an atomic read; we don't provide itt_hide_load_word for atomics 00072 op->next = res = pending_operations; // NOT A RACE 00073 } while (pending_operations.compare_and_swap(op, res) != res); 00074 if (!res) { // first in the list; handle the operations 00075 // ITT note: &pending_operations tag covers access to the handler_busy flag, 00076 // which this waiting handler thread will try to set before entering 00077 // handle_operations. 00078 call_itt_notify(acquired, &pending_operations); 00079 start_handle_operations(); 00080 __TBB_ASSERT(op->status, NULL); 00081 } 00082 else { // not first; wait for op to be ready 00083 call_itt_notify(prepare, &(op->status)); 00084 spin_wait_while_eq(op->status, uintptr_t(0)); 00085 itt_load_word_with_acquire(op->status); 00086 } 00087 } 00088 00089 private: 00091 atomic<operation_type *> pending_operations; 00093 uintptr_t handler_busy; 00094 handler_type handle_operations; 00095 00097 void start_handle_operations() { 00098 operation_type *op_list; 00099 00100 // ITT note: &handler_busy tag covers access to pending_operations as it is passed 00101 // between active and waiting handlers. Below, the waiting handler waits until 00102 // the active handler releases, and the waiting handler acquires &handler_busy as 00103 // it becomes the active_handler. The release point is at the end of this 00104 // function, when all operations in pending_operations have been handled by the 00105 // owner of this aggregator. 00106 call_itt_notify(prepare, &handler_busy); 00107 // get the handler_busy: 00108 // only one thread can possibly spin here at a time 00109 spin_wait_until_eq(handler_busy, uintptr_t(0)); 00110 call_itt_notify(acquired, &handler_busy); 00111 // acquire fence not necessary here due to causality rule and surrounding atomics 00112 __TBB_store_with_release(handler_busy, uintptr_t(1)); 00113 00114 // ITT note: &pending_operations tag covers access to the handler_busy flag 00115 // itself. Capturing the state of the pending_operations signifies that 00116 // handler_busy has been set and a new active handler will now process that list's 00117 // operations. 00118 call_itt_notify(releasing, &pending_operations); 00119 // grab pending_operations 00120 op_list = pending_operations.fetch_and_store(NULL); 00121 00122 // handle all the operations 00123 handle_operations(op_list); 00124 00125 // release the handler 00126 itt_store_word_with_release(handler_busy, uintptr_t(0)); 00127 } 00128 }; 00129 00130 } // namespace internal 00131 } // namespace interface6 00132 00133 namespace internal { 00134 using interface6::internal::aggregated_operation; 00135 using interface6::internal::aggregator; 00136 } // namespace internal 00137 00138 } // namespace tbb 00139 00140 #endif