An element whose sinkpads operate in pull-based mode, while none of its sourcepads do (or which has no sourcepads at all), can start a task that drives the pipeline dataflow. Within this task function, the element has random access over all of its sinkpads and pushes data over its sourcepads. This is useful for several kinds of elements:
Demuxers, parsers and certain kinds of decoders where data comes in unparsed (such as MPEG-audio or video streams), since those will prefer byte-exact (random) access from their input. If possible, however, such elements should be prepared to operate in chain-based mode, too.
Certain kinds of audio outputs that require control over their input dataflow, such as the Jack sound server.
In order to start this task, you will need to create it in the activation function.
#include "filter.h"
#include <string.h>

static gboolean gst_my_filter_activate      (GstPad      * pad);
static gboolean gst_my_filter_activate_pull (GstPad      * pad,
                                             gboolean      active);
static void     gst_my_filter_loop          (GstMyFilter * filter);

GST_BOILERPLATE (GstMyFilter, gst_my_filter, GstElement, GST_TYPE_ELEMENT);

static void
gst_my_filter_init (GstMyFilter * filter)
{
[..]
  gst_pad_set_activate_function (filter->sinkpad, gst_my_filter_activate);
  gst_pad_set_activatepull_function (filter->sinkpad,
      gst_my_filter_activate_pull);
[..]
}

[..]

static gboolean
gst_my_filter_activate (GstPad * pad)
{
  if (gst_pad_check_pull_range (pad)) {
    return gst_pad_activate_pull (pad, TRUE);
  } else {
    return FALSE;
  }
}

static gboolean
gst_my_filter_activate_pull (GstPad  *pad,
                             gboolean active)
{
  GstMyFilter *filter = GST_MY_FILTER (GST_OBJECT_PARENT (pad));

  if (active) {
    filter->offset = 0;
    return gst_pad_start_task (pad,
        (GstTaskFunction) gst_my_filter_loop, filter);
  } else {
    return gst_pad_stop_task (pad);
  }
}
Once started, your task has full control over input and output. The simplest task function is one that reads input and pushes it over its source pad. That is not especially useful in itself, but it offers more flexibility than the chain-based case that we have been looking at so far.
#define BLOCKSIZE 2048

static void
gst_my_filter_loop (GstMyFilter * filter)
{
  GstFlowReturn ret;
  gint64 len;   /* gst_pad_query_duration() expects a gint64 * */
  GstFormat fmt = GST_FORMAT_BYTES;
  GstBuffer *buf = NULL;

  if (!gst_pad_query_duration (filter->sinkpad, &fmt, &len)) {
    GST_DEBUG_OBJECT (filter, "failed to query duration, pausing");
    goto stop;
  }

  if (filter->offset >= len) {
    GST_DEBUG_OBJECT (filter, "at end of input, sending EOS, pausing");
    gst_pad_push_event (filter->srcpad, gst_event_new_eos ());
    goto stop;
  }

  /* now, read BLOCKSIZE bytes from byte offset filter->offset */
  ret = gst_pad_pull_range (filter->sinkpad, filter->offset,
      BLOCKSIZE, &buf);

  if (ret != GST_FLOW_OK) {
    GST_DEBUG_OBJECT (filter, "pull_range failed: %s", gst_flow_get_name (ret));
    goto stop;
  }

  /* now push buffer downstream */
  ret = gst_pad_push (filter->srcpad, buf);

  buf = NULL; /* gst_pad_push() took ownership of buffer */

  if (ret != GST_FLOW_OK) {
    GST_DEBUG_OBJECT (filter, "pad_push failed: %s", gst_flow_get_name (ret));
    goto stop;
  }

  /* everything is fine, increase offset and wait for us to be called again */
  filter->offset += BLOCKSIZE;
  return;

stop:
  GST_DEBUG_OBJECT (filter, "pausing task");
  gst_pad_pause_task (filter->sinkpad);
}
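The control flow of such a loop function can also be studied without GStreamer at all. The following is a minimal sketch in plain C, not GStreamer API: `ByteSource`, `LoopState`, `source_pull_range` and `loop_iteration` are hypothetical names introduced only for illustration. It models one design choice that may be worth considering: the final block of the input is usually shorter than BLOCKSIZE, so this sketch advances the offset by the number of bytes actually obtained rather than by a fixed BLOCKSIZE.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BLOCKSIZE 2048

/* Hypothetical stand-in for an upstream peer offering random access. */
typedef struct {
  const unsigned char *data;
  size_t len;
} ByteSource;

/* Hypothetical stand-in for the element state; offset mirrors filter->offset. */
typedef struct {
  size_t offset;   /* next byte position to read */
  size_t pushed;   /* total bytes handed downstream so far */
  int eos_sent;    /* set once end of input has been signalled */
} LoopState;

/* Models a pull-range request: copies up to `size` bytes starting at
 * `offset` into `out` and returns the number of bytes copied
 * (0 when `offset` is at or past the end of the source). */
static size_t
source_pull_range (const ByteSource *src, size_t offset, size_t size,
    unsigned char *out)
{
  size_t avail;

  assert (src != NULL && out != NULL);
  if (offset >= src->len)
    return 0;
  avail = src->len - offset;
  if (size > avail)
    size = avail;             /* short read at the end of the input */
  memcpy (out, src->data + offset, size);
  return size;
}

/* One iteration of the task function. Returns 1 if it should be called
 * again, 0 if the task should pause. */
static int
loop_iteration (const ByteSource *src, LoopState *st)
{
  unsigned char block[BLOCKSIZE];
  size_t got;

  if (st->offset >= src->len) {
    st->eos_sent = 1;         /* models pushing an EOS event */
    return 0;
  }

  got = source_pull_range (src, st->offset, BLOCKSIZE, block);
  if (got == 0)
    return 0;                 /* models a failed pull: pause */

  st->pushed += got;          /* models pushing the block downstream */
  st->offset += got;          /* advance by what was actually read */
  return 1;
}
```

With a 5000-byte source, this model performs three productive iterations (2048, 2048 and 904 bytes) and signals end of input on the fourth call. In a real element, the equivalent adjustment would presumably be to advance by the size of the buffer that was returned, but that depends on how the upstream element handles reads near the end of the stream.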