00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_graph_H
00022 #define __TBB_graph_H
00023
00024 #if !TBB_PREVIEW_GRAPH
00025 #error Set TBB_PREVIEW_GRAPH to include graph.h
00026 #endif
00027
00028 #include "tbb_stddef.h"
00029 #include "atomic.h"
00030 #include "spin_mutex.h"
00031 #include "null_mutex.h"
00032 #include "spin_rw_mutex.h"
00033 #include "null_rw_mutex.h"
00034 #include "task.h"
00035 #include "concurrent_vector.h"
00036 #include "_aggregator_internal.h"
00037
00038
00039 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00040 #define TBB_PREVIEW_TUPLE 1
00041 #include "compat/tuple"
00042 #else
00043 #include <tuple>
00044 #endif
00045
00046 #include<list>
00047 #include<queue>
00048
00049
00060 namespace tbb {
00061
00063 class graph_node {
00064 public:
00065 virtual ~graph_node() {}
00066 };
00067
00069 class continue_msg {};
00070
00071 template< typename T > class sender;
00072 template< typename T > class receiver;
00073 class continue_receiver;
00074
00076 template< typename T >
00077 class sender {
00078 public:
00080 typedef T output_type;
00081
00083 typedef receiver<T> successor_type;
00084
00085 virtual ~sender() {}
00086
00088 virtual bool register_successor( successor_type &r ) = 0;
00089
00091 virtual bool remove_successor( successor_type &r ) = 0;
00092
00094 virtual bool try_get( T & ) { return false; }
00095
00097 virtual bool try_reserve( T & ) { return false; }
00098
00100 virtual bool try_release( ) { return false; }
00101
00103 virtual bool try_consume( ) { return false; }
00104
00105 };
00106
00107
00109 template< typename T >
00110 class receiver {
00111 public:
00112
00114 typedef T input_type;
00115
00117 typedef sender<T> predecessor_type;
00118
00120 virtual ~receiver() {}
00121
00123 virtual bool try_put( T t ) = 0;
00124
00126 virtual bool register_predecessor( predecessor_type & ) { return false; }
00127
00129 virtual bool remove_predecessor( predecessor_type & ) { return false; }
00130
00131 };
00132
00134
00135 class continue_receiver : public receiver< continue_msg > {
00136 public:
00137
00139 typedef continue_msg input_type;
00140
00142 typedef sender< continue_msg > predecessor_type;
00143
00145 continue_receiver( int number_of_predecessors = 0 ) {
00146 my_predecessor_count = number_of_predecessors;
00147 my_current_count = 0;
00148 }
00149
00151 virtual ~continue_receiver() { }
00152
00154 bool register_predecessor( predecessor_type & ) {
00155 spin_mutex::scoped_lock l(my_mutex);
00156 ++my_predecessor_count;
00157 return true;
00158 }
00159
00161
00164 bool remove_predecessor( predecessor_type & ) {
00165 spin_mutex::scoped_lock l(my_mutex);
00166 --my_predecessor_count;
00167 return true;
00168 }
00169
00171
00173 bool try_put( input_type ) {
00174 {
00175 spin_mutex::scoped_lock l(my_mutex);
00176 if ( ++my_current_count < my_predecessor_count )
00177 return true;
00178 else
00179 my_current_count = 0;
00180 }
00181 execute();
00182 return true;
00183 }
00184
00185 protected:
00186
00187 spin_mutex my_mutex;
00188 int my_predecessor_count;
00189 int my_current_count;
00190
00192
00194 virtual void execute() = 0;
00195
00196 };
00197
00199 namespace internal {
00200
00202 enum node_state { node_state_idle=0, node_state_nonidle=1, node_state_inactive=2 };
00203
00204
00206 template< typename Output >
00207 class source_body : no_assign {
00208 public:
00209 virtual ~source_body() {}
00210 virtual bool operator()(Output &output) = 0;
00211 };
00212
00214 template< typename Output, typename Body>
00215 class source_body_leaf : public source_body<Output> {
00216 public:
00217 source_body_leaf( Body _body ) : body(_body) { }
00218 bool operator()(Output &output) { return body( output ); }
00219 private:
00220 Body body;
00221 };
00222
00224 template< typename Input, typename Output >
00225 class function_body : no_assign {
00226 public:
00227 virtual ~function_body() {}
00228 virtual Output operator()(Input input) = 0;
00229 };
00230
00232 template <typename Input, typename Output, typename B>
00233 class function_body_leaf : public function_body< Input, Output > {
00234 public:
00235 function_body_leaf( B _body ) : body(_body) { }
00236 Output operator()(Input i) { return body(i); }
00237
00238 private:
00239 B body;
00240 };
00241
00243 template <typename B>
00244 class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
00245 public:
00246 function_body_leaf( B _body ) : body(_body) { }
00247 continue_msg operator()( continue_msg i ) {
00248 body(i);
00249 return i;
00250 }
00251
00252 private:
00253 B body;
00254 };
00255
00257 template <typename Input, typename B>
00258 class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
00259 public:
00260 function_body_leaf( B _body ) : body(_body) { }
00261 continue_msg operator()(Input i) {
00262 body(i);
00263 return continue_msg();
00264 }
00265
00266 private:
00267 B body;
00268 };
00269
00271 template <typename Output, typename B>
00272 class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
00273 public:
00274 function_body_leaf( B _body ) : body(_body) { }
00275 Output operator()(continue_msg i) {
00276 return body(i);
00277 }
00278
00279 private:
00280 B body;
00281 };
00282
00284 template< typename NodeType >
00285 class forward_task : public task {
00286
00287 NodeType &my_node;
00288
00289 public:
00290
00291 forward_task( NodeType &n ) : my_node(n) {}
00292
00293 task *execute() {
00294 my_node.forward();
00295 return NULL;
00296 }
00297 };
00298
00300 template< typename NodeType, typename Input >
00301 class apply_body_task : public task {
00302
00303 NodeType &my_node;
00304 Input my_input;
00305
00306 public:
00307
00308 apply_body_task( NodeType &n, Input i ) : my_node(n), my_input(i) {}
00309
00310 task *execute() {
00311 my_node.apply_body( my_input );
00312 return NULL;
00313 }
00314 };
00315
00317 template< typename NodeType >
00318 class source_task : public task {
00319
00320 NodeType &my_node;
00321
00322 public:
00323
00324 source_task( NodeType &n ) : my_node(n) {}
00325
00326 task *execute() {
00327 my_node.apply_body( );
00328 return NULL;
00329 }
00330 };
00331
00333 template< typename Input, typename Output >
00334 struct empty_body {
00335 Output operator()( Input & ) const { return Output(); }
00336 };
00337
00339 template< typename T, typename M=spin_mutex >
00340 class node_cache {
00341 public:
00342
00343 typedef size_t size_type;
00344
00345 bool empty() {
00346 typename my_mutex_type::scoped_lock lock( my_mutex );
00347 return internal_empty();
00348 }
00349
00350 void add( T &n ) {
00351 typename my_mutex_type::scoped_lock lock( my_mutex );
00352 internal_push(n);
00353 }
00354
00355 void remove( T &n ) {
00356 typename my_mutex_type::scoped_lock lock( my_mutex );
00357 for ( size_t i = internal_size(); i != 0; --i ) {
00358 T &s = internal_pop();
00359 if ( &s != &n ) {
00360 internal_push(s);
00361 }
00362 }
00363 }
00364
00365 protected:
00366
00367 typedef M my_mutex_type;
00368 my_mutex_type my_mutex;
00369 std::queue< T * > my_q;
00370
00371
00372 inline bool internal_empty( ) {
00373 return my_q.empty();
00374 }
00375
00376
00377 inline size_type internal_size( ) {
00378 return my_q.size();
00379 }
00380
00381
00382 inline void internal_push( T &n ) {
00383 my_q.push(&n);
00384 }
00385
00386
00387 inline T &internal_pop() {
00388 T *v = my_q.front();
00389 my_q.pop();
00390 return *v;
00391 }
00392
00393 };
00394
00396 template< typename T, typename M=spin_mutex >
00397 class predecessor_cache : public node_cache< sender<T>, M > {
00398 public:
00399 typedef M my_mutex_type;
00400 typedef T output_type;
00401 typedef sender<output_type> predecessor_type;
00402 typedef receiver<output_type> successor_type;
00403
00404 predecessor_cache( ) : my_owner( NULL ) { }
00405
00406 void set_owner( successor_type *owner ) { my_owner = owner; }
00407
00408 bool get_item( output_type &v ) {
00409
00410 bool msg = false;
00411
00412 do {
00413 predecessor_type *src;
00414 {
00415 typename my_mutex_type::scoped_lock lock(this->my_mutex);
00416 if ( this->internal_empty() ) {
00417 break;
00418 }
00419 src = &this->internal_pop();
00420 }
00421
00422
00423 msg = src->try_get( v );
00424
00425 if (msg == false) {
00426
00427 if ( my_owner)
00428 src->register_successor( *my_owner );
00429 } else {
00430
00431 this->add(*src);
00432 }
00433 } while ( msg == false );
00434 return msg;
00435 }
00436
00437 protected:
00438 successor_type *my_owner;
00439 };
00440
00442 template< typename T, typename M=spin_mutex >
00443 class reservable_predecessor_cache : public predecessor_cache< T, M > {
00444 public:
00445 typedef M my_mutex_type;
00446 typedef T output_type;
00447 typedef sender<T> predecessor_type;
00448 typedef receiver<T> successor_type;
00449
00450 reservable_predecessor_cache( ) : reserved_src(NULL) { }
00451
00452 bool
00453 try_reserve( output_type &v ) {
00454 bool msg = false;
00455
00456 do {
00457 {
00458 typename my_mutex_type::scoped_lock lock(this->my_mutex);
00459 if ( reserved_src || this->internal_empty() )
00460 return false;
00461
00462 reserved_src = &this->internal_pop();
00463 }
00464
00465
00466 msg = reserved_src->try_reserve( v );
00467
00468 if (msg == false) {
00469 typename my_mutex_type::scoped_lock lock(this->my_mutex);
00470
00471 reserved_src->register_successor( *this->my_owner );
00472 reserved_src = NULL;
00473 } else {
00474
00475 this->add( *reserved_src );
00476 }
00477 } while ( msg == false );
00478
00479 return msg;
00480 }
00481
00482 bool
00483 try_release( ) {
00484 reserved_src->try_release( );
00485 reserved_src = NULL;
00486 return true;
00487 }
00488
00489 bool
00490 try_consume( ) {
00491 reserved_src->try_consume( );
00492 reserved_src = NULL;
00493 return true;
00494 }
00495
00496 private:
00497 predecessor_type *reserved_src;
00498 };
00499
00500
00502 template<typename T, typename M=spin_rw_mutex >
00503 class successor_cache : no_copy {
00504 protected:
00505
00506 typedef M my_mutex_type;
00507 my_mutex_type my_mutex;
00508
00509 typedef std::list< receiver<T> * > my_successors_type;
00510 my_successors_type my_successors;
00511
00512 sender<T> *my_owner;
00513
00514 public:
00515
00516 successor_cache( ) : my_owner(NULL) {}
00517
00518 void set_owner( sender<T> *owner ) { my_owner = owner; }
00519
00520 virtual ~successor_cache() {}
00521
00522 void register_successor( receiver<T> &r ) {
00523 typename my_mutex_type::scoped_lock l(my_mutex, true);
00524 my_successors.push_back( &r );
00525 }
00526
00527 void remove_successor( receiver<T> &r ) {
00528 typename my_mutex_type::scoped_lock l(my_mutex, true);
00529 for ( typename my_successors_type::iterator i = my_successors.begin();
00530 i != my_successors.end(); ++i ) {
00531 if ( *i == & r ) {
00532 my_successors.erase(i);
00533 break;
00534 }
00535 }
00536 }
00537
00538 bool empty() {
00539 typename my_mutex_type::scoped_lock l(my_mutex, false);
00540 return my_successors.empty();
00541 }
00542
00543 virtual bool try_put( T t ) = 0;
00544 };
00545
00547 template<>
00548 class successor_cache< continue_msg > : no_copy {
00549 protected:
00550
00551 typedef spin_rw_mutex my_mutex_type;
00552 my_mutex_type my_mutex;
00553
00554 typedef std::list< receiver<continue_msg> * > my_successors_type;
00555 my_successors_type my_successors;
00556
00557 sender<continue_msg> *my_owner;
00558
00559 public:
00560
00561 successor_cache( ) : my_owner(NULL) {}
00562
00563 void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
00564
00565 virtual ~successor_cache() {}
00566
00567 void register_successor( receiver<continue_msg> &r ) {
00568 my_mutex_type::scoped_lock l(my_mutex, true);
00569 my_successors.push_back( &r );
00570 if ( my_owner )
00571 r.register_predecessor( *my_owner );
00572 }
00573
00574 void remove_successor( receiver<continue_msg> &r ) {
00575 my_mutex_type::scoped_lock l(my_mutex, true);
00576 for ( my_successors_type::iterator i = my_successors.begin();
00577 i != my_successors.end(); ++i ) {
00578 if ( *i == & r ) {
00579 if ( my_owner )
00580 r.remove_predecessor( *my_owner );
00581 my_successors.erase(i);
00582 break;
00583 }
00584 }
00585 }
00586
00587 bool empty() {
00588 my_mutex_type::scoped_lock l(my_mutex, false);
00589 return my_successors.empty();
00590 }
00591
00592 virtual bool try_put( continue_msg t ) = 0;
00593
00594 };
00595
00597 template<typename T, typename M=spin_rw_mutex>
00598 class broadcast_cache : public successor_cache<T, M> {
00599 typedef M my_mutex_type;
00600 typedef std::list< receiver<T> * > my_successors_type;
00601
00602 public:
00603
00604 broadcast_cache( ) {}
00605
00606 bool try_put( T t ) {
00607 bool msg = false;
00608 bool upgraded = false;
00609 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00610 typename my_successors_type::iterator i = this->my_successors.begin();
00611 while ( i != this->my_successors.end() ) {
00612 if ( (*i)->try_put( t ) == true ) {
00613 ++i;
00614 msg = true;
00615 } else {
00616 if ( (*i)->register_predecessor(*this->my_owner) ) {
00617 if (!upgraded) {
00618 l.upgrade_to_writer();
00619 upgraded = true;
00620 }
00621 i = this->my_successors.erase(i);
00622 }
00623 else {
00624 ++i;
00625 }
00626 }
00627 }
00628 return msg;
00629 }
00630 };
00631
00633 template<typename T, typename M=spin_rw_mutex >
00634 class round_robin_cache : public successor_cache<T, M> {
00635 typedef size_t size_type;
00636 typedef M my_mutex_type;
00637 typedef std::list< receiver<T> * > my_successors_type;
00638
00639 public:
00640
00641 round_robin_cache( ) {}
00642
00643 size_type size() {
00644 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00645 return this->my_successors.size();
00646 }
00647
00648 bool try_put( T t ) {
00649 bool upgraded = false;
00650 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00651 typename my_successors_type::iterator i = this->my_successors.begin();
00652 while ( i != this->my_successors.end() ) {
00653 if ( (*i)->try_put( t ) ) {
00654 return true;
00655 } else {
00656 if ( (*i)->register_predecessor(*this->my_owner) ) {
00657 if (!upgraded) {
00658 l.upgrade_to_writer();
00659 upgraded = true;
00660 }
00661 i = this->my_successors.erase(i);
00662 }
00663 else {
00664 ++i;
00665 }
00666 }
00667 }
00668 return false;
00669 }
00670 };
00671
00672 template<typename T>
00673 class decrementer : public continue_receiver, internal::no_copy {
00674
00675 T *my_node;
00676
00677 void execute() {
00678 my_node->decrement_counter();
00679 }
00680
00681 public:
00682
00683 typedef continue_msg input_type;
00684 typedef continue_msg output_type;
00685 decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
00686 void set_owner( T *node ) { my_node = node; }
00687 };
00688
00689 }
00691
00692
00694
00695 class graph : internal::no_copy {
00696
00697 template< typename Body >
00698 class run_task : public task {
00699 public:
00700 run_task( Body& body ) : my_body(body) {}
00701 task *execute() {
00702 my_body();
00703 return NULL;
00704 }
00705 private:
00706 Body my_body;
00707 };
00708
00709 template< typename Receiver, typename Body >
00710 class run_and_put_task : public task {
00711 public:
00712 run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00713 task *execute() {
00714 my_receiver.try_put( my_body() );
00715 return NULL;
00716 }
00717 private:
00718 Receiver &my_receiver;
00719 Body my_body;
00720 };
00721
00722 public:
00723
00725 enum concurrency { unlimited = 0, serial = 1 };
00726
00728 graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
00729 my_root_task->set_ref_count(1);
00730 }
00731
00733
00735 ~graph() {
00736 wait_for_all();
00737 my_root_task->set_ref_count(0);
00738 task::destroy( *my_root_task );
00739 }
00740
00741
00743
00745 void increment_wait_count() {
00746 if (my_root_task)
00747 my_root_task->increment_ref_count();
00748 }
00749
00751
00753 void decrement_wait_count() {
00754 if (my_root_task)
00755 my_root_task->decrement_ref_count();
00756 }
00757
00759
00761 template< typename Receiver, typename Body >
00762 void run( Receiver &r, Body body ) {
00763 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00764 run_and_put_task< Receiver, Body >( r, body ) );
00765 }
00766
00768
00770 template< typename Body >
00771 void run( Body body ) {
00772 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
00773 run_task< Body >( body ) );
00774 }
00775
00777
00778 void wait_for_all() {
00779 if (my_root_task)
00780 my_root_task->wait_for_all();
00781 my_root_task->set_ref_count(1);
00782 }
00783
00785 task * root_task() {
00786 return my_root_task;
00787 }
00788
00789 private:
00790
00791 task *my_root_task;
00792
00793 };
00794
00795
00797 namespace internal {
00798
00800 template< typename Input, typename Output >
00801 class function_input : public receiver<Input>, no_assign {
00802 typedef sender<Input> predecessor_type;
00803 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00804 enum op_type {reg_pred, rem_pred, app_body, tryput, try_fwd};
00805
00806 public:
00808 typedef Input input_type;
00810 typedef Output output_type;
00811
00813 template< typename Body >
00814 function_input( graph &g, size_t max_concurrency, Body& body )
00815 : my_root_task(g.root_task()), my_max_concurrency(max_concurrency), my_concurrency(internal::node_state_idle),
00816 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
00817 forwarder_busy(false) {
00818 my_predecessors.set_owner(this);
00819 my_aggregator.initialize_handler(my_functor_t(this));
00820 }
00821
00823 virtual ~function_input() { delete my_body; }
00824
00826 virtual bool try_put( input_type t ) {
00827 if ( my_max_concurrency == 0 ) {
00828 spawn_body_task( t );
00829 return true;
00830 } else {
00831 my_operation op_data(t, tryput);
00832 my_aggregator.execute(&op_data);
00833 return op_data.status == SUCCEEDED;
00834 }
00835 }
00836
00838 bool register_predecessor( predecessor_type &src ) {
00839 my_operation op_data(reg_pred);
00840 op_data.r = &src;
00841 my_aggregator.execute(&op_data);
00842 return true;
00843 }
00844
00846 bool remove_predecessor( predecessor_type &src ) {
00847 my_operation op_data(rem_pred);
00848 op_data.r = &src;
00849 my_aggregator.execute(&op_data);
00850 return true;
00851 }
00852
00853 protected:
00854 task *my_root_task;
00855 const size_t my_max_concurrency;
00856 size_t my_concurrency;
00857 function_body<input_type, output_type> *my_body;
00858 predecessor_cache<input_type, null_mutex > my_predecessors;
00859
00860 virtual broadcast_cache<output_type > &successors() = 0;
00861
00862 private:
00863 friend class apply_body_task< function_input< input_type, output_type >, input_type >;
00864 friend class forward_task< function_input< input_type, output_type > >;
00865
00866 class my_operation : public aggregated_operation< my_operation > {
00867 public:
00868 char type;
00869 union {
00870 input_type *elem;
00871 predecessor_type *r;
00872 };
00873 my_operation(const input_type& e, op_type t) :
00874 type(char(t)), elem(const_cast<input_type*>(&e)) {}
00875 my_operation(op_type t) : type(char(t)), r(NULL) {}
00876 };
00877
00878 class my_functor_t {
00879 function_input<input_type, output_type> *fi;
00880 public:
00881 my_functor_t() {}
00882 my_functor_t(function_input<input_type, output_type> *fi_) : fi(fi_) {}
00883 void operator()(my_operation* op_list) {
00884 fi->handle_operations(op_list);
00885 }
00886 };
00887
00888 bool forwarder_busy;
00889 aggregator< my_functor_t, my_operation > my_aggregator;
00890
00891 void handle_operations(my_operation *op_list) {
00892 my_operation *tmp;
00893 while (op_list) {
00894 tmp = op_list;
00895 op_list = op_list->next;
00896 switch (tmp->type) {
00897 case reg_pred:
00898 my_predecessors.add(*(tmp->r));
00899 __TBB_store_with_release(tmp->status, SUCCEEDED);
00900 if (!forwarder_busy) {
00901 forwarder_busy = true;
00902 spawn_forward_task();
00903 }
00904 break;
00905 case rem_pred:
00906 my_predecessors.remove(*(tmp->r));
00907 __TBB_store_with_release(tmp->status, SUCCEEDED);
00908 break;
00909 case app_body:
00910 __TBB_ASSERT(my_max_concurrency != 0, NULL);
00911 --my_concurrency;
00912 __TBB_store_with_release(tmp->status, SUCCEEDED);
00913 if (my_concurrency<my_max_concurrency) {
00914 input_type i;
00915 if (my_predecessors.get_item(i)) {
00916 ++my_concurrency;
00917 spawn_body_task(i);
00918 }
00919 }
00920 break;
00921 case tryput: internal_try_put(tmp); break;
00922 case try_fwd: internal_forward(tmp); break;
00923 }
00924 }
00925 }
00926
00927
00929 void internal_try_put(my_operation *op) {
00930 __TBB_ASSERT(my_max_concurrency != 0, NULL);
00931 if (my_concurrency < my_max_concurrency) {
00932 ++my_concurrency;
00933 spawn_body_task(*(op->elem));
00934 __TBB_store_with_release(op->status, SUCCEEDED);
00935 } else {
00936 __TBB_store_with_release(op->status, FAILED);
00937 }
00938 }
00939
00941 void internal_forward(my_operation *op) {
00942 if (my_concurrency<my_max_concurrency || !my_max_concurrency) {
00943 input_type i;
00944 if (my_predecessors.get_item(i)) {
00945 ++my_concurrency;
00946 __TBB_store_with_release(op->status, SUCCEEDED);
00947 spawn_body_task(i);
00948 return;
00949 }
00950 }
00951 __TBB_store_with_release(op->status, FAILED);
00952 forwarder_busy = false;
00953 }
00954
00956 void apply_body( input_type &i ) {
00957 successors().try_put( (*my_body)(i) );
00958 if ( my_max_concurrency != 0 ) {
00959 my_operation op_data(app_body);
00960 my_aggregator.execute(&op_data);
00961 }
00962 }
00963
00965 inline void spawn_body_task( input_type &input ) {
00966 task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) apply_body_task<function_input<input_type, output_type>, input_type >(*this, input));
00967 }
00968
00970 void forward() {
00971 my_operation op_data(try_fwd);
00972 do {
00973 op_data.status = WAIT;
00974 my_aggregator.execute(&op_data);
00975 } while (op_data.status == SUCCEEDED);
00976 }
00977
00979 inline void spawn_forward_task() {
00980 task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) forward_task<function_input<input_type, output_type> >(*this));
00981 }
00982 };
00983
00985 template< typename Output >
00986 class continue_input : public continue_receiver {
00987 public:
00988
00990 typedef continue_msg input_type;
00991
00993 typedef Output output_type;
00994
00995 template< typename Body >
00996 continue_input( graph &g, Body& body )
00997 : my_root_task(g.root_task()),
00998 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
00999
01000 template< typename Body >
01001 continue_input( graph &g, int number_of_predecessors, Body& body )
01002 : continue_receiver( number_of_predecessors ), my_root_task(g.root_task()),
01003 my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
01004
01005 protected:
01006
01007 task *my_root_task;
01008 function_body<input_type, output_type> *my_body;
01009
01010 virtual broadcast_cache<output_type > &successors() = 0;
01011
01012 friend class apply_body_task< continue_input< Output >, continue_msg >;
01013
01015 void apply_body( input_type ) {
01016 successors().try_put( (*my_body)( continue_msg() ) );
01017 }
01018
01020 void execute( ) {
01021 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01022 apply_body_task< continue_input< Output >, continue_msg >( *this, continue_msg() ) );
01023 }
01024 };
01025
01027 template< typename Output >
01028 class function_output : public sender<Output> {
01029 public:
01030
01031 typedef Output output_type;
01032
01033 function_output() { }
01034
01036 bool register_successor( receiver<output_type> &r ) {
01037 successors().register_successor( r );
01038 return true;
01039 }
01040
01042 bool remove_successor( receiver<output_type> &r ) {
01043 successors().remove_successor( r );
01044 return true;
01045 }
01046
01047 protected:
01048
01049 virtual broadcast_cache<output_type > &successors() = 0;
01050
01051 };
01052
01053 }
01055
01057 template < typename Output >
01058 class source_node : public graph_node, public sender< Output > {
01059 public:
01060
01062 typedef Output output_type;
01063
01065 typedef receiver< Output > successor_type;
01066
01068 template< typename Body >
01069 source_node( graph &g, Body body, bool is_active = true )
01070 : my_root_task(g.root_task()), my_state( is_active ? internal::node_state_idle : internal::node_state_inactive ),
01071 my_body( new internal::source_body_leaf< output_type, Body>(body) ),
01072 my_reserved(false), my_has_cached_item(false) {
01073 my_successors.set_owner(this);
01074 }
01075
01077 ~source_node() { delete my_body; }
01078
01080 bool register_successor( receiver<output_type> &r ) {
01081 spin_mutex::scoped_lock lock(my_mutex);
01082 my_successors.register_successor(r);
01083 if ( my_state != internal::node_state_inactive )
01084 spawn_put();
01085 return true;
01086 }
01087
01089 bool remove_successor( receiver<output_type> &r ) {
01090 spin_mutex::scoped_lock lock(my_mutex);
01091 my_successors.remove_successor(r);
01092 return true;
01093 }
01094
01096 bool try_get( output_type &v ) {
01097 spin_mutex::scoped_lock lock(my_mutex);
01098 if ( my_reserved )
01099 return false;
01100
01101 if ( my_has_cached_item ) {
01102 v = my_cached_item;
01103 my_has_cached_item = false;
01104 } else if ( (*my_body)(v) == false ) {
01105 return false;
01106 }
01107 return true;
01108 }
01109
01111 bool try_reserve( output_type &v ) {
01112 spin_mutex::scoped_lock lock(my_mutex);
01113 if ( my_reserved ) {
01114 return false;
01115 }
01116
01117 if ( !my_has_cached_item && (*my_body)(my_cached_item) )
01118 my_has_cached_item = true;
01119
01120 if ( my_has_cached_item ) {
01121 v = my_cached_item;
01122 my_reserved = true;
01123 return true;
01124 } else {
01125 return false;
01126 }
01127 }
01128
01130
01131 bool try_release( ) {
01132 spin_mutex::scoped_lock lock(my_mutex);
01133 __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
01134 my_reserved = false;
01135 spawn_put();
01136 return true;
01137 }
01138
01140 bool try_consume( ) {
01141 spin_mutex::scoped_lock lock(my_mutex);
01142 __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
01143 my_reserved = false;
01144 my_has_cached_item = false;
01145 if ( !my_successors.empty() ) {
01146 spawn_put();
01147 }
01148 return true;
01149 }
01150
01152 void activate() {
01153 spin_mutex::scoped_lock lock(my_mutex);
01154 my_state = internal::node_state_idle;
01155 if ( !my_successors.empty() )
01156 spawn_put();
01157 }
01158
01159 private:
01160
01161 task *my_root_task;
01162 spin_mutex my_mutex;
01163 internal::node_state my_state;
01164 internal::source_body<output_type> *my_body;
01165 internal::broadcast_cache< output_type > my_successors;
01166 bool my_reserved;
01167 bool my_has_cached_item;
01168 output_type my_cached_item;
01169
01170 friend class internal::source_task< source_node< output_type > >;
01171
01173 void apply_body( ) {
01174 output_type v;
01175 if ( try_reserve(v) == false )
01176 return;
01177
01178 if ( my_successors.try_put( v ) )
01179 try_consume();
01180 else
01181 try_release();
01182 }
01183
01185 void spawn_put( ) {
01186 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01187 internal::source_task< source_node< output_type > >( *this ) );
01188 }
01189
01190 };
01191
01193 template <typename Input, typename Output = continue_msg >
01194 class function_node : public graph_node, public internal::function_input<Input,Output>, public internal::function_output<Output> {
01195 public:
01196
01197 typedef Input input_type;
01198 typedef Output output_type;
01199 typedef sender< input_type > predecessor_type;
01200 typedef receiver< output_type > successor_type;
01201
01203 template< typename Body >
01204 function_node( graph &g, size_t concurrency, Body body )
01205 : internal::function_input<input_type,output_type>( g, concurrency, body ) {
01206 my_successors.set_owner(this);
01207 }
01208
01209 protected:
01210
01211 internal::broadcast_cache<output_type> my_successors;
01212 internal::broadcast_cache<output_type> &successors () { return my_successors; }
01213
01214 };
01215
01217 template <typename Output>
01218 class executable_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
01219 public:
01220
01221 typedef continue_msg input_type;
01222 typedef Output output_type;
01223 typedef sender< input_type > predecessor_type;
01224 typedef receiver< output_type > successor_type;
01225
01227 template <typename Body >
01228 executable_node( graph &g, Body body )
01229 : internal::continue_input<output_type>( g, body ) {
01230 my_successors.set_owner(this);
01231 }
01232
01234 template <typename Body >
01235 executable_node( graph &g, int number_of_predecessors, Body body )
01236 : internal::continue_input<output_type>( g, number_of_predecessors, body ) {
01237 my_successors.set_owner(this);
01238 }
01239
01240 protected:
01241
01242 internal::broadcast_cache<output_type> my_successors;
01243 internal::broadcast_cache<output_type> &successors () { return my_successors; }
01244
01245 };
01246
01247
01248
01249 template< typename T >
01250 class overwrite_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01251 public:
01252
01253 typedef T input_type;
01254 typedef T output_type;
01255 typedef sender< input_type > predecessor_type;
01256 typedef receiver< output_type > successor_type;
01257
01258 overwrite_node() : my_buffer_is_valid(false) {
01259 my_successors.set_owner( this );
01260 }
01261
01262 ~overwrite_node() {}
01263
01264 bool register_successor( successor_type &s ) {
01265 spin_mutex::scoped_lock l( my_mutex );
01266 if ( my_buffer_is_valid ) {
01267
01268 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this ) ) {
01269
01270 my_successors.register_successor( s );
01271 return true;
01272 } else {
01273
01274 return false;
01275 }
01276 } else {
01277
01278 my_successors.register_successor( s );
01279 return true;
01280 }
01281 }
01282
01283 bool remove_successor( successor_type &s ) {
01284 spin_mutex::scoped_lock l( my_mutex );
01285 my_successors.remove_successor(s);
01286 return true;
01287 }
01288
01289 bool try_put( T v ) {
01290 spin_mutex::scoped_lock l( my_mutex );
01291 my_buffer = v;
01292 my_buffer_is_valid = true;
01293 my_successors.try_put(v);
01294 return true;
01295 }
01296
01297 bool try_get( T &v ) {
01298 spin_mutex::scoped_lock l( my_mutex );
01299 if ( my_buffer_is_valid ) {
01300 v = my_buffer;
01301 return true;
01302 } else {
01303 return false;
01304 }
01305 }
01306
01307 bool is_valid() {
01308 spin_mutex::scoped_lock l( my_mutex );
01309 return my_buffer_is_valid;
01310 }
01311
01312 void clear() {
01313 spin_mutex::scoped_lock l( my_mutex );
01314 my_buffer_is_valid = false;
01315 }
01316
01317 protected:
01318
01319 spin_mutex my_mutex;
01320 internal::broadcast_cache< T, null_rw_mutex > my_successors;
01321 T my_buffer;
01322 bool my_buffer_is_valid;
01323
01324 };
01325
01326 template< typename T >
01327 class write_once_node : public overwrite_node<T> {
01328 public:
01329
01330 typedef T input_type;
01331 typedef T output_type;
01332 typedef sender< input_type > predecessor_type;
01333 typedef receiver< output_type > successor_type;
01334
01335 bool try_put( T v ) {
01336 spin_mutex::scoped_lock l( this->my_mutex );
01337 if ( this->my_buffer_is_valid ) {
01338 return false;
01339 } else {
01340 this->my_buffer = v;
01341 this->my_buffer_is_valid = true;
01342 this->my_successors.try_put(v);
01343 return true;
01344 }
01345 }
01346 };
01347
01349
01350 class continue_node : public executable_node< continue_msg > {
01351 public:
01352
01353 typedef continue_msg input_type;
01354 typedef continue_msg output_type;
01355 typedef sender< input_type > predecessor_type;
01356 typedef receiver< output_type > successor_type;
01357
01358 continue_node( graph &g ) : executable_node<continue_msg>( g, internal::empty_body< continue_msg, continue_msg>() ) {}
01359 };
01360
01362 template <typename T>
01363 class broadcast_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01364
01365 internal::broadcast_cache<T> my_successors;
01366
01367 public:
01368
01369 typedef T input_type;
01370 typedef T output_type;
01371 typedef sender< input_type > predecessor_type;
01372 typedef receiver< output_type > successor_type;
01373
01374 broadcast_node( ) {
01375 my_successors.set_owner( this );
01376 }
01377
01379 virtual bool register_successor( receiver<T> &r ) {
01380 my_successors.register_successor( r );
01381 return true;
01382 }
01383
01385 virtual bool remove_successor( receiver<T> &r ) {
01386 my_successors.remove_successor( r );
01387 return true;
01388 }
01389
01390 bool try_put( T t ) {
01391 my_successors.try_put(t);
01392 return true;
01393 }
01394
01395 };
01396
01397
01399 template <typename T>
01400 class buffer_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01401 public:
01402 typedef T input_type;
01403 typedef T output_type;
01404 typedef sender< input_type > predecessor_type;
01405 typedef receiver< output_type > successor_type;
01406 protected:
01407 typedef size_t size_type;
01408 typedef std::pair< T, bool > item_type;
01409
01410 internal::round_robin_cache< T, null_rw_mutex > my_successors;
01411
01412 task *my_parent;
01413 item_type *my_array;
01414 size_type my_array_size;
01415 static const size_type initial_buffer_size = 4;
01416 size_type my_head;
01417 size_type my_tail;
01418 spin_mutex my_mutex;
01419 bool my_reserved;
01420 size_type my_reserved_id;
01421
01422 friend class internal::forward_task< buffer_node< T > >;
01423
01424 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
01425 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01426
01427
01428 class buffer_operation : public internal::aggregated_operation< buffer_operation > {
01429 public:
01430 char type;
01431 T *elem;
01432 successor_type *r;
01433 buffer_operation(const T& e, op_type t) :
01434 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
01435 buffer_operation(op_type t) : type(char(t)), r(NULL) {}
01436 };
01437
01438 class my_functor_t {
01439 buffer_node<T> *bfr;
01440 public:
01441 my_functor_t(buffer_node<T> *bfr_) : bfr(bfr_) {}
01442 my_functor_t() {}
01443 void operator()(buffer_operation* op_list) {
01444 bfr->handle_operations(op_list);
01445 }
01446 };
01447
01448 bool forwarder_busy;
01449 internal::aggregator< my_functor_t, buffer_operation> my_aggregator;
01450
01451 virtual void handle_operations(buffer_operation *op_list) {
01452 buffer_operation *tmp;
01453 bool try_forwarding=false;
01454 while (op_list) {
01455 tmp = op_list;
01456 op_list = op_list->next;
01457 switch (tmp->type) {
01458 case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
01459 case rem_succ: internal_rem_succ(tmp); break;
01460 case req_item: internal_pop(tmp); break;
01461 case res_item: internal_reserve(tmp); break;
01462 case rel_res: internal_release(tmp); try_forwarding = true; break;
01463 case con_res: internal_consume(tmp); try_forwarding = true; break;
01464 case put_item: internal_push(tmp); try_forwarding = true; break;
01465 case try_fwd: internal_forward(tmp); break;
01466 }
01467 }
01468 if (try_forwarding && !forwarder_busy) {
01469 forwarder_busy = true;
01470 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type> >(*this));
01471 }
01472 }
01473
01475 virtual void forward() {
01476 buffer_operation op_data(try_fwd);
01477 do {
01478 op_data.status = WAIT;
01479 my_aggregator.execute(&op_data);
01480 } while (op_data.status == SUCCEEDED);
01481 }
01482
01484 virtual void internal_reg_succ(buffer_operation *op) {
01485 my_successors.register_successor(*(op->r));
01486 __TBB_store_with_release(op->status, SUCCEEDED);
01487 }
01488
01490 virtual void internal_rem_succ(buffer_operation *op) {
01491 my_successors.remove_successor(*(op->r));
01492 __TBB_store_with_release(op->status, SUCCEEDED);
01493 }
01494
01496 virtual void internal_forward(buffer_operation *op) {
01497 T i_copy;
01498 bool success = false;
01499 size_type counter = my_successors.size();
01500
01501 while (counter>0 && my_tail>my_head && my_array[ (my_tail-1) & (my_array_size-1)].second == true ) {
01502 i_copy = my_array[ (my_tail-1) & (my_array_size-1)].first;
01503 bool msg = my_successors.try_put(i_copy);
01504 if ( msg == true ) {
01505 my_array[ (my_tail-1) & (my_array_size-1)].second = false;
01506 --my_tail;
01507 success = true;
01508 }
01509 --counter;
01510 }
01511 if (success && !counter)
01512 __TBB_store_with_release(op->status, SUCCEEDED);
01513 else {
01514 __TBB_store_with_release(op->status, FAILED);
01515 forwarder_busy = false;
01516 }
01517 }
01518
01519 virtual void internal_push(buffer_operation *op) {
01520 while( my_tail-my_head >= my_array_size ) {
01521 grow_my_array( my_tail - my_head + 1 );
01522 }
01523 my_array[my_tail&(my_array_size-1)] = std::make_pair( *(op->elem), true );
01524 ++my_tail;
01525 __TBB_store_with_release(op->status, SUCCEEDED);
01526 }
01527 virtual void internal_pop(buffer_operation *op) {
01528 if ( my_array[(my_tail-1) & (my_array_size-1)].second == false ) {
01529 __TBB_store_with_release(op->status, FAILED);
01530 }
01531 else {
01532 *(op->elem) = my_array[(my_tail-1) & (my_array_size-1)].first;
01533 my_array[(my_tail-1) & (my_array_size-1)].second = false;
01534 --my_tail;
01535 __TBB_store_with_release(op->status, SUCCEEDED);
01536 }
01537 }
01538 virtual void internal_reserve(buffer_operation *op) {
01539 if (my_reserved == true || my_array[ my_head & (my_array_size-1)].second == false ) {
01540 __TBB_store_with_release(op->status, FAILED);
01541 }
01542 else {
01543 my_reserved = true;
01544 *(op->elem) = my_array[ my_head & (my_array_size-1)].first;
01545 my_array[ my_head & (my_array_size-1)].second = false;
01546 __TBB_store_with_release(op->status, SUCCEEDED);
01547 }
01548 }
01549 virtual void internal_consume(buffer_operation *op) {
01550 my_reserved = false;
01551 ++my_head;
01552 __TBB_store_with_release(op->status, SUCCEEDED);
01553 }
01554
01555 virtual void internal_release(buffer_operation *op) {
01556 my_array[my_head&(my_array_size-1)].second = true;
01557 my_reserved = false;
01558 __TBB_store_with_release(op->status, SUCCEEDED);
01559 }
01560
01562 void grow_my_array( size_t minimum_size ) {
01563 size_type old_size = my_array_size;
01564 size_type new_size = old_size ? 2*old_size : initial_buffer_size;
01565 while( new_size<minimum_size )
01566 new_size*=2;
01567
01568 item_type* new_array = cache_aligned_allocator<item_type>().allocate(new_size);
01569 item_type* old_array = my_array;
01570
01571 for( size_type i=0; i<new_size; ++i )
01572 new_array[i].second = false;
01573
01574 size_t t=my_head;
01575 for( size_type i=0; i<old_size; ++i, ++t )
01576 new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
01577 my_array = new_array;
01578 my_array_size = new_size;
01579 if( old_array )
01580 cache_aligned_allocator<item_type>().deallocate(old_array,old_size);
01581 }
01582
01583 public:
01585 buffer_node( graph &g ) :
01586 my_parent( g.root_task() ), my_array(NULL), my_array_size(0),
01587 my_head(0), my_tail(0), my_reserved(false), forwarder_busy(false) {
01588 my_successors.set_owner(this);
01589 my_aggregator.initialize_handler(my_functor_t(this));
01590 grow_my_array(initial_buffer_size);
01591 }
01592
01593 virtual ~buffer_node() {}
01594
01595
01596
01597
01598
01600
01601 bool register_successor( receiver<output_type> &r ) {
01602 buffer_operation op_data(reg_succ);
01603 op_data.r = &r;
01604 my_aggregator.execute(&op_data);
01605 return true;
01606 }
01607
01609
01611 bool remove_successor( receiver<output_type> &r ) {
01612 r.remove_predecessor(*this);
01613 buffer_operation op_data(rem_succ);
01614 op_data.r = &r;
01615 my_aggregator.execute(&op_data);
01616 return true;
01617 }
01618
01620
01622 bool try_get( T &v ) {
01623 buffer_operation op_data(req_item);
01624 op_data.elem = &v;
01625 my_aggregator.execute(&op_data);
01626 return (op_data.status==SUCCEEDED);
01627 }
01628
01630
01632 bool try_reserve( T &v ) {
01633 buffer_operation op_data(res_item);
01634 op_data.elem = &v;
01635 my_aggregator.execute(&op_data);
01636 return (op_data.status==SUCCEEDED);
01637 }
01638
01640
01641 bool try_release() {
01642 buffer_operation op_data(rel_res);
01643 my_aggregator.execute(&op_data);
01644 return true;
01645 }
01646
01648
01649 bool try_consume() {
01650 buffer_operation op_data(con_res);
01651 my_aggregator.execute(&op_data);
01652 return true;
01653 }
01654
01656
01657 bool try_put(T t) {
01658 buffer_operation op_data(t, put_item);
01659 my_aggregator.execute(&op_data);
01660 return true;
01661 }
01662 };
01663
01664
01666 template <typename T>
01667 class queue_node : public buffer_node<T> {
01668 protected:
01669 typedef typename buffer_node<T>::size_type size_type;
01670 typedef typename buffer_node<T>::buffer_operation queue_operation;
01671
01672 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01673
01675 void internal_forward(queue_operation *op) {
01676 T i_copy;
01677 bool success = false;
01678 size_type counter = this->my_successors.size();
01679 if (this->my_reserved || this->my_array[ this->my_head & (this->my_array_size-1)].second == false) {
01680 __TBB_store_with_release(op->status, FAILED);
01681 this->forwarder_busy = false;
01682 return;
01683 }
01684
01685 while (counter>0 && this->my_array[ this->my_head & (this->my_array_size-1)].second == true ) {
01686 i_copy = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01687 bool msg = this->my_successors.try_put(i_copy);
01688 if ( msg == true ) {
01689 this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01690 ++(this->my_head);
01691 success = true;
01692 }
01693 --counter;
01694 }
01695 if (success && !counter)
01696 __TBB_store_with_release(op->status, SUCCEEDED);
01697 else {
01698 __TBB_store_with_release(op->status, FAILED);
01699 this->forwarder_busy = false;
01700 }
01701 }
01702
01703 void internal_pop(queue_operation *op) {
01704 if ( this->my_reserved == true || this->my_array[ this->my_head & (this->my_array_size-1)].second == false ) {
01705 __TBB_store_with_release(op->status, FAILED);
01706 }
01707 else {
01708 *(op->elem) = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01709 this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01710 ++(this->my_head);
01711 __TBB_store_with_release(op->status, SUCCEEDED);
01712 }
01713 }
01714 void internal_reserve(queue_operation *op) {
01715 if (this->my_reserved == true || this->my_array[ this->my_head & (this->my_array_size-1)].second == false ) {
01716 __TBB_store_with_release(op->status, FAILED);
01717 }
01718 else {
01719 this->my_reserved = true;
01720 *(op->elem) = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01721 __TBB_store_with_release(op->status, SUCCEEDED);
01722 }
01723 }
01724 void internal_consume(queue_operation *op) {
01725 this->my_reserved = false;
01726 this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01727 ++(this->my_head);
01728 __TBB_store_with_release(op->status, SUCCEEDED);
01729 }
01730
01731 public:
01732
01733 typedef T input_type;
01734 typedef T output_type;
01735 typedef sender< input_type > predecessor_type;
01736 typedef receiver< output_type > successor_type;
01737
01739 queue_node( graph &g ) : buffer_node<T>(g) {}
01740 };
01741
01743 template< typename T >
01744 class sequencer_node : public queue_node<T> {
01745 internal::function_body< T, size_t > *my_sequencer;
01746 public:
01747
01748 typedef T input_type;
01749 typedef T output_type;
01750 typedef sender< input_type > predecessor_type;
01751 typedef receiver< output_type > successor_type;
01752
01754 template< typename Sequencer >
01755 sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
01756 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01757
01759 ~sequencer_node() { delete my_sequencer; }
01760 protected:
01761 typedef typename buffer_node<T>::size_type size_type;
01762 typedef typename buffer_node<T>::buffer_operation sequencer_operation;
01763
01764 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01765
01766 private:
01767 void internal_push(sequencer_operation *op) {
01768 size_type tag = (*my_sequencer)(*(op->elem));
01769
01770 this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01771 while ( this->my_tail - this->my_head >= this->my_array_size ) {
01772 this->grow_my_array( this->my_tail - this->my_head + 1);
01773 }
01774 this->my_array[tag&(this->my_array_size-1)] = std::make_pair( *(op->elem), true );
01775 __TBB_store_with_release(op->status, SUCCEEDED);
01776 }
01777 };
01778
01780 template< typename T, typename Compare = std::less<T> >
01781 class priority_queue_node : public buffer_node<T> {
01782 public:
01783 typedef T input_type;
01784 typedef T output_type;
01785 typedef sender< input_type > predecessor_type;
01786 typedef receiver< output_type > successor_type;
01787
01789 priority_queue_node( graph &g ) : buffer_node<T>(g), mark(0) {}
01790
01791 protected:
01792 typedef typename buffer_node<T>::size_type size_type;
01793 typedef typename buffer_node<T>::item_type item_type;
01794 typedef typename buffer_node<T>::buffer_operation prio_operation;
01795
01796 enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01797
01798 void handle_operations(prio_operation *op_list) {
01799 prio_operation *tmp ;
01800 bool try_forwarding=false;
01801 while (op_list) {
01802 tmp = op_list;
01803 op_list = op_list->next;
01804 switch (tmp->type) {
01805 case buffer_node<T>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01806 case buffer_node<T>::rem_succ: this->internal_rem_succ(tmp); break;
01807 case buffer_node<T>::put_item: internal_push(tmp); try_forwarding = true; break;
01808 case buffer_node<T>::try_fwd: internal_forward(tmp); break;
01809 case buffer_node<T>::rel_res: internal_release(tmp); try_forwarding = true; break;
01810 case buffer_node<T>::con_res: internal_consume(tmp); try_forwarding = true; break;
01811 case buffer_node<T>::req_item: internal_pop(tmp); break;
01812 case buffer_node<T>::res_item: internal_reserve(tmp); break;
01813 }
01814 }
01815
01816 if (mark<this->my_tail) heapify();
01817 if (try_forwarding && !this->forwarder_busy) {
01818 this->forwarder_busy = true;
01819 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type> >(*this));
01820 }
01821 }
01822
01824 void internal_forward(prio_operation *op) {
01825 T i_copy;
01826 bool success = false;
01827 size_type counter = this->my_successors.size();
01828
01829 if (this->my_reserved || this->my_tail == 0) {
01830 __TBB_store_with_release(op->status, FAILED);
01831 this->forwarder_busy = false;
01832 return;
01833 }
01834
01835 while (counter>0 && this->my_tail > 0) {
01836 i_copy = this->my_array[0].first;
01837 bool msg = this->my_successors.try_put(i_copy);
01838 if ( msg == true ) {
01839 if (mark == this->my_tail) --mark;
01840 --(this->my_tail);
01841 this->my_array[0].first=this->my_array[this->my_tail].first;
01842 if (this->my_tail > 1)
01843 reheap();
01844 success = true;
01845 }
01846 --counter;
01847 }
01848 if (success && !counter)
01849 __TBB_store_with_release(op->status, SUCCEEDED);
01850 else {
01851 __TBB_store_with_release(op->status, FAILED);
01852 this->forwarder_busy = false;
01853 }
01854 }
01855
01856 void internal_push(prio_operation *op) {
01857 if ( this->my_tail >= this->my_array_size )
01858 this->grow_my_array( this->my_tail + 1 );
01859 this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01860 ++(this->my_tail);
01861 __TBB_store_with_release(op->status, SUCCEEDED);
01862 }
01863 void internal_pop(prio_operation *op) {
01864 if ( this->my_reserved == true || this->my_tail == 0 ) {
01865 __TBB_store_with_release(op->status, FAILED);
01866 }
01867 else {
01868 if (mark<this->my_tail &&
01869 compare(this->my_array[0].first,
01870 this->my_array[this->my_tail-1].first)) {
01871
01872
01873 *(op->elem) = this->my_array[this->my_tail-1].first;
01874 --(this->my_tail);
01875 __TBB_store_with_release(op->status, SUCCEEDED);
01876 }
01877 else {
01878 *(op->elem) = this->my_array[0].first;
01879 if (mark == this->my_tail) --mark;
01880 --(this->my_tail);
01881 __TBB_store_with_release(op->status, SUCCEEDED);
01882 this->my_array[0].first=this->my_array[this->my_tail].first;
01883 if (this->my_tail > 1)
01884 reheap();
01885 }
01886 }
01887 }
01888 void internal_reserve(prio_operation *op) {
01889 if (this->my_reserved == true || this->my_tail == 0) {
01890 __TBB_store_with_release(op->status, FAILED);
01891 }
01892 else {
01893 this->my_reserved = true;
01894 *(op->elem) = reserved_item = this->my_array[0].first;
01895 if (mark == this->my_tail) --mark;
01896 --(this->my_tail);
01897 __TBB_store_with_release(op->status, SUCCEEDED);
01898 this->my_array[0].first = this->my_array[this->my_tail].first;
01899 if (this->my_tail > 1)
01900 reheap();
01901 }
01902 }
01903 void internal_consume(prio_operation *op) {
01904 this->my_reserved = false;
01905 __TBB_store_with_release(op->status, SUCCEEDED);
01906 }
01907 void internal_release(prio_operation *op) {
01908 if (this->my_tail >= this->my_array_size)
01909 this->grow_my_array( this->my_tail + 1 );
01910 this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01911 ++(this->my_tail);
01912 this->my_reserved = false;
01913 __TBB_store_with_release(op->status, SUCCEEDED);
01914 heapify();
01915 }
01916 private:
01917 Compare compare;
01918 size_type mark;
01919 input_type reserved_item;
01920
01921 void heapify() {
01922 if (!mark) mark = 1;
01923 for (; mark<this->my_tail; ++mark) {
01924 size_type cur_pos = mark;
01925 input_type to_place = this->my_array[mark].first;
01926 do {
01927 size_type parent = (cur_pos-1)>>1;
01928 if (!compare(this->my_array[parent].first, to_place))
01929 break;
01930 this->my_array[cur_pos].first = this->my_array[parent].first;
01931 cur_pos = parent;
01932 } while( cur_pos );
01933 this->my_array[cur_pos].first = to_place;
01934 }
01935 }
01936
01937 void reheap() {
01938 size_type cur_pos=0, child=1;
01939 while (child < mark) {
01940 size_type target = child;
01941 if (child+1<mark &&
01942 compare(this->my_array[child].first,
01943 this->my_array[child+1].first))
01944 ++target;
01945
01946 if (compare(this->my_array[target].first,
01947 this->my_array[this->my_tail].first))
01948 break;
01949 this->my_array[cur_pos].first = this->my_array[target].first;
01950 cur_pos = target;
01951 child = (cur_pos<<1)+1;
01952 }
01953 this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01954 }
01955 };
01956
01958
01961 template< typename T >
01962 class limiter_node : public graph_node, public receiver< T >, public sender< T >, internal::no_copy {
01963 public:
01964
01965 typedef T input_type;
01966 typedef T output_type;
01967 typedef sender< input_type > predecessor_type;
01968 typedef receiver< output_type > successor_type;
01969
01970 private:
01971
01972 task *my_root_task;
01973 size_t my_threshold;
01974 size_t my_count;
01975 internal::predecessor_cache< T > my_predecessors;
01976 spin_mutex my_mutex;
01977 internal::broadcast_cache< T > my_successors;
01978
01979 friend class internal::forward_task< limiter_node<T> >;
01980
01981
01982 friend class internal::decrementer< limiter_node<T> >;
01983
01984 void decrement_counter() {
01985 input_type v;
01986
01987
01988 if ( my_predecessors.get_item( v ) == false
01989 || my_successors.try_put(v) == false ) {
01990 spin_mutex::scoped_lock lock(my_mutex);
01991 --my_count;
01992 if ( !my_predecessors.empty() )
01993 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
01994 internal::forward_task< limiter_node<T> >( *this ) );
01995 }
01996 }
01997
01998 void forward() {
01999 {
02000 spin_mutex::scoped_lock lock(my_mutex);
02001 if ( my_count < my_threshold )
02002 ++my_count;
02003 else
02004 return;
02005 }
02006 decrement_counter();
02007 }
02008
02009 public:
02010
02012 internal::decrementer< limiter_node<T> > decrement;
02013
02015 limiter_node( graph &g, size_t threshold, int number_of_decrement_predecessors = 0 ) :
02016 my_root_task(g.root_task()), my_threshold(threshold), my_count(0), decrement(number_of_decrement_predecessors) {
02017 my_predecessors.set_owner(this);
02018 my_successors.set_owner(this);
02019 decrement.set_owner(this);
02020 }
02021
02023 bool register_successor( receiver<output_type> &r ) {
02024 my_successors.register_successor(r);
02025 return true;
02026 }
02027
02029
02030 bool remove_successor( receiver<output_type> &r ) {
02031 r.remove_predecessor(*this);
02032 my_successors.remove_successor(r);
02033 return true;
02034 }
02035
02037 bool try_put( T t ) {
02038 {
02039 spin_mutex::scoped_lock lock(my_mutex);
02040 if ( my_count >= my_threshold )
02041 return false;
02042 else
02043 ++my_count;
02044 }
02045
02046 bool msg = my_successors.try_put(t);
02047
02048 if ( msg != true ) {
02049 spin_mutex::scoped_lock lock(my_mutex);
02050 --my_count;
02051 if ( !my_predecessors.empty() )
02052 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
02053 internal::forward_task< limiter_node<T> >( *this ) );
02054 }
02055
02056 return msg;
02057 }
02058
02060 bool register_predecessor( predecessor_type &src ) {
02061 spin_mutex::scoped_lock lock(my_mutex);
02062 my_predecessors.add( src );
02063 if ( my_count < my_threshold && !my_successors.empty() )
02064 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
02065 internal::forward_task< limiter_node<T> >( *this ) );
02066 return true;
02067 }
02068
02070 bool remove_predecessor( predecessor_type &src ) {
02071 my_predecessors.remove( src );
02072 return true;
02073 }
02074
02075 };
02076
02077 namespace internal {
02078
02079 struct forwarding_base {
02080 virtual ~forwarding_base() {}
02081 virtual void decrement_port_count() = 0;
02082 virtual void increment_port_count() = 0;
02083 };
02084
02085 template< int N >
02086 struct join_helper {
02087
02088 template< typename TupleType, typename PortType >
02089 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
02090 std::get<N-1>( my_input ).set_join_node_pointer(port);
02091 join_helper<N-1>::set_join_node_pointer( my_input, port );
02092 }
02093 template< typename TupleType >
02094 static inline void consume_reservations( TupleType &my_input ) {
02095 std::get<N-1>( my_input ).consume();
02096 join_helper<N-1>::consume_reservations( my_input );
02097 }
02098
02099 template< typename TupleType >
02100 static inline void release_my_reservation( TupleType &my_input ) {
02101 std::get<N-1>( my_input ).release();
02102 }
02103
02104 template <typename TupleType>
02105 static inline void release_reservations( TupleType &my_input) {
02106 join_helper<N-1>::release_reservations(my_input);
02107 release_my_reservation(my_input);
02108 }
02109
02110 template< typename InputTuple, typename OutputTuple >
02111 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
02112 if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
02113 if ( !join_helper<N-1>::reserve( my_input, out ) ) {
02114 release_my_reservation( my_input );
02115 return false;
02116 }
02117 return true;
02118 }
02119 };
02120
02121 template< >
02122 struct join_helper<1> {
02123
02124 template< typename TupleType, typename PortType >
02125 static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
02126 std::get<0>( my_input ).set_join_node_pointer(port);
02127 }
02128
02129 template< typename TupleType >
02130 static inline void consume_reservations( TupleType &my_input ) {
02131 std::get<0>( my_input ).consume();
02132 }
02133
02134 template< typename TupleType >
02135 static inline void release_my_reservation( TupleType &my_input ) {
02136 std::get<0>( my_input ).release();
02137 }
02138
02139 template<typename TupleType>
02140 static inline void release_reservations( TupleType &my_input) {
02141 release_my_reservation(my_input);
02142 }
02143
02144 template< typename InputTuple, typename OutputTuple >
02145 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
02146 return std::get<0>( my_input ).reserve( std::get<0>( out ) );
02147 }
02148 };
02149
02150 namespace join_policy_namespace {
02151 enum join_policy { two_phase
02152 };
02153 }
02154 using namespace join_policy_namespace;
02155
02157 template< typename T >
02158 class two_phase_port : public receiver<T> {
02159 public:
02160 typedef T input_type;
02161 typedef sender<T> predecessor_type;
02162
02164 two_phase_port() : reserved(false) {
02165 my_join = NULL;
02166 my_predecessors.set_owner( this );
02167 }
02168
02169
02170 two_phase_port(const two_phase_port& ) : receiver<T>() {
02171 reserved = false;
02172 my_join = NULL;
02173 my_predecessors.set_owner( this );
02174 }
02175
02176 void set_join_node_pointer(forwarding_base *join) {
02177 my_join = join;
02178 }
02179
02180 bool try_put( T ) {
02181 return false;
02182 }
02183
02185 bool register_predecessor( sender<T> &src ) {
02186 spin_mutex::scoped_lock l(my_mutex);
02187 bool no_predecessors = my_predecessors.empty();
02188 my_predecessors.add(src);
02189 if ( no_predecessors ) {
02190 my_join->decrement_port_count( );
02191 }
02192 return true;
02193 }
02194
02196 bool remove_predecessor( sender<T> &src ) {
02197 spin_mutex::scoped_lock l(my_mutex);
02198 my_predecessors.remove( src );
02199 if(my_predecessors.empty()) my_join->increment_port_count();
02200 return true;
02201 }
02202
02204 bool reserve( T &v ) {
02205 spin_mutex::scoped_lock l(my_mutex);
02206 if ( reserved ) {
02207 return false;
02208 }
02209 if ( my_predecessors.try_reserve( v ) ) {
02210 reserved = true;
02211 return true;
02212 } else if ( my_predecessors.empty() ) {
02213 my_join->increment_port_count();
02214 }
02215 return false;
02216 }
02217
02219 void release( ) {
02220 spin_mutex::scoped_lock l(my_mutex);
02221 reserved = false;
02222 my_predecessors.try_release( );
02223 }
02224
02226 void consume( ) {
02227 spin_mutex::scoped_lock l(my_mutex);
02228 reserved = false;
02229 my_predecessors.try_consume( );
02230 }
02231
02232 private:
02233 spin_mutex my_mutex;
02234 forwarding_base *my_join;
02235 reservable_predecessor_cache< T > my_predecessors;
02236 bool reserved;
02237 };
02238
02239 template<join_policy JP, typename InputTuple, typename OutputTuple>
02240 class join_node_base;
02241
02243 template<join_policy JP, typename InputTuple, typename OutputTuple>
02244 class join_node_FE;
02245
02246 template<typename InputTuple, typename OutputTuple>
02247 class join_node_FE<two_phase, InputTuple, OutputTuple> : public forwarding_base {
02248 public:
02249 static const int N = std::tuple_size<OutputTuple>::value;
02250 typedef OutputTuple output_type;
02251 typedef InputTuple input_type;
02252 typedef join_node_base<two_phase, InputTuple, OutputTuple> my_node_type;
02253
02254 join_node_FE(graph &g) : my_root_task(g.root_task()), my_node(NULL) {
02255 ports_with_no_inputs = N;
02256 join_helper<N>::set_join_node_pointer(my_inputs, this);
02257 }
02258
02259 void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
02260
02261 void increment_port_count() {
02262 ++ports_with_no_inputs;
02263 }
02264
02265
02266 void decrement_port_count() {
02267 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
02268 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
02269 forward_task<my_node_type>(*my_node) );
02270 }
02271 }
02272
02273 input_type &inputs() { return my_inputs; }
02274 protected:
02275
02276
02277 bool tuple_build_may_succeed() {
02278 return !ports_with_no_inputs;
02279 }
02280
02281 bool try_to_make_tuple(output_type &out) {
02282 if(ports_with_no_inputs) return false;
02283 return join_helper<N>::reserve(my_inputs, out);
02284 }
02285
02286 void tuple_accepted() {
02287 join_helper<N>::consume_reservations(my_inputs);
02288 }
02289 void tuple_rejected() {
02290 join_helper<N>::release_reservations(my_inputs);
02291 }
02292
02293 input_type my_inputs;
02294 task *my_root_task;
02295 my_node_type *my_node;
02296 atomic<size_t> ports_with_no_inputs;
02297 };
02298
02300 template<join_policy JP, typename InputTuple, typename OutputTuple>
02301 class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
02302 public sender<OutputTuple>, no_copy {
02303 public:
02304 typedef OutputTuple output_type;
02305
02306 typedef receiver<output_type> successor_type;
02307 typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
02308 using input_ports_type::tuple_build_may_succeed;
02309 using input_ports_type::try_to_make_tuple;
02310 using input_ports_type::tuple_accepted;
02311 using input_ports_type::tuple_rejected;
02312
02313 join_node_base(graph &g) : input_ports_type(g), my_root_task(g.root_task()) {
02314 my_successors.set_owner(this);
02315 input_ports_type::set_my_node(this);
02316 }
02317
02318 bool register_successor(successor_type &r) {
02319 spin_mutex::scoped_lock l(my_mutex);
02320 my_successors.register_successor(r);
02321 if(tuple_build_may_succeed()) {
02322 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task) )
02323 forward_task<join_node_base<JP,InputTuple,OutputTuple> >( *this ) );
02324 }
02325 return true;
02326 }
02327
02328 template<size_t N>
02329 receiver<typename std::tuple_element<N, OutputTuple>::value> & input_port(void) { return std::get<N>(input_ports_type::inputs()); }
02330
02331 bool remove_successor( successor_type &r) {
02332 spin_mutex::scoped_lock l(my_mutex);
02333 my_successors.remove_successor(r);
02334 return true;
02335 }
02336
02337 bool try_get( output_type &v) {
02338 spin_mutex::scoped_lock l(my_mutex);
02339 if(tuple_build_may_succeed()) {
02340 if(try_to_make_tuple(v)) {
02341
02342 tuple_accepted();
02343 return true;
02344 }
02345 }
02346 return false;
02347 }
02348
02349 private:
02350 task *my_root_task;
02351 broadcast_cache<output_type, null_rw_mutex> my_successors;
02352 spin_mutex my_mutex;
02353
02354 friend class forward_task< join_node_base<JP, InputTuple, OutputTuple> >;
02355
02356 void forward() {
02357 spin_mutex::scoped_lock l(my_mutex);
02358 output_type out;
02359 if(!tuple_build_may_succeed()) return;
02360 while(try_to_make_tuple(out)) {
02361 if(my_successors.try_put(out)) {
02362 tuple_accepted();
02363 }
02364 else {
02365 tuple_rejected();
02366 return;
02367 }
02368 }
02369 }
02370 };
02371
02373
02374 template<int N, typename OutputTuple, join_policy JP>
02375 class unfolded_join_node;
02376
02377 template<typename OutputTuple>
02378 class unfolded_join_node<2,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02379 std::tuple<
02380 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02381 two_phase_port<typename std::tuple_element<1,OutputTuple>::type> >,
02382 OutputTuple
02383 >
02384 {
02385 private:
02386 typedef typename std::tuple<
02387 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02388 two_phase_port<typename std::tuple_element<1,OutputTuple>::type> > port_tuple_type;
02389 public:
02390 typedef OutputTuple output_type;
02391 private:
02392 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02393 public:
02394 unfolded_join_node(graph &g) : base_type(g) {}
02395 };
02396
02397 template<typename OutputTuple>
02398 class unfolded_join_node<3,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02399 std::tuple<
02400 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02401 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02402 two_phase_port<typename std::tuple_element<2,OutputTuple>::type> >,
02403 OutputTuple
02404 >
02405 {
02406 private:
02407 typedef typename std::tuple<
02408 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02409 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02410 two_phase_port<typename std::tuple_element<2,OutputTuple>::type> > port_tuple_type;
02411 public:
02412 typedef OutputTuple output_type;
02413 private:
02414 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02415 public:
02416 unfolded_join_node(graph &g) : base_type(g) {}
02417 };
02418
02419 template<typename OutputTuple>
02420 class unfolded_join_node<4,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02421 std::tuple<
02422 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02423 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02424 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02425 two_phase_port<typename std::tuple_element<3,OutputTuple>::type> >,
02426 OutputTuple
02427 > {
02428 private:
02429 typedef typename std::tuple<
02430 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02431 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02432 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02433 two_phase_port<typename std::tuple_element<3,OutputTuple>::type> > port_tuple_type;
02434 public:
02435 typedef OutputTuple output_type;
02436 private:
02437 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02438 public:
02439 unfolded_join_node(graph &g) : base_type(g) {}
02440 };
02441
02442 template<typename OutputTuple>
02443 class unfolded_join_node<5,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02444 std::tuple<
02445 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02446 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02447 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02448 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02449 two_phase_port<typename std::tuple_element<4,OutputTuple>::type> >,
02450 OutputTuple
02451 > {
02452 private:
02453 typedef typename std::tuple<
02454 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02455 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02456 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02457 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02458 two_phase_port<typename std::tuple_element<4,OutputTuple>::type> > port_tuple_type;
02459 public:
02460 typedef OutputTuple output_type;
02461 private:
02462 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02463 public:
02464 unfolded_join_node(graph &g) : base_type(g) {}
02465 };
02466
02467 template<typename OutputTuple>
02468 class unfolded_join_node<6,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02469 std::tuple<
02470 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02471 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02472 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02473 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02474 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02475 two_phase_port<typename std::tuple_element<5,OutputTuple>::type> >,
02476 OutputTuple
02477 > {
02478 private:
02479 typedef typename std::tuple<
02480 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02481 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02482 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02483 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02484 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02485 two_phase_port<typename std::tuple_element<5,OutputTuple>::type> > port_tuple_type;
02486 public:
02487 typedef OutputTuple output_type;
02488 private:
02489 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02490 public:
02491 unfolded_join_node(graph &g) : base_type(g) {}
02492 };
02493
02494 template<typename OutputTuple>
02495 class unfolded_join_node<7,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02496 std::tuple<
02497 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02498 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02499 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02500 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02501 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02502 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02503 two_phase_port<typename std::tuple_element<6,OutputTuple>::type> >,
02504 OutputTuple
02505 > {
02506 private:
02507 typedef typename std::tuple<
02508 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02509 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02510 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02511 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02512 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02513 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02514 two_phase_port<typename std::tuple_element<6,OutputTuple>::type> > port_tuple_type;
02515 public:
02516 typedef OutputTuple output_type;
02517 private:
02518 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02519 public:
02520 unfolded_join_node(graph &g) : base_type(g) {}
02521 };
02522
02523 template<typename OutputTuple>
02524 class unfolded_join_node<8,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02525 std::tuple<
02526 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02527 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02528 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02529 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02530 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02531 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02532 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02533 two_phase_port<typename std::tuple_element<7,OutputTuple>::type> >,
02534 OutputTuple
02535 > {
02536 private:
02537 typedef typename std::tuple<
02538 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02539 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02540 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02541 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02542 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02543 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02544 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02545 two_phase_port<typename std::tuple_element<7,OutputTuple>::type> > port_tuple_type;
02546 public:
02547 typedef OutputTuple output_type;
02548 private:
02549 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02550 public:
02551 unfolded_join_node(graph &g) : base_type(g) {}
02552 };
02553
02554 template<typename OutputTuple>
02555 class unfolded_join_node<9,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02556 std::tuple<
02557 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02558 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02559 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02560 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02561 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02562 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02563 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02564 two_phase_port<typename std::tuple_element<7,OutputTuple>::type>,
02565 two_phase_port<typename std::tuple_element<8,OutputTuple>::type> >,
02566 OutputTuple
02567 > {
02568 private:
02569 typedef typename std::tuple<
02570 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02571 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02572 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02573 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02574 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02575 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02576 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02577 two_phase_port<typename std::tuple_element<7,OutputTuple>::type>,
02578 two_phase_port<typename std::tuple_element<8,OutputTuple>::type> > port_tuple_type;
02579 public:
02580 typedef OutputTuple output_type;
02581 private:
02582 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02583 public:
02584 unfolded_join_node(graph &g) : base_type(g) {}
02585 };
02586
02587 template<typename OutputTuple>
02588 class unfolded_join_node<10,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02589 std::tuple<
02590 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02591 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02592 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02593 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02594 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02595 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02596 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02597 two_phase_port<typename std::tuple_element<7,OutputTuple>::type>,
02598 two_phase_port<typename std::tuple_element<8,OutputTuple>::type>,
02599 two_phase_port<typename std::tuple_element<9,OutputTuple>::type> >,
02600 OutputTuple
02601 > {
02602 private:
02603 typedef typename std::tuple<
02604 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02605 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02606 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02607 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02608 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02609 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02610 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02611 two_phase_port<typename std::tuple_element<7,OutputTuple>::type>,
02612 two_phase_port<typename std::tuple_element<8,OutputTuple>::type>,
02613 two_phase_port<typename std::tuple_element<9,OutputTuple>::type> > port_tuple_type;
02614 public:
02615 typedef OutputTuple output_type;
02616 private:
02617 typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02618 public:
02619 unfolded_join_node(graph &g) : base_type(g) {}
02620 };
02621
02622 }
02623
02624 using namespace internal::join_policy_namespace;
02625
02626 template<typename OutputTuple, join_policy JP=two_phase>
02627 class join_node: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, OutputTuple, JP> {
02628 private:
02629 static const int N = std::tuple_size<OutputTuple>::value;
02630 typedef typename internal::unfolded_join_node<N, OutputTuple, JP> unfolded_type;
02631 public:
02632 typedef OutputTuple output_type;
02633 join_node(graph &g) : unfolded_type(g) { }
02634 };
02635
02636
02637
02638
02639
02641 template< typename T >
02642 inline void make_edge( sender<T> &p, receiver<T> &s ) {
02643 p.register_successor( s );
02644 }
02645
02647 template< typename T, typename SIterator >
02648 inline void make_edges( sender<T> &p, SIterator s_begin, SIterator s_end ) {
02649 for ( SIterator i = s_begin; i != s_end; ++i ) {
02650 make_edge( p, **i );
02651 }
02652 }
02653
02655 template< typename T, typename PIterator >
02656 inline void make_edges( PIterator p_begin, PIterator p_end, receiver<T> &s ) {
02657 for ( PIterator i = p_begin; i != p_end; ++i ) {
02658 make_edge( **i, s );
02659 }
02660 }
02661
02662 }
02663
02664 #endif
02665