00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef __TBB_pipeline_H
00022 #define __TBB_pipeline_H
00023
00024 #include "atomic.h"
00025 #include "task.h"
00026 #include <cstddef>
00027
00028 namespace tbb {
00029
00030 class pipeline;
00031 class filter;
00032
00034 namespace internal {
00035
00036
00037 #define __TBB_PIPELINE_VERSION(x) (unsigned char)(x-2)<<1
00038
00039 typedef unsigned long Token;
00040 typedef long tokendiff_t;
00041 class stage_task;
00042 class input_buffer;
00043 class pipeline_root_task;
00044 class pipeline_cleaner;
00045
00046 }
00047
00048 namespace interface5 {
00049 template<typename T, typename U> class filter_t;
00050
00051 namespace internal {
00052 class pipeline_proxy;
00053 }
00054 }
00055
00057
00059
00060 class filter: internal::no_copy {
00061 private:
00063 static filter* not_in_pipeline() {return reinterpret_cast<filter*>(intptr_t(-1));}
00064
00066 static const unsigned char filter_is_serial = 0x1;
00067
00069
00071 static const unsigned char filter_is_out_of_order = 0x1<<4;
00072
00074 static const unsigned char filter_is_bound = 0x1<<5;
00075
00077 static const unsigned char exact_exception_propagation =
00078 #if TBB_USE_CAPTURED_EXCEPTION
00079 0x0;
00080 #else
00081 0x1<<7;
00082 #endif
00083
00084 static const unsigned char current_version = __TBB_PIPELINE_VERSION(5);
00085 static const unsigned char version_mask = 0x7<<1;
00086 public:
00087 enum mode {
00089 parallel = current_version | filter_is_out_of_order,
00091 serial_in_order = current_version | filter_is_serial,
00093 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
00095 serial = serial_in_order
00096 };
00097 protected:
00098 filter( bool is_serial_ ) :
00099 next_filter_in_pipeline(not_in_pipeline()),
00100 my_input_buffer(NULL),
00101 my_filter_mode(static_cast<unsigned char>((is_serial_ ? serial : parallel) | exact_exception_propagation)),
00102 prev_filter_in_pipeline(not_in_pipeline()),
00103 my_pipeline(NULL),
00104 next_segment(NULL)
00105 {}
00106
00107 filter( mode filter_mode ) :
00108 next_filter_in_pipeline(not_in_pipeline()),
00109 my_input_buffer(NULL),
00110 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
00111 prev_filter_in_pipeline(not_in_pipeline()),
00112 my_pipeline(NULL),
00113 next_segment(NULL)
00114 {}
00115
00116 public:
00118 bool is_serial() const {
00119 return bool( my_filter_mode & filter_is_serial );
00120 }
00121
00123 bool is_ordered() const {
00124 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
00125 }
00126
00128 bool is_bound() const {
00129 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
00130 }
00131
00133
00134 virtual void* operator()( void* item ) = 0;
00135
00137
00138 virtual __TBB_EXPORTED_METHOD ~filter();
00139
00140 #if __TBB_TASK_GROUP_CONTEXT
00142
00144 virtual void finalize( void* ) {};
00145 #endif
00146
00147 private:
00149 filter* next_filter_in_pipeline;
00150
00152
00153 internal::input_buffer* my_input_buffer;
00154
00155 friend class internal::stage_task;
00156 friend class internal::pipeline_root_task;
00157 friend class pipeline;
00158 friend class thread_bound_filter;
00159
00161 const unsigned char my_filter_mode;
00162
00164 filter* prev_filter_in_pipeline;
00165
00167 pipeline* my_pipeline;
00168
00170
00171 filter* next_segment;
00172 };
00173
00175
00176 class thread_bound_filter: public filter {
00177 public:
00178 enum result_type {
00179
00180 success,
00181
00182 item_not_available,
00183
00184 end_of_stream
00185 };
00186 protected:
00187 thread_bound_filter(mode filter_mode):
00188 filter(static_cast<mode>(filter_mode | filter::filter_is_bound | filter::exact_exception_propagation))
00189 {}
00190 public:
00192
00197 result_type __TBB_EXPORTED_METHOD try_process_item();
00198
00200
00204 result_type __TBB_EXPORTED_METHOD process_item();
00205
00206 private:
00208 result_type internal_process_item(bool is_blocking);
00209 };
00210
00212
00213 class pipeline {
00214 public:
00216 __TBB_EXPORTED_METHOD pipeline();
00217
00220 virtual __TBB_EXPORTED_METHOD ~pipeline();
00221
00223 void __TBB_EXPORTED_METHOD add_filter( filter& filter_ );
00224
00226 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens );
00227
00228 #if __TBB_TASK_GROUP_CONTEXT
00230 void __TBB_EXPORTED_METHOD run( size_t max_number_of_live_tokens, tbb::task_group_context& context );
00231 #endif
00232
00234 void __TBB_EXPORTED_METHOD clear();
00235
00236 private:
00237 friend class internal::stage_task;
00238 friend class internal::pipeline_root_task;
00239 friend class filter;
00240 friend class thread_bound_filter;
00241 friend class internal::pipeline_cleaner;
00242 friend class tbb::interface5::internal::pipeline_proxy;
00243
00245 filter* filter_list;
00246
00248 filter* filter_end;
00249
00251 task* end_counter;
00252
00254 atomic<internal::Token> input_tokens;
00255
00257 atomic<internal::Token> token_counter;
00258
00260 bool end_of_input;
00261
00263 bool has_thread_bound_filters;
00264
00266 void remove_filter( filter& filter_ );
00267
00269 void __TBB_EXPORTED_METHOD inject_token( task& self );
00270
00271 #if __TBB_TASK_GROUP_CONTEXT
00273 void clear_filters();
00274 #endif
00275 };
00276
00277
00278
00279
00280
00281 namespace interface5 {
00282
00283 namespace internal {
00284 template<typename T, typename U, typename Body> class concrete_filter;
00285 }
00286
00287 class flow_control {
00288 bool is_pipeline_stopped;
00289 flow_control() { is_pipeline_stopped = false; }
00290 template<typename T, typename U, typename Body> friend class internal::concrete_filter;
00291 public:
00292 void stop() { is_pipeline_stopped = true; }
00293 };
00294
00296 namespace internal {
00297
00298 template<typename T, typename U, typename Body>
00299 class concrete_filter: public tbb::filter {
00300 Body my_body;
00301
00302 void* operator()(void* input) {
00303 T* temp_input = (T*)input;
00304
00305 void* output = (void*) new U(my_body(*temp_input));
00306 delete temp_input;
00307 return output;
00308 }
00309
00310 public:
00311 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00312 };
00313
00314 template<typename U, typename Body>
00315 class concrete_filter<void,U,Body>: public filter {
00316 Body my_body;
00317
00318 void* operator()(void*) {
00319 flow_control control;
00320 U temp_output = my_body(control);
00321 void* output = control.is_pipeline_stopped ? NULL : (void*) new U(temp_output);
00322 return output;
00323 }
00324 public:
00325 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00326 };
00327
00328 template<typename T, typename Body>
00329 class concrete_filter<T,void,Body>: public filter {
00330 Body my_body;
00331
00332 void* operator()(void* input) {
00333 T* temp_input = (T*)input;
00334 my_body(*temp_input);
00335 delete temp_input;
00336 return NULL;
00337 }
00338 public:
00339 concrete_filter(tbb::filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00340 };
00341
00342 template<typename Body>
00343 class concrete_filter<void,void,Body>: public filter {
00344 Body my_body;
00345
00347 void* operator()(void*) {
00348 flow_control control;
00349 my_body(control);
00350 void* output = control.is_pipeline_stopped ? NULL : (void*)(intptr_t)-1;
00351 return output;
00352 }
00353 public:
00354 concrete_filter(filter::mode filter_mode, const Body& body) : filter(filter_mode), my_body(body) {}
00355 };
00356
00358
00359 class pipeline_proxy {
00360 tbb::pipeline my_pipe;
00361 public:
00362 pipeline_proxy( const filter_t<void,void>& filter_chain );
00363 ~pipeline_proxy() {
00364 while( filter* f = my_pipe.filter_list )
00365 delete f;
00366 }
00367 tbb::pipeline* operator->() { return &my_pipe; }
00368 };
00369
00371
00372 class filter_node: tbb::internal::no_copy {
00374 tbb::atomic<intptr_t> ref_count;
00375 protected:
00376 filter_node() {
00377 ref_count = 0;
00378 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00379 ++(__TBB_TEST_FILTER_NODE_COUNT);
00380 #endif
00381 }
00382 public:
00384 virtual void add_to( pipeline& ) = 0;
00386 void add_ref() {++ref_count;}
00388 void remove_ref() {
00389 __TBB_ASSERT(ref_count>0,"ref_count underflow");
00390 if( --ref_count==0 )
00391 delete this;
00392 }
00393 virtual ~filter_node() {
00394 #ifdef __TBB_TEST_FILTER_NODE_COUNT
00395 --(__TBB_TEST_FILTER_NODE_COUNT);
00396 #endif
00397 }
00398 };
00399
00401 template<typename T, typename U, typename Body>
00402 class filter_node_leaf: public filter_node {
00403 const tbb::filter::mode mode;
00404 const Body& body;
00405 void add_to( pipeline& p ) {
00406 concrete_filter<T,U,Body>* f = new concrete_filter<T,U,Body>(mode,body);
00407 p.add_filter( *f );
00408 }
00409 public:
00410 filter_node_leaf( tbb::filter::mode m, const Body& b ) : mode(m), body(b) {}
00411 };
00412
00414 class filter_node_join: public filter_node {
00415 friend class filter_node;
00416 filter_node& left;
00417 filter_node& right;
00418 ~filter_node_join() {
00419 left.remove_ref();
00420 right.remove_ref();
00421 }
00422 void add_to( pipeline& p ) {
00423 left.add_to(p);
00424 right.add_to(p);
00425 }
00426 public:
00427 filter_node_join( filter_node& x, filter_node& y ) : left(x), right(y) {
00428 left.add_ref();
00429 right.add_ref();
00430 }
00431 };
00432
00433 }
00435
00436 template<typename T, typename U, typename Body>
00437 filter_t<T,U> make_filter(tbb::filter::mode mode, const Body& body) {
00438 return new internal::filter_node_leaf<T,U,Body>(mode, body);
00439 }
00440
00441 template<typename T, typename V, typename U>
00442 filter_t<T,U> operator& (const filter_t<T,V>& left, const filter_t<V,U>& right) {
00443 __TBB_ASSERT(left.root,"cannot use default-constructed filter_t as left argument of '&'");
00444 __TBB_ASSERT(right.root,"cannot use default-constructed filter_t as right argument of '&'");
00445 return new internal::filter_node_join(*left.root,*right.root);
00446 }
00447
00449 template<typename T, typename U>
00450 class filter_t {
00451 typedef internal::filter_node filter_node;
00452 filter_node* root;
00453 filter_t( filter_node* root_ ) : root(root_) {
00454 root->add_ref();
00455 }
00456 friend class internal::pipeline_proxy;
00457 template<typename T_, typename U_, typename Body>
00458 friend filter_t<T_,U_> make_filter(tbb::filter::mode, const Body& );
00459 template<typename T_, typename V_, typename U_>
00460 friend filter_t<T_,U_> operator& (const filter_t<T_,V_>& , const filter_t<V_,U_>& );
00461 public:
00462 filter_t() : root(NULL) {}
00463 filter_t( const filter_t<T,U>& rhs ) : root(rhs.root) {
00464 if( root ) root->add_ref();
00465 }
00466 template<typename Body>
00467 filter_t( tbb::filter::mode mode, const Body& body ) :
00468 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
00469 root->add_ref();
00470 }
00471
00472 void operator=( const filter_t<T,U>& rhs ) {
00473
00474
00475 filter_node* old = root;
00476 root = rhs.root;
00477 if( root ) root->add_ref();
00478 if( old ) old->remove_ref();
00479 }
00480 ~filter_t() {
00481 if( root ) root->remove_ref();
00482 }
00483 void clear() {
00484
00485 if( root ) {
00486 filter_node* old = root;
00487 root = NULL;
00488 old->remove_ref();
00489 }
00490 }
00491 };
00492
00493 inline internal::pipeline_proxy::pipeline_proxy( const filter_t<void,void>& filter_chain ) : my_pipe() {
00494 __TBB_ASSERT( filter_chain.root, "cannot apply parallel_pipeline to default-constructed filter_t" );
00495 filter_chain.root->add_to(my_pipe);
00496 }
00497
00498 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain
00499 #if __TBB_TASK_GROUP_CONTEXT
00500 , tbb::task_group_context& context
00501 #endif
00502 ) {
00503 internal::pipeline_proxy pipe(filter_chain);
00504
00505 pipe->run(max_number_of_live_tokens
00506 #if __TBB_TASK_GROUP_CONTEXT
00507 , context
00508 #endif
00509 );
00510 }
00511
00512 #if __TBB_TASK_GROUP_CONTEXT
00513 inline void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t<void,void>& filter_chain) {
00514 tbb::task_group_context context;
00515 parallel_pipeline(max_number_of_live_tokens, filter_chain, context);
00516 }
00517 #endif // __TBB_TASK_GROUP_CONTEXT
00518
00519 }
00520
00521 using interface5::flow_control;
00522 using interface5::filter_t;
00523 using interface5::make_filter;
00524 using interface5::parallel_pipeline;
00525
00526 }
00527
00528 #endif