graph.h

Go to the documentation of this file.
00001 /*
00002     Copyright 2005-2011 Intel Corporation.  All Rights Reserved.
00003 
00004     The source code contained or described herein and all documents related
00005     to the source code ("Material") are owned by Intel Corporation or its
00006     suppliers or licensors.  Title to the Material remains with Intel
00007     Corporation or its suppliers and licensors.  The Material is protected
00008     by worldwide copyright laws and treaty provisions.  No part of the
00009     Material may be used, copied, reproduced, modified, published, uploaded,
00010     posted, transmitted, distributed, or disclosed in any way without
00011     Intel's prior express written permission.
00012 
00013     No license under any patent, copyright, trade secret or other
00014     intellectual property right is granted to or conferred upon you by
00015     disclosure or delivery of the Materials, either expressly, by
00016     implication, inducement, estoppel or otherwise.  Any license under such
00017     intellectual property rights must be express and approved by Intel in
00018     writing.
00019 */
00020 
00021 #ifndef __TBB_graph_H
00022 #define __TBB_graph_H
00023 
00024 #if !TBB_PREVIEW_GRAPH
00025 #error Set TBB_PREVIEW_GRAPH to include graph.h
00026 #endif
00027 
00028 #include "tbb_stddef.h"
00029 #include "atomic.h"
00030 #include "spin_mutex.h"
00031 #include "null_mutex.h"
00032 #include "spin_rw_mutex.h"
00033 #include "null_rw_mutex.h"
00034 #include "task.h"
00035 #include "concurrent_vector.h"
00036 #include "_aggregator_internal.h"
00037 
00038 // use the VC10 or gcc version of tuple if it is available.
00039 #if TBB_IMPLEMENT_CPP0X && (!defined(_MSC_VER) || _MSC_VER < 1600)
00040 #define TBB_PREVIEW_TUPLE 1
00041 #include "compat/tuple"
00042 #else
00043 #include <tuple>
00044 #endif
00045 
00046 #include<list>
00047 #include<queue>
00048 
00049 
00060 namespace tbb {
00061 
00063     class graph_node {
00064     public:
00065         virtual ~graph_node() {} 
00066     }; 
00067 
00069     class continue_msg {};
00070 
00071     template< typename T > class sender;
00072     template< typename T > class receiver;
00073     class continue_receiver;
00074 
00076     template< typename T >
00077     class sender {
00078     public:
00080         typedef T output_type;
00081 
00083         typedef receiver<T> successor_type;
00084 
00085         virtual ~sender() {}
00086 
00088         virtual bool register_successor( successor_type &r ) = 0;
00089 
00091         virtual bool remove_successor( successor_type &r ) = 0;
00092 
00094         virtual bool try_get( T & ) { return false; }
00095 
00097         virtual bool try_reserve( T & ) { return false; }
00098 
00100         virtual bool try_release( ) { return false; }
00101 
00103         virtual bool try_consume( ) { return false; }
00104 
00105     };
00106 
00107 
00109     template< typename T >
00110     class receiver {
00111     public:
00112 
00114         typedef T input_type;
00115 
00117         typedef sender<T> predecessor_type;
00118 
00120         virtual ~receiver() {}
00121 
00123         virtual bool try_put( T t ) = 0;
00124 
00126         virtual bool register_predecessor( predecessor_type & ) { return false; }
00127 
00129         virtual bool remove_predecessor( predecessor_type & ) { return false; }
00130 
00131     };
00132 
00134 
00135     class continue_receiver : public receiver< continue_msg > {
00136     public:
00137 
00139         typedef continue_msg input_type;
00140 
00142         typedef sender< continue_msg > predecessor_type;
00143 
00145         continue_receiver( int number_of_predecessors = 0 ) { 
00146             my_predecessor_count = number_of_predecessors;
00147             my_current_count = 0;
00148         }
00149 
00151         virtual ~continue_receiver() { }
00152 
00154         /* override */ bool register_predecessor( predecessor_type & ) {
00155             spin_mutex::scoped_lock l(my_mutex);
00156             ++my_predecessor_count;
00157             return true;
00158         }
00159 
00161 
00164         /* override */ bool remove_predecessor( predecessor_type & ) {
00165             spin_mutex::scoped_lock l(my_mutex);
00166             --my_predecessor_count;
00167             return true;
00168         }
00169 
00171 
00173         /* override */ bool try_put( input_type ) {
00174             {
00175                 spin_mutex::scoped_lock l(my_mutex);
00176                 if ( ++my_current_count < my_predecessor_count ) 
00177                     return true;
00178                 else
00179                     my_current_count = 0;
00180             }
00181             execute();
00182             return true;
00183         }
00184 
00185     protected:
00186 
00187         spin_mutex my_mutex;
00188         int my_predecessor_count;
00189         int my_current_count;
00190 
00192 
00194         virtual void execute() = 0;
00195 
00196     };
00197 
00199     namespace internal {
00200 
00202         enum node_state { node_state_idle=0, node_state_nonidle=1, node_state_inactive=2 };
00203 
00204 
00206         template< typename Output >
00207         class source_body : no_assign   {
00208         public:
00209             virtual ~source_body() {}
00210             virtual bool operator()(Output &output) = 0;
00211         };
00212 
00214         template< typename Output, typename Body>
00215         class source_body_leaf : public source_body<Output> {
00216         public:
00217             source_body_leaf( Body _body ) : body(_body) { }
00218             /*override */ bool operator()(Output &output) { return body( output ); }
00219         private:
00220             Body body;
00221         };
00222 
00224         template< typename Input, typename Output >
00225             class function_body : no_assign {
00226         public:
00227             virtual ~function_body() {}
00228             virtual Output operator()(Input input) = 0;
00229         };
00230 
00232         template <typename Input, typename Output, typename B>
00233         class function_body_leaf : public function_body< Input, Output > {
00234         public:
00235             function_body_leaf( B _body ) : body(_body) { }
00236             Output operator()(Input i) { return body(i); }
00237 
00238         private:
00239             B body;
00240         };
00241 
00243         template <typename B>
00244         class function_body_leaf< continue_msg, continue_msg, B> : public function_body< continue_msg, continue_msg > {
00245         public:
00246             function_body_leaf( B _body ) : body(_body) { }
00247             continue_msg operator()( continue_msg i ) { 
00248                 body(i); 
00249                 return i; 
00250             }
00251 
00252         private:
00253             B body;
00254         };
00255 
00257         template <typename Input, typename B>
00258         class function_body_leaf< Input, continue_msg, B> : public function_body< Input, continue_msg > {
00259         public:
00260             function_body_leaf( B _body ) : body(_body) { }
00261             continue_msg operator()(Input i) { 
00262                 body(i); 
00263                 return continue_msg();
00264             }
00265 
00266         private:
00267             B body;
00268         };
00269 
00271         template <typename Output, typename B>
00272         class function_body_leaf< continue_msg, Output, B > : public function_body< continue_msg, Output > {
00273         public:
00274             function_body_leaf( B _body ) : body(_body) { }
00275             Output operator()(continue_msg i) { 
00276                 return body(i); 
00277             }
00278 
00279         private:
00280             B body;
00281         };
00282 
00284         template< typename NodeType >
00285         class forward_task : public task {
00286 
00287             NodeType &my_node;
00288 
00289         public:
00290 
00291             forward_task( NodeType &n ) : my_node(n) {}
00292 
00293             task *execute() {
00294                 my_node.forward();
00295                 return NULL;
00296             }
00297         };
00298 
00300         template< typename NodeType, typename Input >
00301         class apply_body_task : public task {
00302 
00303             NodeType &my_node;
00304             Input my_input;
00305 
00306         public:
00307 
00308             apply_body_task( NodeType &n, Input i ) : my_node(n), my_input(i) {}
00309 
00310             task *execute() {
00311                 my_node.apply_body( my_input );
00312                 return NULL;
00313             }
00314         };
00315 
00317         template< typename NodeType >
00318         class source_task : public task {
00319 
00320             NodeType &my_node;
00321 
00322         public:
00323 
00324             source_task( NodeType &n ) : my_node(n) {}
00325 
00326             task *execute() {
00327                 my_node.apply_body( );
00328                 return NULL;
00329             }
00330         };
00331 
00333         template< typename Input, typename Output >
00334         struct empty_body {
00335            Output operator()( Input & ) const { return Output(); } 
00336         };
00337 
00339         template< typename T, typename M=spin_mutex >
00340         class node_cache {
00341             public:
00342 
00343             typedef size_t size_type;
00344 
00345             bool empty() {
00346                 typename my_mutex_type::scoped_lock lock( my_mutex );
00347                 return internal_empty();
00348             }
00349 
00350             void add( T &n ) {
00351                 typename my_mutex_type::scoped_lock lock( my_mutex );
00352                 internal_push(n);
00353             }
00354 
00355             void remove( T &n ) {
00356                 typename my_mutex_type::scoped_lock lock( my_mutex );
00357                 for ( size_t i = internal_size(); i != 0; --i ) {
00358                     T &s = internal_pop();
00359                     if ( &s != &n ) {
00360                         internal_push(s);
00361                     }
00362                 }
00363             }
00364 
00365         protected:
00366 
00367             typedef M my_mutex_type;
00368             my_mutex_type my_mutex;
00369             std::queue< T * > my_q;
00370 
00371             // Assumes lock is held
00372             inline bool internal_empty( )  {
00373                 return my_q.empty();
00374             }
00375 
00376             // Assumes lock is held
00377             inline size_type internal_size( )  {
00378                 return my_q.size(); 
00379             }
00380 
00381             // Assumes lock is held
00382             inline void internal_push( T &n )  {
00383                 my_q.push(&n);
00384             }
00385 
00386             // Assumes lock is held
00387             inline T &internal_pop() {
00388                 T *v = my_q.front();
00389                 my_q.pop();
00390                 return *v;
00391             }
00392 
00393         };
00394 
00396         template< typename T, typename M=spin_mutex >
00397         class predecessor_cache : public node_cache< sender<T>, M > {
00398             public:
00399             typedef M my_mutex_type;
00400             typedef T output_type; 
00401             typedef sender<output_type> predecessor_type;
00402             typedef receiver<output_type> successor_type;
00403 
00404             predecessor_cache( ) : my_owner( NULL ) { }
00405 
00406             void set_owner( successor_type *owner ) { my_owner = owner; }
00407 
00408             bool get_item( output_type &v ) {
00409 
00410                 bool msg = false;
00411 
00412                 do {
00413                     predecessor_type *src;
00414                     {
00415                         typename my_mutex_type::scoped_lock lock(this->my_mutex);
00416                         if ( this->internal_empty() ) {
00417                             break;
00418                         }
00419                         src = &this->internal_pop();
00420                     }
00421 
00422                     // Try to get from this sender
00423                     msg = src->try_get( v );
00424 
00425                     if (msg == false) {
00426                         // Relinquish ownership of the edge
00427                         if ( my_owner) 
00428                             src->register_successor( *my_owner );
00429                     } else {
00430                         // Retain ownership of the edge
00431                         this->add(*src);
00432                     }
00433                 } while ( msg == false );
00434                 return msg;
00435             }
00436 
00437         protected:
00438             successor_type *my_owner;
00439         };
00440 
00442         template< typename T, typename M=spin_mutex >
00443         class reservable_predecessor_cache : public predecessor_cache< T, M > {
00444         public:
00445             typedef M my_mutex_type;
00446             typedef T output_type; 
00447             typedef sender<T> predecessor_type;
00448             typedef receiver<T> successor_type;
00449 
00450             reservable_predecessor_cache( ) : reserved_src(NULL) { }
00451 
00452             bool 
00453             try_reserve( output_type &v ) {
00454                 bool msg = false;
00455 
00456                 do {
00457                     {
00458                         typename my_mutex_type::scoped_lock lock(this->my_mutex);
00459                         if ( reserved_src || this->internal_empty() ) 
00460                             return false;
00461 
00462                         reserved_src = &this->internal_pop();
00463                     }
00464 
00465                     // Try to get from this sender
00466                     msg = reserved_src->try_reserve( v );
00467 
00468                     if (msg == false) {
00469                         typename my_mutex_type::scoped_lock lock(this->my_mutex);
00470                         // Relinquish ownership of the edge
00471                         reserved_src->register_successor( *this->my_owner );
00472                         reserved_src = NULL;
00473                     } else {
00474                         // Retain ownership of the edge
00475                         this->add( *reserved_src );
00476                     }
00477                 } while ( msg == false );
00478 
00479                 return msg;
00480             }
00481 
00482             bool 
00483             try_release( ) {
00484                 reserved_src->try_release( );
00485                 reserved_src = NULL;
00486                 return true;
00487             }
00488 
00489             bool 
00490             try_consume( ) {
00491                 reserved_src->try_consume( );
00492                 reserved_src = NULL;
00493                 return true;
00494             }
00495 
00496         private:
00497             predecessor_type *reserved_src;
00498         };
00499 
00500 
00502         template<typename T, typename M=spin_rw_mutex >
00503         class successor_cache : no_copy {
00504         protected:
00505 
00506             typedef M my_mutex_type;
00507             my_mutex_type my_mutex;
00508 
00509             typedef std::list< receiver<T> * > my_successors_type;
00510             my_successors_type my_successors;
00511 
00512             sender<T> *my_owner;
00513 
00514         public:
00515 
00516             successor_cache( ) : my_owner(NULL) {}
00517 
00518             void set_owner( sender<T> *owner ) { my_owner = owner; }
00519 
00520             virtual ~successor_cache() {}
00521 
00522             void register_successor( receiver<T> &r ) {
00523                 typename my_mutex_type::scoped_lock l(my_mutex, true);
00524                 my_successors.push_back( &r ); 
00525             }
00526 
00527             void remove_successor( receiver<T> &r ) {
00528                 typename my_mutex_type::scoped_lock l(my_mutex, true);
00529                 for ( typename my_successors_type::iterator i = my_successors.begin();
00530                       i != my_successors.end(); ++i ) { 
00531                     if ( *i == & r ) { 
00532                         my_successors.erase(i);
00533                         break;
00534                     }
00535                 }
00536             }
00537 
00538             bool empty() { 
00539                 typename my_mutex_type::scoped_lock l(my_mutex, false);
00540                 return my_successors.empty(); 
00541             }
00542 
00543             virtual bool try_put( T t ) = 0; 
00544          };
00545 
00547         template<>
00548         class successor_cache< continue_msg > : no_copy {
00549         protected:
00550 
00551             typedef spin_rw_mutex my_mutex_type;
00552             my_mutex_type my_mutex;
00553 
00554             typedef std::list< receiver<continue_msg> * > my_successors_type;
00555             my_successors_type my_successors;
00556 
00557             sender<continue_msg> *my_owner;
00558 
00559         public:
00560 
00561             successor_cache( ) : my_owner(NULL) {}
00562 
00563             void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
00564 
00565             virtual ~successor_cache() {}
00566 
00567             void register_successor( receiver<continue_msg> &r ) {
00568                 my_mutex_type::scoped_lock l(my_mutex, true);
00569                 my_successors.push_back( &r ); 
00570                 if ( my_owner )
00571                     r.register_predecessor( *my_owner );
00572             }
00573 
00574             void remove_successor( receiver<continue_msg> &r ) {
00575                 my_mutex_type::scoped_lock l(my_mutex, true);
00576                 for ( my_successors_type::iterator i = my_successors.begin();
00577                       i != my_successors.end(); ++i ) { 
00578                     if ( *i == & r ) { 
00579                         if ( my_owner )
00580                             r.remove_predecessor( *my_owner );
00581                         my_successors.erase(i);
00582                         break;
00583                     }
00584                 }
00585             }
00586 
00587             bool empty() { 
00588                 my_mutex_type::scoped_lock l(my_mutex, false);
00589                 return my_successors.empty(); 
00590             }
00591 
00592             virtual bool try_put( continue_msg t ) = 0; 
00593 
00594          };
00595 
00597         template<typename T, typename M=spin_rw_mutex>
00598         class broadcast_cache : public successor_cache<T, M> {
00599             typedef M my_mutex_type;
00600             typedef std::list< receiver<T> * > my_successors_type;
00601 
00602         public:
00603 
00604             broadcast_cache( ) {}
00605 
00606             bool try_put( T t ) {
00607                 bool msg = false;
00608                 bool upgraded = false;
00609                 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00610                 typename my_successors_type::iterator i = this->my_successors.begin();
00611                 while ( i != this->my_successors.end() ) {
00612                    if ( (*i)->try_put( t ) == true ) {
00613                        ++i;
00614                        msg = true;
00615                    } else {
00616                       if ( (*i)->register_predecessor(*this->my_owner) ) {
00617                           if (!upgraded) {
00618                               l.upgrade_to_writer();
00619                               upgraded = true;
00620                           }
00621                           i = this->my_successors.erase(i);
00622                       }
00623                       else {
00624                           ++i;
00625                       }
00626                    }
00627                 }
00628                 return msg;
00629             }
00630         };
00631 
00633         template<typename T, typename M=spin_rw_mutex >
00634         class round_robin_cache : public successor_cache<T, M> {
00635             typedef size_t size_type;
00636             typedef M my_mutex_type;
00637             typedef std::list< receiver<T> * > my_successors_type;
00638 
00639         public:
00640 
00641             round_robin_cache( ) {}
00642 
00643             size_type size() {
00644                 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00645                 return this->my_successors.size();
00646             }
00647 
00648             bool try_put( T t ) {
00649                 bool upgraded = false;
00650                 typename my_mutex_type::scoped_lock l(this->my_mutex, false);
00651                 typename my_successors_type::iterator i = this->my_successors.begin();
00652                 while ( i != this->my_successors.end() ) {
00653                    if ( (*i)->try_put( t ) ) {
00654                        return true;
00655                    } else {
00656                       if ( (*i)->register_predecessor(*this->my_owner) ) {
00657                           if (!upgraded) {
00658                               l.upgrade_to_writer();
00659                               upgraded = true;
00660                           }
00661                           i = this->my_successors.erase(i);
00662                       }
00663                       else {
00664                           ++i;
00665                       }
00666                    }
00667                 }
00668                 return false;
00669             }
00670         };
00671 
00672         template<typename T>
00673         class decrementer : public continue_receiver, internal::no_copy {
00674 
00675             T *my_node;
00676 
00677             void execute() {
00678                 my_node->decrement_counter();
00679             }
00680 
00681         public:
00682            
00683             typedef continue_msg input_type;
00684             typedef continue_msg output_type;
00685             decrementer( int number_of_predecessors = 0 ) : continue_receiver( number_of_predecessors ) { }
00686             void set_owner( T *node ) { my_node = node; }
00687         };
00688 
00689     }
00691 
00692 
00694 
00695     class graph : internal::no_copy {
00696 
00697         template< typename Body >
00698         class run_task : public task {
00699         public: 
00700             run_task( Body& body ) : my_body(body) {}
00701             task *execute() {
00702                 my_body();
00703                 return NULL;
00704             }
00705         private:
00706             Body my_body;
00707         };
00708 
00709         template< typename Receiver, typename Body >
00710         class run_and_put_task : public task {
00711         public: 
00712             run_and_put_task( Receiver &r, Body& body ) : my_receiver(r), my_body(body) {}
00713             task *execute() {
00714                 my_receiver.try_put( my_body() );
00715                 return NULL;
00716             }
00717         private:
00718             Receiver &my_receiver;
00719             Body my_body;
00720         };
00721 
00722     public:
00723 
00725         enum concurrency { unlimited = 0, serial = 1 };
00726 
00728         graph() : my_root_task( new ( task::allocate_root( ) ) empty_task ) {
00729             my_root_task->set_ref_count(1);
00730         }
00731 
00733 
00735         ~graph() {
00736             wait_for_all();
00737             my_root_task->set_ref_count(0);
00738             task::destroy( *my_root_task );
00739         }
00740 
00741 
00743 
00745         void increment_wait_count() { 
00746             if (my_root_task)
00747                 my_root_task->increment_ref_count();
00748         }
00749 
00751 
00753         void decrement_wait_count() { 
00754             if (my_root_task)
00755                 my_root_task->decrement_ref_count(); 
00756         }
00757 
00759 
00761         template< typename Receiver, typename Body >
00762             void run( Receiver &r, Body body ) {
00763            task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
00764                run_and_put_task< Receiver, Body >( r, body ) );
00765         }
00766 
00768 
00770         template< typename Body >
00771         void run( Body body ) {
00772            task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
00773                run_task< Body >( body ) );
00774         }
00775 
00777 
00778         void wait_for_all() {
00779             if (my_root_task)
00780                 my_root_task->wait_for_all();
00781             my_root_task->set_ref_count(1);
00782         }
00783 
00785         task * root_task() {
00786             return my_root_task;
00787         }
00788 
00789     private:
00790 
00791         task *my_root_task;
00792 
00793     };
00794 
00795 
00797     namespace internal {
00798 
00800         template< typename Input, typename Output >
00801         class function_input : public receiver<Input>, no_assign {
00802             typedef sender<Input> predecessor_type;
00803             enum op_stat {WAIT=0, SUCCEEDED, FAILED};
00804             enum op_type {reg_pred, rem_pred, app_body, tryput, try_fwd};
00805 
00806         public:
00808             typedef Input input_type;
00810             typedef Output output_type;
00811 
00813             template< typename Body >
00814             function_input( graph &g, size_t max_concurrency, Body& body )
00815                 : my_root_task(g.root_task()), my_max_concurrency(max_concurrency), my_concurrency(internal::node_state_idle),
00816                   my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ),
00817                 forwarder_busy(false) {
00818                 my_predecessors.set_owner(this);
00819                 my_aggregator.initialize_handler(my_functor_t(this));
00820             }
00821 
00823             virtual ~function_input() { delete my_body; }
00824 
00826             virtual bool try_put( input_type t ) {
00827                if ( my_max_concurrency == 0 ) {
00828                    spawn_body_task( t );
00829                    return true;
00830                } else {
00831                    my_operation op_data(t, tryput);
00832                    my_aggregator.execute(&op_data);
00833                    return op_data.status == SUCCEEDED;
00834                }
00835             }
00836 
00838             /* override */ bool register_predecessor( predecessor_type &src ) {
00839                 my_operation op_data(reg_pred);
00840                 op_data.r = &src;
00841                 my_aggregator.execute(&op_data);
00842                 return true;
00843             }
00844 
00846             /* override */ bool remove_predecessor( predecessor_type &src ) {
00847                 my_operation op_data(rem_pred);
00848                 op_data.r = &src;
00849                 my_aggregator.execute(&op_data);
00850                 return true;
00851             }
00852 
00853         protected:
00854             task *my_root_task;
00855             const size_t my_max_concurrency;
00856             size_t my_concurrency;
00857             function_body<input_type, output_type> *my_body;
00858             predecessor_cache<input_type, null_mutex > my_predecessors;
00859 
00860             virtual broadcast_cache<output_type > &successors() = 0;
00861 
00862         private:
00863             friend class apply_body_task< function_input< input_type, output_type >, input_type >;
00864             friend class forward_task< function_input< input_type, output_type > >;
00865 
00866             class my_operation : public aggregated_operation< my_operation > {
00867             public:
00868                 char type;
00869                 union {
00870                     input_type *elem;
00871                     predecessor_type *r;
00872                 };
00873                 my_operation(const input_type& e, op_type t) :
00874                     type(char(t)), elem(const_cast<input_type*>(&e)) {}
00875                 my_operation(op_type t) : type(char(t)), r(NULL) {}
00876             };
00877 
00878             class my_functor_t {
00879                 function_input<input_type, output_type> *fi;
00880              public:
00881                 my_functor_t() {}
00882                 my_functor_t(function_input<input_type, output_type> *fi_) : fi(fi_) {}
00883                 void operator()(my_operation* op_list) {
00884                     fi->handle_operations(op_list);
00885                 }
00886             };
00887 
00888             bool forwarder_busy;
00889             aggregator< my_functor_t, my_operation > my_aggregator;
00890 
00891             void handle_operations(my_operation *op_list) {
00892                 my_operation *tmp;
00893                 while (op_list) {
00894                     tmp = op_list;
00895                     op_list = op_list->next;
00896                     switch (tmp->type) {
00897                     case reg_pred:
00898                         my_predecessors.add(*(tmp->r));
00899                         __TBB_store_with_release(tmp->status, SUCCEEDED);
00900                         if (!forwarder_busy) {
00901                             forwarder_busy = true;
00902                             spawn_forward_task();
00903                         }
00904                         break;
00905                     case rem_pred:
00906                         my_predecessors.remove(*(tmp->r));
00907                         __TBB_store_with_release(tmp->status, SUCCEEDED);
00908                         break;
00909                     case app_body:
00910                         __TBB_ASSERT(my_max_concurrency != 0, NULL);
00911                         --my_concurrency;
00912                         __TBB_store_with_release(tmp->status, SUCCEEDED);
00913                         if (my_concurrency<my_max_concurrency) {
00914                             input_type i;
00915                             if (my_predecessors.get_item(i)) {
00916                                 ++my_concurrency;
00917                                 spawn_body_task(i);
00918                             }
00919                         }
00920                         break;
00921                     case tryput: internal_try_put(tmp);  break;
00922                     case try_fwd: internal_forward(tmp);  break;
00923                     }
00924                 }
00925             }
00926 
00927 
00929             void internal_try_put(my_operation *op) {
00930                 __TBB_ASSERT(my_max_concurrency != 0, NULL);
00931                 if (my_concurrency < my_max_concurrency) {
00932                    ++my_concurrency;
00933                    spawn_body_task(*(op->elem));
00934                    __TBB_store_with_release(op->status, SUCCEEDED);
00935                } else {
00936                    __TBB_store_with_release(op->status, FAILED);
00937                }
00938             }
00939 
00941             void internal_forward(my_operation *op) {
00942                 if (my_concurrency<my_max_concurrency || !my_max_concurrency) {
00943                     input_type i;
00944                     if (my_predecessors.get_item(i)) {
00945                         ++my_concurrency;
00946                         __TBB_store_with_release(op->status, SUCCEEDED);
00947                         spawn_body_task(i);
00948                         return;
00949                     }
00950                 }
00951                 __TBB_store_with_release(op->status, FAILED);
00952                 forwarder_busy = false;
00953             }
00954 
00956             void apply_body( input_type &i ) {
00957                 successors().try_put( (*my_body)(i) );
00958                 if ( my_max_concurrency != 0 ) {
00959                     my_operation op_data(app_body);
00960                     my_aggregator.execute(&op_data);
00961                 }
00962             }
00963 
00965            inline void spawn_body_task( input_type &input ) {
00966                task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) apply_body_task<function_input<input_type, output_type>, input_type >(*this, input));
00967            }
00968 
00970            void forward() {
00971                my_operation op_data(try_fwd);
00972                do {
00973                    op_data.status = WAIT;
00974                    my_aggregator.execute(&op_data);
00975                } while (op_data.status == SUCCEEDED);
00976            }
00977 
00979            inline void spawn_forward_task() {
00980                task::enqueue(*new(task::allocate_additional_child_of(*my_root_task)) forward_task<function_input<input_type, output_type> >(*this));
00981            }
00982         };
00983 
00985         template< typename Output >
00986         class continue_input : public continue_receiver {
00987         public:
00988 
00990             typedef continue_msg input_type;
00991     
00993             typedef Output output_type;
00994 
00995             template< typename Body >
00996             continue_input( graph &g, Body& body )
00997                 : my_root_task(g.root_task()), 
00998                  my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
00999 
01000             template< typename Body >
01001             continue_input( graph &g, int number_of_predecessors, Body& body )
01002                 : continue_receiver( number_of_predecessors ), my_root_task(g.root_task()), 
01003                  my_body( new internal::function_body_leaf< input_type, output_type, Body>(body) ) { }
01004 
01005         protected:
01006 
01007             task *my_root_task;
01008             function_body<input_type, output_type> *my_body;
01009 
01010             virtual broadcast_cache<output_type > &successors() = 0; 
01011 
01012             friend class apply_body_task< continue_input< Output >, continue_msg >;
01013 
01015             /* override */ void apply_body( input_type ) {
01016                 successors().try_put( (*my_body)( continue_msg() ) );
01017             }
01018 
01020             /* override */ void execute( ) {
01021                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01022                    apply_body_task< continue_input< Output >, continue_msg >( *this, continue_msg() ) ); 
01023             }
01024         };
01025 
01027         template< typename Output >
01028         class function_output : public sender<Output> {
01029         public:
01030 
01031             typedef Output output_type;
01032 
01033             function_output() { }
01034 
01036             /* override */ bool register_successor( receiver<output_type> &r ) {
01037                 successors().register_successor( r );
01038                 return true;
01039             }
01040 
01042             /* override */ bool remove_successor( receiver<output_type> &r ) {
01043                 successors().remove_successor( r );
01044                 return true;
01045             }
01046   
01047         protected:
01048 
01049             virtual broadcast_cache<output_type > &successors() = 0; 
01050 
01051         };
01052 
01053     }
01055 
01057     template < typename Output >
01058     class source_node : public graph_node, public sender< Output > {
01059     public:
01060 
01062         typedef Output output_type;           
01063 
01065         typedef receiver< Output > successor_type;
01066 
01068         template< typename Body >
01069         source_node( graph &g, Body body, bool is_active = true )
01070              : my_root_task(g.root_task()), my_state( is_active ? internal::node_state_idle : internal::node_state_inactive ),
01071               my_body( new internal::source_body_leaf< output_type, Body>(body) ),
01072               my_reserved(false), my_has_cached_item(false) { 
01073             my_successors.set_owner(this);
01074         }
01075 
01077         ~source_node() { delete my_body; }
01078 
01080         /* override */ bool register_successor( receiver<output_type> &r ) {
01081             spin_mutex::scoped_lock lock(my_mutex);
01082             my_successors.register_successor(r);
01083             if ( my_state != internal::node_state_inactive )
01084                 spawn_put();
01085             return true;
01086         }
01087 
01089         /* override */ bool remove_successor( receiver<output_type> &r ) {
01090             spin_mutex::scoped_lock lock(my_mutex);
01091             my_successors.remove_successor(r);
01092             return true;
01093         }
01094 
01096         /*override */ bool try_get( output_type &v ) {
01097             spin_mutex::scoped_lock lock(my_mutex);
01098             if ( my_reserved )  
01099                 return false;
01100 
01101             if ( my_has_cached_item ) {
01102                 v = my_cached_item;
01103                 my_has_cached_item = false;
01104             } else if ( (*my_body)(v) == false ) {
01105                 return false;
01106             }
01107             return true;
01108         }
01109 
01111         /* override */ bool try_reserve( output_type &v ) {
01112             spin_mutex::scoped_lock lock(my_mutex);
01113             if ( my_reserved ) {
01114                 return false;
01115             }
01116 
01117             if ( !my_has_cached_item && (*my_body)(my_cached_item) )  
01118                 my_has_cached_item = true;
01119 
01120             if ( my_has_cached_item ) {
01121                 v = my_cached_item;
01122                 my_reserved = true;
01123                 return true;
01124             } else {
01125                 return false;
01126             }
01127         }
01128 
01130 
01131         /* override */ bool try_release( ) {
01132             spin_mutex::scoped_lock lock(my_mutex);
01133             __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
01134             my_reserved = false;
01135             spawn_put();
01136             return true;
01137         }
01138 
01140         /* override */ bool try_consume( ) {
01141             spin_mutex::scoped_lock lock(my_mutex);
01142             __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
01143             my_reserved = false;
01144             my_has_cached_item = false;
01145             if ( !my_successors.empty() ) {
01146                 spawn_put();
01147             }
01148             return true;
01149         }
01150 
01152         void activate() {
01153             spin_mutex::scoped_lock lock(my_mutex);
01154             my_state = internal::node_state_idle;
01155             if ( !my_successors.empty() )
01156                 spawn_put();
01157         }
01158 
01159     private:
01160 
01161         task *my_root_task;
01162         spin_mutex my_mutex;
01163         internal::node_state my_state;
01164         internal::source_body<output_type> *my_body;
01165         internal::broadcast_cache< output_type > my_successors;
01166         bool my_reserved;
01167         bool my_has_cached_item;
01168         output_type my_cached_item;
01169 
01170         friend class internal::source_task< source_node< output_type > >;
01171 
01173         /* override */ void apply_body( ) {
01174             output_type v;
01175             if ( try_reserve(v) == false )
01176                 return;
01177 
01178             if ( my_successors.try_put( v ) ) 
01179                 try_consume();
01180             else
01181                 try_release();
01182         }
01183 
01185         /* override */ void spawn_put( ) {
01186             task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01187                internal::source_task< source_node< output_type > >( *this ) ); 
01188         }
01189 
01190     };
01191 
01193     template <typename Input, typename Output = continue_msg >
01194     class function_node : public graph_node, public internal::function_input<Input,Output>, public internal::function_output<Output> {
01195     public:
01196 
01197         typedef Input input_type;
01198         typedef Output output_type;
01199         typedef sender< input_type > predecessor_type;
01200         typedef receiver< output_type > successor_type;
01201 
01203         template< typename Body >
01204         function_node( graph &g, size_t concurrency, Body body )
01205         : internal::function_input<input_type,output_type>( g, concurrency, body ) {
01206             my_successors.set_owner(this);
01207         }
01208 
01209     protected:
01210 
01211         internal::broadcast_cache<output_type> my_successors; 
01212         /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
01213 
01214     };
01215 
01217     template <typename Output>
01218     class executable_node : public graph_node, public internal::continue_input<Output>, public internal::function_output<Output> {
01219     public:
01220 
01221         typedef continue_msg input_type;
01222         typedef Output output_type;
01223         typedef sender< input_type > predecessor_type;
01224         typedef receiver< output_type > successor_type;
01225 
01227          template <typename Body >
01228          executable_node( graph &g, Body body )
01229                  : internal::continue_input<output_type>( g, body ) {
01230              my_successors.set_owner(this);
01231          }
01232 
01234          template <typename Body >
01235          executable_node( graph &g, int number_of_predecessors, Body body )
01236                  : internal::continue_input<output_type>( g, number_of_predecessors, body ) {
01237              my_successors.set_owner(this);
01238          }
01239 
01240     protected:
01241 
01242         internal::broadcast_cache<output_type> my_successors; 
01243         /* override */ internal::broadcast_cache<output_type> &successors () { return my_successors; }
01244 
01245     };
01246 
01247 
01248 
01249     template< typename T >
01250     class overwrite_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01251     public:
01252 
01253         typedef T input_type;
01254         typedef T output_type;
01255         typedef sender< input_type > predecessor_type;
01256         typedef receiver< output_type > successor_type;
01257 
01258         overwrite_node() : my_buffer_is_valid(false) {
01259             my_successors.set_owner( this );
01260         }
01261 
01262         ~overwrite_node() {}
01263 
01264         /* override */ bool register_successor( successor_type &s ) {
01265             spin_mutex::scoped_lock l( my_mutex );
01266             if ( my_buffer_is_valid ) {
01267                 // We have a valid value that must be forwarded immediately.
01268                 if ( s.try_put( my_buffer ) || !s.register_predecessor( *this  ) ) {
01269                     // We add the successor: it accepted our put or it rejected it but won't let use become a predecessor
01270                     my_successors.register_successor( s );
01271                     return true;
01272                 } else {
01273                     // We don't add the successor: it rejected our put and we became its predecessor instead
01274                     return false;
01275                 }
01276             } else {
01277                 // No valid value yet, just add as successor
01278                 my_successors.register_successor( s );
01279                 return true;
01280             }
01281         }
01282 
01283         /* override */ bool remove_successor( successor_type &s ) {
01284             spin_mutex::scoped_lock l( my_mutex );
01285             my_successors.remove_successor(s);
01286             return true;
01287         }
01288 
01289         /* override */ bool try_put( T v ) {
01290             spin_mutex::scoped_lock l( my_mutex );
01291             my_buffer = v;
01292             my_buffer_is_valid = true;
01293             my_successors.try_put(v);
01294             return true;
01295         }
01296 
01297         /* override */ bool try_get( T &v ) {
01298             spin_mutex::scoped_lock l( my_mutex );
01299             if ( my_buffer_is_valid ) {
01300                 v = my_buffer;
01301                 return true;
01302             } else {
01303                 return false;
01304             }
01305         }
01306 
01307         bool is_valid() {
01308            spin_mutex::scoped_lock l( my_mutex );
01309            return my_buffer_is_valid;
01310         }
01311 
01312         void clear() {
01313            spin_mutex::scoped_lock l( my_mutex );
01314            my_buffer_is_valid = false;
01315         }
01316 
01317     protected:
01318 
01319         spin_mutex my_mutex;
01320         internal::broadcast_cache< T, null_rw_mutex > my_successors;
01321         T my_buffer;
01322         bool my_buffer_is_valid;
01323 
01324     };
01325 
01326     template< typename T >
01327     class write_once_node : public overwrite_node<T> {
01328     public:
01329 
01330         typedef T input_type;
01331         typedef T output_type;
01332         typedef sender< input_type > predecessor_type;
01333         typedef receiver< output_type > successor_type;
01334 
01335         /* override */ bool try_put( T v ) {
01336             spin_mutex::scoped_lock l( this->my_mutex );
01337             if ( this->my_buffer_is_valid ) {
01338                 return false;
01339             } else {
01340                 this->my_buffer = v;
01341                 this->my_buffer_is_valid = true;
01342                 this->my_successors.try_put(v);
01343                 return true;
01344             }
01345         }
01346     };
01347 
01349 
01350     class continue_node : public executable_node< continue_msg > { 
01351     public:
01352 
01353         typedef continue_msg input_type;
01354         typedef continue_msg output_type;
01355         typedef sender< input_type > predecessor_type;
01356         typedef receiver< output_type > successor_type;
01357 
01358         continue_node( graph &g ) : executable_node<continue_msg>( g, internal::empty_body< continue_msg, continue_msg>() ) {}
01359     };
01360 
01362     template <typename T>
01363     class broadcast_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01364 
01365         internal::broadcast_cache<T> my_successors;
01366 
01367     public:
01368 
01369         typedef T input_type;
01370         typedef T output_type;
01371         typedef sender< input_type > predecessor_type;
01372         typedef receiver< output_type > successor_type;
01373 
01374         broadcast_node( ) {
01375            my_successors.set_owner( this ); 
01376         }
01377 
01379         virtual bool register_successor( receiver<T> &r ) {
01380             my_successors.register_successor( r );
01381             return true;
01382         }
01383 
01385         virtual bool remove_successor( receiver<T> &r ) {
01386             my_successors.remove_successor( r );
01387             return true;
01388         }
01389 
01390         /* override */ bool try_put( T t ) {
01391             my_successors.try_put(t);
01392             return true;
01393         }
01394 
01395     };
01396 
01397 
01399     template <typename T>
01400     class buffer_node : public graph_node, public receiver<T>, public sender<T>, internal::no_copy {
01401     public:
01402         typedef T input_type;
01403         typedef T output_type;
01404         typedef sender< input_type > predecessor_type;
01405         typedef receiver< output_type > successor_type;
01406     protected:
01407         typedef size_t size_type;
01408         typedef std::pair< T, bool > item_type;
01409 
01410         internal::round_robin_cache< T, null_rw_mutex > my_successors;
01411 
01412         task *my_parent;
01413         item_type *my_array;
01414         size_type my_array_size;
01415         static const size_type initial_buffer_size = 4;
01416         size_type my_head;
01417         size_type my_tail;
01418         spin_mutex my_mutex;
01419         bool my_reserved;
01420         size_type my_reserved_id;
01421 
01422         friend class internal::forward_task< buffer_node< T > >;
01423 
01424         enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd};
01425         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01426 
01427         // implements the aggregator_operation concept
01428         class buffer_operation : public internal::aggregated_operation< buffer_operation > {
01429         public:
01430             char type;
01431             T *elem;
01432             successor_type *r;
01433             buffer_operation(const T& e, op_type t) :
01434                 type(char(t)), elem(const_cast<T*>(&e)), r(NULL) {}
01435             buffer_operation(op_type t) : type(char(t)), r(NULL) {}
01436         };
01437 
01438         class my_functor_t {
01439             buffer_node<T> *bfr;
01440         public:
01441             my_functor_t(buffer_node<T> *bfr_) : bfr(bfr_) {}
01442             my_functor_t() {}
01443             void operator()(buffer_operation* op_list) {
01444                 bfr->handle_operations(op_list);
01445             }
01446         };
01447 
01448         bool forwarder_busy;
01449         internal::aggregator< my_functor_t, buffer_operation> my_aggregator;
01450 
01451         virtual void handle_operations(buffer_operation *op_list) {
01452             buffer_operation *tmp;
01453             bool try_forwarding=false;
01454             while (op_list) {
01455                 tmp = op_list;
01456                 op_list = op_list->next;
01457                 switch (tmp->type) {
01458                 case reg_succ: internal_reg_succ(tmp);  try_forwarding = true; break;
01459                 case rem_succ: internal_rem_succ(tmp); break;
01460                 case req_item: internal_pop(tmp); break;
01461                 case res_item: internal_reserve(tmp); break;
01462                 case rel_res:  internal_release(tmp);  try_forwarding = true; break;
01463                 case con_res:  internal_consume(tmp);  try_forwarding = true; break;
01464                 case put_item: internal_push(tmp);  try_forwarding = true; break;
01465                 case try_fwd:  internal_forward(tmp); break;
01466                 }
01467             }
01468             if (try_forwarding && !forwarder_busy) {
01469                 forwarder_busy = true;
01470                 task::enqueue(*new(task::allocate_additional_child_of(*my_parent)) internal::forward_task< buffer_node<input_type> >(*this));
01471             }
01472         }
01473 
01475         virtual void forward() {
01476             buffer_operation op_data(try_fwd);
01477             do {
01478                 op_data.status = WAIT;
01479                 my_aggregator.execute(&op_data);
01480             } while (op_data.status == SUCCEEDED);
01481         }
01482 
01484         virtual void internal_reg_succ(buffer_operation *op) {
01485             my_successors.register_successor(*(op->r));
01486             __TBB_store_with_release(op->status, SUCCEEDED);
01487         }
01488 
01490         virtual void internal_rem_succ(buffer_operation *op) {
01491             my_successors.remove_successor(*(op->r));
01492             __TBB_store_with_release(op->status, SUCCEEDED);
01493         }
01494 
01496         virtual void internal_forward(buffer_operation *op) {
01497             T i_copy;
01498             bool success = false; // flagged when a successor accepts
01499             size_type counter = my_successors.size();
01500             // Try forwarding, giving each successor a chance
01501             while (counter>0 && my_tail>my_head && my_array[ (my_tail-1) & (my_array_size-1)].second == true ) {
01502                 i_copy = my_array[ (my_tail-1) & (my_array_size-1)].first;
01503                 bool msg = my_successors.try_put(i_copy);
01504                 if ( msg == true ) {
01505                     my_array[ (my_tail-1) & (my_array_size-1)].second = false;
01506                     --my_tail;
01507                     success = true; // found an accepting successor
01508                 }
01509                 --counter;
01510             }
01511             if (success && !counter)
01512                 __TBB_store_with_release(op->status, SUCCEEDED);
01513             else {
01514                 __TBB_store_with_release(op->status, FAILED);
01515                 forwarder_busy = false;
01516             }
01517         }
01518 
01519         virtual void internal_push(buffer_operation *op) {
01520             while( my_tail-my_head >= my_array_size ) {
01521                 grow_my_array( my_tail - my_head + 1 );
01522             }
01523             my_array[my_tail&(my_array_size-1)] = std::make_pair( *(op->elem), true );
01524             ++my_tail;
01525             __TBB_store_with_release(op->status, SUCCEEDED);
01526         }
01527         virtual void internal_pop(buffer_operation *op) {
01528             if ( my_array[(my_tail-1) & (my_array_size-1)].second == false ) {
01529                 __TBB_store_with_release(op->status, FAILED);
01530             }
01531             else {
01532                 *(op->elem) = my_array[(my_tail-1) & (my_array_size-1)].first;
01533                 my_array[(my_tail-1) & (my_array_size-1)].second = false;
01534                 --my_tail;
01535                 __TBB_store_with_release(op->status, SUCCEEDED);
01536             }
01537         }
01538         virtual void internal_reserve(buffer_operation *op) {
01539             if (my_reserved == true || my_array[ my_head & (my_array_size-1)].second == false ) {
01540                 __TBB_store_with_release(op->status, FAILED);
01541             }
01542             else {
01543                 my_reserved = true;
01544                 *(op->elem) = my_array[ my_head & (my_array_size-1)].first;
01545                 my_array[ my_head & (my_array_size-1)].second = false;
01546                 __TBB_store_with_release(op->status, SUCCEEDED);
01547             }
01548         }
01549         virtual void internal_consume(buffer_operation *op) {
01550             my_reserved = false;
01551             ++my_head;
01552             __TBB_store_with_release(op->status, SUCCEEDED);
01553         }
01554 
01555         virtual void internal_release(buffer_operation *op) {
01556             my_array[my_head&(my_array_size-1)].second = true;
01557             my_reserved = false;
01558             __TBB_store_with_release(op->status, SUCCEEDED);
01559         }
01560 
01562         void grow_my_array( size_t minimum_size ) {
01563             size_type old_size = my_array_size;
01564             size_type new_size = old_size ? 2*old_size : initial_buffer_size;
01565             while( new_size<minimum_size )
01566                 new_size*=2;
01567 
01568             item_type* new_array = cache_aligned_allocator<item_type>().allocate(new_size);
01569             item_type* old_array = my_array;
01570 
01571             for( size_type i=0; i<new_size; ++i )
01572                 new_array[i].second = false;
01573 
01574             size_t t=my_head;
01575             for( size_type i=0; i<old_size; ++i, ++t )
01576                 new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
01577             my_array = new_array;
01578             my_array_size = new_size;
01579             if( old_array )
01580                 cache_aligned_allocator<item_type>().deallocate(old_array,old_size);
01581         }
01582 
01583     public:
01585         buffer_node( graph &g ) :
01586             my_parent( g.root_task() ), my_array(NULL), my_array_size(0),
01587             my_head(0), my_tail(0), my_reserved(false), forwarder_busy(false) {
01588             my_successors.set_owner(this);
01589             my_aggregator.initialize_handler(my_functor_t(this));
01590             grow_my_array(initial_buffer_size);
01591         }
01592 
01593         virtual ~buffer_node() {}
01594 
01595         //
01596         // message sender implementation
01597         //
01598 
01600 
01601         /* override */ bool register_successor( receiver<output_type> &r ) {
01602             buffer_operation op_data(reg_succ);
01603             op_data.r = &r;
01604             my_aggregator.execute(&op_data);
01605             return true;
01606         }
01607 
01609 
01611         /* override */ bool remove_successor( receiver<output_type> &r ) {
01612             r.remove_predecessor(*this);
01613             buffer_operation op_data(rem_succ);
01614             op_data.r = &r;
01615             my_aggregator.execute(&op_data);
01616             return true;
01617         }
01618 
01620 
01622         /* override */ bool try_get( T &v ) {
01623             buffer_operation op_data(req_item);
01624             op_data.elem = &v;
01625             my_aggregator.execute(&op_data);
01626             return (op_data.status==SUCCEEDED);
01627         }
01628 
01630 
01632         /* override */ bool try_reserve( T &v ) {
01633             buffer_operation op_data(res_item);
01634             op_data.elem = &v;
01635             my_aggregator.execute(&op_data);
01636             return (op_data.status==SUCCEEDED);
01637         }
01638 
01640 
01641         /* override */ bool try_release() {
01642             buffer_operation op_data(rel_res);
01643             my_aggregator.execute(&op_data);
01644             return true;
01645         }
01646 
01648 
01649         /* override */ bool try_consume() {
01650             buffer_operation op_data(con_res);
01651             my_aggregator.execute(&op_data);
01652             return true;
01653         }
01654 
01656 
01657         /* override */ bool try_put(T t) {
01658             buffer_operation op_data(t, put_item);
01659             my_aggregator.execute(&op_data);
01660             return true;
01661         }
01662     };
01663 
01664 
01666     template <typename T>
01667     class queue_node : public buffer_node<T> {
01668     protected:
01669         typedef typename buffer_node<T>::size_type size_type;
01670         typedef typename buffer_node<T>::buffer_operation queue_operation;
01671 
01672         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01673 
01675         /* override */ void internal_forward(queue_operation *op) {
01676             T i_copy;
01677             bool success = false; // flagged when a successor accepts
01678             size_type counter = this->my_successors.size();
01679             if (this->my_reserved || this->my_array[ this->my_head & (this->my_array_size-1)].second == false) {
01680                 __TBB_store_with_release(op->status, FAILED);
01681                 this->forwarder_busy = false;
01682                 return;
01683             }
01684             // Keep trying to send items while there is at least one accepting successor
01685             while (counter>0 && this->my_array[ this->my_head & (this->my_array_size-1)].second == true ) {
01686                 i_copy = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01687                 bool msg = this->my_successors.try_put(i_copy);
01688                 if ( msg == true ) {
01689                      this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01690                      ++(this->my_head);
01691                     success = true; // found an accepting successor
01692                 }
01693                 --counter;
01694             }
01695             if (success && !counter)
01696                 __TBB_store_with_release(op->status, SUCCEEDED);
01697             else {
01698                 __TBB_store_with_release(op->status, FAILED);
01699                 this->forwarder_busy = false;
01700             }
01701         }
01702 
01703         /* override */ void internal_pop(queue_operation *op) {
01704             if ( this->my_reserved == true || this->my_array[ this->my_head & (this->my_array_size-1)].second == false ) {
01705                 __TBB_store_with_release(op->status, FAILED);
01706             }
01707             else {
01708                 *(op->elem) = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01709                 this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01710                 ++(this->my_head);
01711                 __TBB_store_with_release(op->status, SUCCEEDED);
01712             }
01713         }
01714         /* override */ void internal_reserve(queue_operation *op) {
01715             if (this->my_reserved == true || this->my_array[ this->my_head & (this->my_array_size-1)].second == false ) {
01716                 __TBB_store_with_release(op->status, FAILED);
01717             }
01718             else {
01719                 this->my_reserved = true;
01720                 *(op->elem) = this->my_array[ this->my_head & (this->my_array_size-1)].first;
01721                 __TBB_store_with_release(op->status, SUCCEEDED);
01722             }
01723         }
01724         /* override */ void internal_consume(queue_operation *op) {
01725             this->my_reserved = false;
01726             this->my_array[ this->my_head & (this->my_array_size-1)].second = false;
01727             ++(this->my_head);
01728             __TBB_store_with_release(op->status, SUCCEEDED);
01729         }
01730 
01731     public:
01732 
01733         typedef T input_type;
01734         typedef T output_type;
01735         typedef sender< input_type > predecessor_type;
01736         typedef receiver< output_type > successor_type;
01737 
01739         queue_node( graph &g ) : buffer_node<T>(g) {}
01740     };
01741 
01743     template< typename T >
01744     class sequencer_node : public queue_node<T> {
01745         internal::function_body< T, size_t > *my_sequencer;
01746     public:
01747 
01748         typedef T input_type;
01749         typedef T output_type;
01750         typedef sender< input_type > predecessor_type;
01751         typedef receiver< output_type > successor_type;
01752 
01754         template< typename Sequencer >
01755         sequencer_node( graph &g, const Sequencer& s ) : queue_node<T>(g),
01756             my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {}
01757 
01759         ~sequencer_node() { delete my_sequencer; }
01760     protected:
01761         typedef typename buffer_node<T>::size_type size_type;
01762         typedef typename buffer_node<T>::buffer_operation sequencer_operation;
01763 
01764         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01765 
01766     private:
01767         /* override */ void internal_push(sequencer_operation *op) {
01768             size_type tag = (*my_sequencer)(*(op->elem));
01769 
01770             this->my_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
01771             while ( this->my_tail - this->my_head >= this->my_array_size ) {
01772                 this->grow_my_array( this->my_tail - this->my_head  + 1);
01773             }
01774             this->my_array[tag&(this->my_array_size-1)] = std::make_pair( *(op->elem), true );
01775             __TBB_store_with_release(op->status, SUCCEEDED);
01776         }
01777     };
01778 
01780     template< typename T, typename Compare = std::less<T> >
01781     class priority_queue_node : public buffer_node<T> {
01782     public:
01783         typedef T input_type;
01784         typedef T output_type;
01785         typedef sender< input_type > predecessor_type;
01786         typedef receiver< output_type > successor_type;
01787 
01789         priority_queue_node( graph &g ) : buffer_node<T>(g), mark(0) {}
01790 
01791     protected:
01792         typedef typename buffer_node<T>::size_type size_type;
01793         typedef typename buffer_node<T>::item_type item_type;
01794         typedef typename buffer_node<T>::buffer_operation prio_operation;
01795 
01796         enum op_stat {WAIT=0, SUCCEEDED, FAILED};
01797 
01798         /* override */ void handle_operations(prio_operation *op_list) {
01799             prio_operation *tmp /*, *pop_list*/ ;
01800             bool try_forwarding=false;
01801             while (op_list) {
01802                 tmp = op_list;
01803                 op_list = op_list->next;
01804                 switch (tmp->type) {
01805                 case buffer_node<T>::reg_succ: this->internal_reg_succ(tmp); try_forwarding = true; break;
01806                 case buffer_node<T>::rem_succ: this->internal_rem_succ(tmp); break;
01807                 case buffer_node<T>::put_item: internal_push(tmp); try_forwarding = true; break;
01808                 case buffer_node<T>::try_fwd: internal_forward(tmp); break;
01809                 case buffer_node<T>::rel_res: internal_release(tmp); try_forwarding = true; break;
01810                 case buffer_node<T>::con_res: internal_consume(tmp); try_forwarding = true; break;
01811                 case buffer_node<T>::req_item: internal_pop(tmp); break;
01812                 case buffer_node<T>::res_item: internal_reserve(tmp); break;
01813                 }
01814             }
01815             // process pops!  for now, no special pop processing
01816             if (mark<this->my_tail) heapify();
01817             if (try_forwarding && !this->forwarder_busy) {
01818                 this->forwarder_busy = true;
01819                 task::enqueue(*new(task::allocate_additional_child_of(*(this->my_parent))) internal::forward_task< buffer_node<input_type> >(*this));
01820             }
01821         }
01822 
01824         /* override */ void internal_forward(prio_operation *op) {
01825             T i_copy;
01826             bool success = false; // flagged when a successor accepts
01827             size_type counter = this->my_successors.size();
01828 
01829             if (this->my_reserved || this->my_tail == 0) {
01830                 __TBB_store_with_release(op->status, FAILED);
01831                 this->forwarder_busy = false;
01832                 return;
01833             }
01834             // Keep trying to send while there exists an accepting successor
01835             while (counter>0 && this->my_tail > 0) {
01836                 i_copy = this->my_array[0].first;
01837                 bool msg = this->my_successors.try_put(i_copy);
01838                 if ( msg == true ) {
01839                      if (mark == this->my_tail) --mark;
01840                     --(this->my_tail);
01841                     this->my_array[0].first=this->my_array[this->my_tail].first;
01842                     if (this->my_tail > 1) // don't reheap for heap of size 1
01843                         reheap();
01844                     success = true; // found an accepting successor
01845                 }
01846                 --counter;
01847             }
01848             if (success && !counter)
01849                 __TBB_store_with_release(op->status, SUCCEEDED);
01850             else {
01851                 __TBB_store_with_release(op->status, FAILED);
01852                 this->forwarder_busy = false;
01853             }
01854         }
01855 
01856         /* override */ void internal_push(prio_operation *op) {
01857             if ( this->my_tail >= this->my_array_size )
01858                 this->grow_my_array( this->my_tail + 1 );
01859             this->my_array[this->my_tail] = std::make_pair( *(op->elem), true );
01860             ++(this->my_tail);
01861             __TBB_store_with_release(op->status, SUCCEEDED);
01862         }
01863         /* override */ void internal_pop(prio_operation *op) {
01864             if ( this->my_reserved == true || this->my_tail == 0 ) {
01865                 __TBB_store_with_release(op->status, FAILED);
01866             }
01867             else {
01868                 if (mark<this->my_tail &&
01869                     compare(this->my_array[0].first,
01870                             this->my_array[this->my_tail-1].first)) {
01871                     // there are newly pushed elems; last one higher than top
01872                     // copy the data
01873                     *(op->elem) = this->my_array[this->my_tail-1].first;
01874                     --(this->my_tail);
01875                     __TBB_store_with_release(op->status, SUCCEEDED);
01876                 }
01877                 else { // extract and push the last element down heap
01878                     *(op->elem) = this->my_array[0].first; // copy the data
01879                     if (mark == this->my_tail) --mark;
01880                     --(this->my_tail);
01881                     __TBB_store_with_release(op->status, SUCCEEDED);
01882                     this->my_array[0].first=this->my_array[this->my_tail].first;
01883                     if (this->my_tail > 1) // don't reheap for heap of size 1
01884                         reheap();
01885                 }
01886             }
01887         }
01888         /* override */ void internal_reserve(prio_operation *op) {
01889             if (this->my_reserved == true || this->my_tail == 0) {
01890                 __TBB_store_with_release(op->status, FAILED);
01891             }
01892             else {
01893                 this->my_reserved = true;
01894                 *(op->elem) = reserved_item = this->my_array[0].first;
01895                 if (mark == this->my_tail) --mark;
01896                 --(this->my_tail);
01897                 __TBB_store_with_release(op->status, SUCCEEDED);
01898                 this->my_array[0].first = this->my_array[this->my_tail].first;
01899                 if (this->my_tail > 1) // don't reheap for heap of size 1
01900                     reheap();
01901             }
01902         }
01903         /* override */ void internal_consume(prio_operation *op) {
01904             this->my_reserved = false;
01905             __TBB_store_with_release(op->status, SUCCEEDED);
01906         }
01907         /* override */ void internal_release(prio_operation *op) {
01908             if (this->my_tail >= this->my_array_size)
01909                 this->grow_my_array( this->my_tail + 1 );
01910             this->my_array[this->my_tail] = std::make_pair(reserved_item, true);
01911             ++(this->my_tail);
01912             this->my_reserved = false;
01913             __TBB_store_with_release(op->status, SUCCEEDED);
01914             heapify();
01915         }
01916     private:
01917         Compare compare;
01918         size_type mark;
01919         input_type reserved_item;
01920 
01921         void heapify() {
01922             if (!mark) mark = 1;
01923             for (; mark<this->my_tail; ++mark) { // for each unheaped element
01924                 size_type cur_pos = mark;
01925                 input_type to_place = this->my_array[mark].first;
01926                 do { // push to_place up the heap
01927                     size_type parent = (cur_pos-1)>>1;
01928                     if (!compare(this->my_array[parent].first, to_place))
01929                         break;
01930                     this->my_array[cur_pos].first = this->my_array[parent].first;
01931                     cur_pos = parent;
01932                 } while( cur_pos );
01933                 this->my_array[cur_pos].first = to_place;
01934             }
01935         }
01936 
01937         void reheap() {
01938             size_type cur_pos=0, child=1;
01939             while (child < mark) {
01940                 size_type target = child;
01941                 if (child+1<mark &&
01942                     compare(this->my_array[child].first,
01943                             this->my_array[child+1].first))
01944                     ++target;
01945                 // target now has the higher priority child
01946                 if (compare(this->my_array[target].first,
01947                             this->my_array[this->my_tail].first))
01948                     break;
01949                 this->my_array[cur_pos].first = this->my_array[target].first;
01950                 cur_pos = target;
01951                 child = (cur_pos<<1)+1;
01952             }
01953             this->my_array[cur_pos].first = this->my_array[this->my_tail].first;
01954         }
01955     };
01956 
01958 
01961     template< typename T >
01962     class limiter_node : public graph_node, public receiver< T >, public sender< T >, internal::no_copy {
01963     public:
01964 
01965         typedef T input_type;
01966         typedef T output_type;
01967         typedef sender< input_type > predecessor_type;
01968         typedef receiver< output_type > successor_type;
01969 
01970     private:
01971 
01972         task *my_root_task;
01973         size_t my_threshold;
01974         size_t my_count;
01975         internal::predecessor_cache< T > my_predecessors;
01976         spin_mutex my_mutex;
01977         internal::broadcast_cache< T > my_successors;
01978 
01979         friend class internal::forward_task< limiter_node<T> >;
01980 
01981         // Let decrementer call decrement_counter()
01982         friend class internal::decrementer< limiter_node<T> >;
01983 
01984         void decrement_counter() {
01985             input_type v;
01986             
01987             // If we can't get / put an item immediately then drop the count
01988             if ( my_predecessors.get_item( v ) == false 
01989                  || my_successors.try_put(v) == false ) {
01990                 spin_mutex::scoped_lock lock(my_mutex);
01991                 --my_count;
01992                 if ( !my_predecessors.empty() ) 
01993                     task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
01994                                 internal::forward_task< limiter_node<T> >( *this ) );
01995             }
01996         }
01997 
01998         void forward() {
01999             {
02000                 spin_mutex::scoped_lock lock(my_mutex);
02001                 if ( my_count < my_threshold ) 
02002                     ++my_count;
02003                 else
02004                     return;
02005             }
02006             decrement_counter();
02007         }
02008 
02009     public:
02010 
02012         internal::decrementer< limiter_node<T> > decrement;
02013 
02015         limiter_node( graph &g, size_t threshold, int number_of_decrement_predecessors = 0 ) : 
02016            my_root_task(g.root_task()), my_threshold(threshold), my_count(0), decrement(number_of_decrement_predecessors) {
02017             my_predecessors.set_owner(this);
02018             my_successors.set_owner(this);
02019             decrement.set_owner(this);
02020         }
02021 
02023         /* override */ bool register_successor( receiver<output_type> &r ) {
02024             my_successors.register_successor(r);
02025             return true;
02026         }
02027 
02029 
02030         /* override */ bool remove_successor( receiver<output_type> &r ) {
02031             r.remove_predecessor(*this);
02032             my_successors.remove_successor(r);
02033             return true;
02034         }
02035 
02037         /* override */ bool try_put( T t ) {
02038             {
02039                 spin_mutex::scoped_lock lock(my_mutex);
02040                 if ( my_count >= my_threshold ) 
02041                     return false;
02042                 else
02043                     ++my_count; 
02044             }
02045 
02046             bool msg = my_successors.try_put(t);
02047 
02048             if ( msg != true ) {
02049                 spin_mutex::scoped_lock lock(my_mutex);
02050                 --my_count;
02051                 if ( !my_predecessors.empty() ) 
02052                     task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
02053                                 internal::forward_task< limiter_node<T> >( *this ) );
02054             }
02055 
02056             return msg;
02057         }
02058 
02060         /* override */ bool register_predecessor( predecessor_type &src ) {
02061             spin_mutex::scoped_lock lock(my_mutex);
02062             my_predecessors.add( src );
02063             if ( my_count < my_threshold && !my_successors.empty() ) 
02064                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) ) 
02065                                internal::forward_task< limiter_node<T> >( *this ) );
02066             return true;
02067         }
02068 
02070         /* override */ bool remove_predecessor( predecessor_type &src ) {
02071             my_predecessors.remove( src );
02072             return true;
02073         }
02074 
02075     };
02076 
02077     namespace internal {
02078 
02079     struct forwarding_base {
02080         virtual ~forwarding_base() {}
02081         virtual void decrement_port_count() = 0;
02082         virtual void increment_port_count() = 0;
02083     };
02084 
02085     template< int N >
02086     struct join_helper {
02087 
02088         template< typename TupleType, typename PortType >
02089         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
02090             std::get<N-1>( my_input ).set_join_node_pointer(port);
02091             join_helper<N-1>::set_join_node_pointer( my_input, port );
02092         }
02093         template< typename TupleType >
02094         static inline void consume_reservations( TupleType &my_input ) {
02095             std::get<N-1>( my_input ).consume();
02096             join_helper<N-1>::consume_reservations( my_input );
02097         }
02098 
02099         template< typename TupleType >
02100         static inline void release_my_reservation( TupleType &my_input ) {
02101             std::get<N-1>( my_input ).release();
02102         }
02103 
02104         template <typename TupleType>
02105         static inline void release_reservations( TupleType &my_input) {
02106             join_helper<N-1>::release_reservations(my_input);
02107             release_my_reservation(my_input);
02108         }
02109 
02110         template< typename InputTuple, typename OutputTuple >
02111         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
02112             if ( !std::get<N-1>( my_input ).reserve( std::get<N-1>( out ) ) ) return false;
02113             if ( !join_helper<N-1>::reserve( my_input, out ) ) {
02114                 release_my_reservation( my_input );
02115                 return false;
02116             }
02117             return true;
02118         }
02119     };
02120 
02121     template< >
02122     struct join_helper<1> {
02123 
02124         template< typename TupleType, typename PortType >
02125         static inline void set_join_node_pointer(TupleType &my_input, PortType *port) {
02126             std::get<0>( my_input ).set_join_node_pointer(port);
02127         }
02128 
02129         template< typename TupleType >
02130         static inline void consume_reservations( TupleType &my_input ) {
02131             std::get<0>( my_input ).consume();
02132         }
02133 
02134         template< typename TupleType >
02135         static inline void release_my_reservation( TupleType &my_input ) {
02136             std::get<0>( my_input ).release();
02137         }
02138         
02139         template<typename TupleType>
02140         static inline void release_reservations( TupleType &my_input) {
02141             release_my_reservation(my_input);
02142         }
02143 
02144         template< typename InputTuple, typename OutputTuple >
02145         static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
02146             return std::get<0>( my_input ).reserve( std::get<0>( out ) );
02147         }
02148     };
02149 
02150     namespace join_policy_namespace {
02151         enum join_policy { two_phase
02152         };
02153     }
02154     using namespace join_policy_namespace;
02155 
02157     template< typename T >
02158     class two_phase_port : public receiver<T> {
02159     public:
02160         typedef T input_type;
02161         typedef sender<T> predecessor_type;
02162 
02164         two_phase_port() : reserved(false) {
02165            my_join = NULL;
02166            my_predecessors.set_owner( this );
02167         }
02168 
02169         // copy constructor
02170         two_phase_port(const two_phase_port& /* other */) : receiver<T>() {
02171             reserved = false;
02172             my_join = NULL;
02173             my_predecessors.set_owner( this );
02174         }
02175 
02176         void set_join_node_pointer(forwarding_base *join) {
02177             my_join = join;
02178         }
02179 
02180         bool try_put( T ) {
02181             return false;
02182         }
02183 
02185         bool register_predecessor( sender<T> &src ) {
02186             spin_mutex::scoped_lock l(my_mutex);
02187             bool no_predecessors = my_predecessors.empty();
02188             my_predecessors.add(src);
02189             if ( no_predecessors ) {
02190                 my_join->decrement_port_count( );
02191             }
02192             return true;
02193         }
02194 
02196         bool remove_predecessor( sender<T> &src ) {
02197             spin_mutex::scoped_lock l(my_mutex);
02198             my_predecessors.remove( src );
02199             if(my_predecessors.empty()) my_join->increment_port_count();
02200             return true;
02201         }
02202 
02204         bool reserve( T &v ) {
02205             spin_mutex::scoped_lock l(my_mutex);
02206             if ( reserved ) {
02207                 return false;
02208             }
02209             if ( my_predecessors.try_reserve( v ) ) {
02210                 reserved = true;
02211                 return true;
02212             } else if ( my_predecessors.empty() ) {
02213                 my_join->increment_port_count();
02214             }
02215             return false;
02216         }
02217 
02219         void release( ) {
02220             spin_mutex::scoped_lock l(my_mutex);
02221             reserved = false;
02222             my_predecessors.try_release( );
02223         }
02224 
02226         void consume( ) {
02227             spin_mutex::scoped_lock l(my_mutex);
02228             reserved = false;
02229             my_predecessors.try_consume( );
02230         }
02231 
02232     private:
02233         spin_mutex my_mutex;
02234         forwarding_base *my_join;
02235         reservable_predecessor_cache< T > my_predecessors;
02236         bool reserved;
02237     };
02238 
02239     template<join_policy JP, typename InputTuple, typename OutputTuple>
02240     class join_node_base;
02241 
02243     template<join_policy JP, typename InputTuple, typename OutputTuple>
02244     class join_node_FE;
02245 
02246     template<typename InputTuple, typename OutputTuple>
02247     class join_node_FE<two_phase, InputTuple, OutputTuple> : public forwarding_base {
02248     public:
02249         static const int N = std::tuple_size<OutputTuple>::value;
02250         typedef OutputTuple output_type;
02251         typedef InputTuple input_type;
02252         typedef join_node_base<two_phase, InputTuple, OutputTuple> my_node_type; // for forwarding
02253 
02254         join_node_FE(graph &g) : my_root_task(g.root_task()), my_node(NULL) {
02255             ports_with_no_inputs = N;
02256             join_helper<N>::set_join_node_pointer(my_inputs, this);
02257         }
02258 
02259         void set_my_node(my_node_type *new_my_node) { my_node = new_my_node; }
02260 
02261        void increment_port_count() {
02262             ++ports_with_no_inputs;
02263         }
02264 
02265         // if all input_ports have predecessors, spawn forward to try and consume tuples
02266         void decrement_port_count() {
02267             if(ports_with_no_inputs.fetch_and_decrement() == 1) {
02268                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task ) )
02269                     forward_task<my_node_type>(*my_node) );
02270             }
02271         }
02272 
02273         input_type &inputs() { return my_inputs; }
02274     protected:
02275         // all methods on input ports should be called under spin lock from join_node_base.
02276 
02277         bool tuple_build_may_succeed() {
02278             return !ports_with_no_inputs;
02279         }
02280 
02281         bool try_to_make_tuple(output_type &out) {
02282             if(ports_with_no_inputs) return false;
02283             return join_helper<N>::reserve(my_inputs, out);
02284         }
02285 
02286         void tuple_accepted() {
02287             join_helper<N>::consume_reservations(my_inputs);
02288         }
02289         void tuple_rejected() {
02290             join_helper<N>::release_reservations(my_inputs);
02291         }
02292 
02293         input_type my_inputs;
02294         task *my_root_task;
02295         my_node_type *my_node;
02296         atomic<size_t> ports_with_no_inputs;
02297     };
02298 
02300     template<join_policy JP, typename InputTuple, typename OutputTuple>
02301     class join_node_base : public graph_node, public join_node_FE<JP, InputTuple, OutputTuple>,
02302                            public sender<OutputTuple>, no_copy {
02303     public:
02304         typedef OutputTuple output_type;
02305 
02306         typedef receiver<output_type> successor_type;
02307         typedef join_node_FE<JP, InputTuple, OutputTuple> input_ports_type;
02308         using input_ports_type::tuple_build_may_succeed;
02309         using input_ports_type::try_to_make_tuple;
02310         using input_ports_type::tuple_accepted;
02311         using input_ports_type::tuple_rejected;
02312 
02313         join_node_base(graph &g) : input_ports_type(g),  my_root_task(g.root_task()) {
02314             my_successors.set_owner(this);
02315             input_ports_type::set_my_node(this);
02316         }
02317 
02318         bool register_successor(successor_type &r) {
02319             spin_mutex::scoped_lock l(my_mutex);
02320             my_successors.register_successor(r);
02321             if(tuple_build_may_succeed()) {
02322                 task::enqueue( * new ( task::allocate_additional_child_of( *my_root_task) )
02323                         forward_task<join_node_base<JP,InputTuple,OutputTuple> >( *this ) );
02324             }
02325             return true;
02326         }
02327 
02328         template<size_t N>
02329         receiver<typename std::tuple_element<N, OutputTuple>::value> & input_port(void) { return std::get<N>(input_ports_type::inputs()); }
02330 
02331         bool remove_successor( successor_type &r) {
02332             spin_mutex::scoped_lock l(my_mutex);
02333             my_successors.remove_successor(r);
02334             return true;
02335         }
02336 
02337         bool try_get( output_type &v) {
02338             spin_mutex::scoped_lock l(my_mutex);
02339             if(tuple_build_may_succeed()) {
02340                 if(try_to_make_tuple(v)) {
02341                     // successor requested, so acceptance guaranteed
02342                     tuple_accepted();
02343                     return true;
02344                 }
02345             }
02346             return false;
02347         }
02348 
02349     private:
02350         task *my_root_task;
02351         broadcast_cache<output_type, null_rw_mutex> my_successors;
02352         spin_mutex my_mutex;
02353 
02354         friend class forward_task< join_node_base<JP, InputTuple, OutputTuple> >;
02355 
02356         void forward() {
02357             spin_mutex::scoped_lock l(my_mutex);
02358             output_type out;
02359             if(!tuple_build_may_succeed()) return;
02360             while(try_to_make_tuple(out)) {
02361                 if(my_successors.try_put(out)) {
02362                     tuple_accepted();
02363                 }
02364                 else {
02365                     tuple_rejected();
02366                     return;
02367                 }
02368             }
02369         }
02370     };
02371 
02373     //  using tuple_element.
02374     template<int N, typename OutputTuple, join_policy JP>
02375     class unfolded_join_node;
02376 
02377     template<typename OutputTuple>
02378     class unfolded_join_node<2,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02379         std::tuple<
02380                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02381                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type> >,
02382         OutputTuple
02383                   >
02384                   {
02385     private:
02386         typedef typename std::tuple<
02387                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02388                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type> > port_tuple_type;
02389     public:
02390         typedef OutputTuple output_type;
02391     private:
02392         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02393     public:
02394         unfolded_join_node(graph &g) : base_type(g) {}
02395     };
02396 
02397     template<typename OutputTuple>
02398     class unfolded_join_node<3,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02399         std::tuple<
02400                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02401                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02402                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type> >,
02403         OutputTuple
02404                     >
02405                     {
02406     private:
02407         typedef typename std::tuple<
02408                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02409                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>, 
02410                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type> > port_tuple_type;
02411     public:
02412         typedef OutputTuple output_type;
02413     private:
02414         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02415     public:
02416         unfolded_join_node(graph &g) : base_type(g) {}
02417     };
02418 
02419     template<typename OutputTuple>
02420     class unfolded_join_node<4,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02421         std::tuple<
02422                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02423                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02424                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02425                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type> >,
02426         OutputTuple
02427                     > {
02428     private:
02429         typedef typename std::tuple<
02430                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02431                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>, 
02432                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>, 
02433                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type> > port_tuple_type;
02434     public:
02435         typedef OutputTuple output_type;
02436     private:
02437         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02438     public:
02439         unfolded_join_node(graph &g) : base_type(g) {}
02440     };
02441 
02442     template<typename OutputTuple>
02443     class unfolded_join_node<5,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02444         std::tuple<
02445                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02446                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02447                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02448                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02449                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type> >,
02450         OutputTuple
02451                 > {
02452     private:
02453         typedef typename std::tuple<
02454                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02455                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>, 
02456                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>, 
02457                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>, 
02458                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type> > port_tuple_type;
02459     public:
02460         typedef OutputTuple output_type;
02461     private:
02462         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02463     public:
02464         unfolded_join_node(graph &g) : base_type(g) {}
02465     };
02466 
02467     template<typename OutputTuple>
02468     class unfolded_join_node<6,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02469         std::tuple<
02470                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02471                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02472                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02473                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02474                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02475                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type> >,
02476         OutputTuple
02477                     > {
02478     private:
02479         typedef typename std::tuple<
02480                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02481                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>, 
02482                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>, 
02483                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>, 
02484                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>, 
02485                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type> > port_tuple_type;
02486     public:
02487         typedef OutputTuple output_type;
02488     private:
02489         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02490     public:
02491         unfolded_join_node(graph &g) : base_type(g) {}
02492     };
02493 
02494     template<typename OutputTuple>
02495     class unfolded_join_node<7,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02496         std::tuple<
02497                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02498                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02499                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02500                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02501                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02502                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02503                 two_phase_port<typename std::tuple_element<6,OutputTuple>::type> >,
02504         OutputTuple
02505                 > {
02506     private:
02507         typedef typename std::tuple<
02508                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02509                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>, 
02510                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>, 
02511                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>, 
02512                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>, 
02513                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>, 
02514                 two_phase_port<typename std::tuple_element<6,OutputTuple>::type> > port_tuple_type;
02515     public:
02516         typedef OutputTuple output_type;
02517     private:
02518         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02519     public:
02520         unfolded_join_node(graph &g) : base_type(g) {}
02521     };
02522 
02523     template<typename OutputTuple>
02524     class unfolded_join_node<8,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02525         std::tuple<
02526                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02527                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02528                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02529                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02530                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02531                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02532                 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02533                 two_phase_port<typename std::tuple_element<7,OutputTuple>::type> >,
02534         OutputTuple
02535                 > {
02536     private:
02537         typedef typename std::tuple<
02538                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02539                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>, 
02540                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>, 
02541                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>, 
02542                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>, 
02543                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>, 
02544                 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>, 
02545                 two_phase_port<typename std::tuple_element<7,OutputTuple>::type> > port_tuple_type;
02546     public:
02547         typedef OutputTuple output_type;
02548     private:
02549         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02550     public:
02551         unfolded_join_node(graph &g) : base_type(g) {}
02552     };
02553 
02554     template<typename OutputTuple>
02555     class unfolded_join_node<9,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02556         std::tuple<
02557                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02558                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02559                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02560                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02561                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02562                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02563                 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02564                 two_phase_port<typename std::tuple_element<7,OutputTuple>::type>,
02565                 two_phase_port<typename std::tuple_element<8,OutputTuple>::type> >,
02566         OutputTuple
02567                 > {
02568     private:
02569         typedef typename std::tuple<
02570                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02571                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>, 
02572                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>, 
02573                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>, 
02574                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>, 
02575                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>, 
02576                 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>, 
02577                 two_phase_port<typename std::tuple_element<7,OutputTuple>::type>, 
02578                 two_phase_port<typename std::tuple_element<8,OutputTuple>::type> > port_tuple_type;
02579     public:
02580         typedef OutputTuple output_type;
02581     private:
02582         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02583     public:
02584         unfolded_join_node(graph &g) : base_type(g) {}
02585     };
02586 
02587     template<typename OutputTuple>
02588     class unfolded_join_node<10,OutputTuple,two_phase> : public internal::join_node_base<two_phase,
02589         std::tuple<
02590                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>,
02591                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>,
02592                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>,
02593                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>,
02594                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>,
02595                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>,
02596                 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>,
02597                 two_phase_port<typename std::tuple_element<7,OutputTuple>::type>,
02598                 two_phase_port<typename std::tuple_element<8,OutputTuple>::type>,
02599                 two_phase_port<typename std::tuple_element<9,OutputTuple>::type> >,
02600         OutputTuple
02601                 > {
02602     private:
02603         typedef typename std::tuple<
02604                 two_phase_port<typename std::tuple_element<0,OutputTuple>::type>, 
02605                 two_phase_port<typename std::tuple_element<1,OutputTuple>::type>, 
02606                 two_phase_port<typename std::tuple_element<2,OutputTuple>::type>, 
02607                 two_phase_port<typename std::tuple_element<3,OutputTuple>::type>, 
02608                 two_phase_port<typename std::tuple_element<4,OutputTuple>::type>, 
02609                 two_phase_port<typename std::tuple_element<5,OutputTuple>::type>, 
02610                 two_phase_port<typename std::tuple_element<6,OutputTuple>::type>, 
02611                 two_phase_port<typename std::tuple_element<7,OutputTuple>::type>, 
02612                 two_phase_port<typename std::tuple_element<8,OutputTuple>::type>, 
02613                 two_phase_port<typename std::tuple_element<9,OutputTuple>::type> > port_tuple_type;
02614     public:
02615         typedef OutputTuple output_type;
02616     private:
02617         typedef join_node_base<two_phase, port_tuple_type, output_type > base_type;
02618     public:
02619         unfolded_join_node(graph &g) : base_type(g) {}
02620     };
02621 
02622     } // namespace internal
02623 
02624 using namespace internal::join_policy_namespace;
02625 
02626 template<typename OutputTuple, join_policy JP=two_phase>
02627 class join_node: public internal::unfolded_join_node<std::tuple_size<OutputTuple>::value, OutputTuple, JP> {
02628 private:
02629     static const int N = std::tuple_size<OutputTuple>::value;
02630     typedef typename internal::unfolded_join_node<N, OutputTuple, JP> unfolded_type;
02631 public:
02632     typedef OutputTuple output_type;
02633     join_node(graph &g) : unfolded_type(g) { }
02634 };
02635 
02636     //
02637     // Making edges
02638     //
02639   
02641     template< typename T >
02642     inline void make_edge( sender<T> &p, receiver<T> &s ) {
02643         p.register_successor( s );
02644     }
02645 
02647     template< typename T, typename SIterator >
02648     inline void make_edges( sender<T> &p, SIterator s_begin, SIterator s_end ) {
02649         for ( SIterator i = s_begin; i != s_end; ++i ) {
02650             make_edge( p, **i );
02651         }
02652     }
02653 
02655     template< typename T, typename PIterator >
02656     inline void make_edges( PIterator p_begin, PIterator p_end, receiver<T> &s ) {
02657         for ( PIterator i = p_begin; i != p_end; ++i ) {
02658             make_edge( **i, s );
02659         }
02660     }
02661 
02662 }
02663 
02664 #endif
02665 

Copyright © 2005-2011 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.