Music Hub  ..
A session-wide music playback service
bus.h
Go to the documentation of this file.
1 /*
2  * Copyright © 2013 Canonical Ltd.
3  * Copyright © 2022 UBports Foundation.
4  *
5  * Contact: Alberto Mardegan <mardy@users.sourceforge.net>
6  *
7  * This program is free software: you can redistribute it and/or modify it
8  * under the terms of the GNU Lesser General Public License version 3,
9  * as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License
17  * along with this program. If not, see <http://www.gnu.org/licenses/>.
18  *
19  * Authored by: Thomas Voß <thomas.voss@canonical.com>
20  */
21 
22 #ifndef GSTREAMER_BUS_H
23 #define GSTREAMER_BUS_H
24 
25 #include <gst/gst.h>
26 
27 #include <QByteArray>
28 #include <QHash>
29 
30 #include <exception>
31 #include <functional>
32 #include <memory>
33 
34 namespace gstreamer
35 {
36 class Bus
37 {
38 public:
39  struct Message
40  {
42  {
43  }
44 
45  Message(GstMessage* msg)
46  : message(msg),
47  type(GST_MESSAGE_TYPE(msg)),
48  source(GST_MESSAGE_SRC_NAME(msg)),
49  sequence_number(gst_message_get_seqnum(msg))
50  {
51  switch(type)
52  {
53  case GST_MESSAGE_UNKNOWN:
54  throw std::runtime_error("Cannot construct message for type unknown");
55  break;
56  case GST_MESSAGE_ERROR:
57  {
58  gst_message_parse_error(
59  msg,
62  cleanup = [this]()
63  {
64  g_error_free(detail.error_warning_info.error);
66  };
67  break;
68  }
69  case GST_MESSAGE_WARNING:
70  gst_message_parse_warning(
71  msg,
74  cleanup = [this]()
75  {
76  g_error_free(detail.error_warning_info.error);
78  };
79  break;
80  case GST_MESSAGE_INFO:
81  gst_message_parse_info(
82  msg,
85  cleanup = [this]()
86  {
87  g_error_free(detail.error_warning_info.error);
89  };
90  break;
91  case GST_MESSAGE_TAG:
92  gst_message_parse_tag(
93  msg,
95  cleanup = [this]()
96  {
97  gst_tag_list_unref(detail.tag.tag_list);
98  };
99  break;
100  case GST_MESSAGE_BUFFERING:
101  gst_message_parse_buffering(
102  msg,
104  break;
105  case GST_MESSAGE_STATE_CHANGED:
106  gst_message_parse_state_changed(
107  msg,
111  break;
112  case GST_MESSAGE_STEP_DONE:
113  gst_message_parse_step_done(
114  msg,
122  );
123  break;
124  case GST_MESSAGE_CLOCK_PROVIDE:
125  gst_message_parse_clock_provide(
126  msg,
129  break;
130  case GST_MESSAGE_CLOCK_LOST:
131  gst_message_parse_clock_lost(
132  msg,
134  break;
135  case GST_MESSAGE_NEW_CLOCK:
136  gst_message_parse_new_clock(
137  msg,
139  break;
140  case GST_MESSAGE_SEGMENT_START:
141  gst_message_parse_segment_start(
142  msg,
145  break;
146  case GST_MESSAGE_SEGMENT_DONE:
147  gst_message_parse_segment_done(
148  msg,
151  break;
152  case GST_MESSAGE_ASYNC_DONE:
153  gst_message_parse_async_done(
154  msg,
156  break;
157  case GST_MESSAGE_STEP_START:
158  gst_message_parse_step_start(
159  msg,
166  break;
167  case GST_MESSAGE_QOS:
168  gst_message_parse_qos(
169  msg,
170  &detail.qos.live,
174  &detail.qos.duration);
175  break;
176  default:
177  break;
178  }
179  }
180 
181  GstMessage* message;
182  GstMessageType type;
183  QByteArray source; // TODO compute it only when needed
184  uint32_t sequence_number;
185 
186  union Detail
187  {
188  Detail() {}
189  ~Detail() {}
191  {
192  GError* error;
193  gchar* debug;
195  struct Tag
196  {
197  Tag(const Tag &t): tag_list(gst_tag_list_ref(t.tag_list)) {}
198  ~Tag() { gst_tag_list_unref(tag_list); }
199  GstTagList* tag_list;
200  } tag;
201  struct
202  {
203  gint percent;
204  } buffering;
205  struct
206  {
207  GstBufferingMode buffering_mode;
208  gint avg_in;
209  gint avg_out;
211  } buffering_stats;
213  {
214  GstState old_state;
215  GstState new_state;
216  GstState pending_state;
217  } state_changed;
218  struct
219  {
220  gboolean active;
221  GstFormat format;
222  guint64 amount;
223  gdouble rate;
224  gboolean flush;
225  gboolean intermediate;
226  } step_start;
227  struct
228  {
229  GstFormat format;
230  guint64 amount;
231  gdouble rate;
232  gboolean flush;
233  gboolean intermediate;
234  guint64 duration;
235  gboolean eos;
236  } step_done;
237  struct
238  {
239  GstClock* clock;
240  gboolean ready;
241  } clock_provide;
242  struct
243  {
244  GstClock* clock;
245  } clock_lost;
246  struct
247  {
248  GstClock* clock;
249  } clock_new;
250  struct
251  {
252  GstFormat format;
253  gint64 position;
254  } segment_start;
255  struct
256  {
257  GstFormat format;
258  gint64 position;
259  } segment_done;
260  struct
261  {
262  GstClockTime running_time;
263  } async_done;
264  struct
265  {
266  gboolean live;
267  guint64 running_time;
268  guint64 stream_time;
269  guint64 timestamp;
270  guint64 duration;
271  } qos;
272  } detail;
273  std::function<void()> cleanup;
274  };
275 
276  static gboolean bus_watch_handler(
277  GstBus* bus,
278  GstMessage* msg,
279  gpointer data)
280  {
281  (void) bus;
282 
283  auto thiz = static_cast<Bus*>(data);
284  Message message(msg);
285  thiz->notifyNewMessage(message);
286 
287  return true;
288  }
289 
290  Bus(GstBus* bus):
291  bus(bus),
293  bus_watch_id(0)
294  {
295  set_bus(bus);
296  }
297 
299  {
300  g_source_remove(bus_watch_id);
301  gst_object_unref(bus);
302  }
303 
304  void set_bus(GstBus* bus)
305  {
306  if (!bus)
307  throw std::runtime_error("Cannot create Bus instance if underlying instance is NULL.");
308 
309  // Use a watch for most messages instead of the sync handler so that our context is not
310  // the same as the streaming thread, which can cause deadlocks in GStreamer
311  // if, for example, attempting to change the pipeline state from this context.
312  bus_watch_id = gst_bus_add_watch(
313  bus,
315  this);
316  }
317 
318  typedef std::function<void(const Message &)> MessageCallback;
319 
320  int onNewMessage(const MessageCallback &cb) {
322  return m_onNewMessageNextId++;
323  }
324 
326  m_onNewMessage.remove(id);
327  }
328 
329  void notifyNewMessage(const Message &msg) const {
330  for (auto cb: m_onNewMessage) {
331  cb(msg);
332  }
333  }
334 
335  GstBus* bus;
336  QHash<int,MessageCallback> m_onNewMessage;
339 };
340 }
341 
342 #endif // GSTREAMER_BUS_H
gstreamer::Bus::Message::Detail::avg_out
gint avg_out
Definition: bus.h:209
gstreamer::Bus::Message::cleanup
std::function< void()> cleanup
Definition: bus.h:273
gstreamer::Bus::Message::Detail::percent
gint percent
Definition: bus.h:203
gstreamer::Bus::Message::Detail::format
GstFormat format
Definition: bus.h:221
gstreamer::Bus::Message::Detail::timestamp
guint64 timestamp
Definition: bus.h:269
gstreamer::Bus::Message
Definition: bus.h:39
gstreamer::Bus::Message::Detail::StateChanged::new_state
GstState new_state
Definition: bus.h:215
gstreamer::Bus::Bus
Bus(GstBus *bus)
Definition: bus.h:290
gstreamer::Bus::Message::sequence_number
uint32_t sequence_number
Definition: bus.h:184
gstreamer::Bus::bus_watch_id
guint bus_watch_id
Definition: bus.h:338
gstreamer::Bus::Message::Detail::buffering_stats
struct gstreamer::Bus::Message::Detail::@1 buffering_stats
gstreamer::Bus::Message::Detail::intermediate
gboolean intermediate
Definition: bus.h:225
gstreamer::Bus::Message::Detail::ErrorWarningInfo::error
GError * error
Definition: bus.h:192
gstreamer::Bus::Message::Detail::buffering
struct gstreamer::Bus::Message::Detail::@0 buffering
gstreamer::Bus::Message::Detail::live
gboolean live
Definition: bus.h:266
gstreamer::Bus
Definition: bus.h:36
gstreamer::Bus::Message::Detail::step_done
struct gstreamer::Bus::Message::Detail::@3 step_done
gstreamer::Bus::m_onNewMessage
QHash< int, MessageCallback > m_onNewMessage
Definition: bus.h:336
gstreamer::Bus::Message::Detail::stream_time
guint64 stream_time
Definition: bus.h:268
gstreamer::Bus::Message::Detail::StateChanged::old_state
GstState old_state
Definition: bus.h:214
gstreamer::Bus::Message::Detail::running_time
GstClockTime running_time
Definition: bus.h:262
gstreamer::Bus::Message::Detail::~Detail
~Detail()
Definition: bus.h:189
gstreamer::Bus::Message::Detail::duration
guint64 duration
Definition: bus.h:234
gstreamer::Bus::~Bus
~Bus()
Definition: bus.h:298
gstreamer::Bus::MessageCallback
std::function< void(const Message &)> MessageCallback
Definition: bus.h:318
gstreamer::Bus::m_onNewMessageNextId
int m_onNewMessageNextId
Definition: bus.h:337
gstreamer::Bus::Message::Detail::position
gint64 position
Definition: bus.h:253
gstreamer::Bus::Message::~Message
~Message()
Definition: bus.h:41
gstreamer::Bus::bus_watch_handler
static gboolean bus_watch_handler(GstBus *bus, GstMessage *msg, gpointer data)
Definition: bus.h:276
gstreamer::Bus::Message::Detail::async_done
struct gstreamer::Bus::Message::Detail::@9 async_done
gstreamer::Bus::Message::Detail::clock
GstClock * clock
Definition: bus.h:239
gstreamer::Bus::Message::Detail::ErrorWarningInfo
Definition: bus.h:190
gstreamer::Bus::Message::Detail::clock_lost
struct gstreamer::Bus::Message::Detail::@5 clock_lost
gstreamer::Bus::Message::Detail::clock_provide
struct gstreamer::Bus::Message::Detail::@4 clock_provide
gstreamer::Bus::unsubscribeFromNewMessage
void unsubscribeFromNewMessage(int id)
Definition: bus.h:325
gstreamer
Definition: bus.h:34
gstreamer::Bus::Message::Detail::error_warning_info
struct gstreamer::Bus::Message::Detail::ErrorWarningInfo error_warning_info
gstreamer::Bus::Message::Detail::segment_start
struct gstreamer::Bus::Message::Detail::@7 segment_start
gstreamer::Bus::Message::Detail::ready
gboolean ready
Definition: bus.h:240
gstreamer::Bus::Message::Detail::eos
gboolean eos
Definition: bus.h:235
gstreamer::Bus::Message::Detail::avg_in
gint avg_in
Definition: bus.h:208
gstreamer::Bus::Message::detail
union gstreamer::Bus::Message::Detail detail
gstreamer::Bus::onNewMessage
int onNewMessage(const MessageCallback &cb)
Definition: bus.h:320
gstreamer::Bus::bus
GstBus * bus
Definition: bus.h:335
gstreamer::Bus::Message::Detail::tag
struct gstreamer::Bus::Message::Detail::Tag tag
gstreamer::Bus::Message::Detail::Detail
Detail()
Definition: bus.h:188
gstreamer::Bus::Message::Detail::Tag::tag_list
GstTagList * tag_list
Definition: bus.h:199
gstreamer::Bus::Message::source
QByteArray source
Definition: bus.h:183
gstreamer::Bus::Message::Detail::buffering_left
gint64 buffering_left
Definition: bus.h:210
gstreamer::Bus::notifyNewMessage
void notifyNewMessage(const Message &msg) const
Definition: bus.h:329
gstreamer::Bus::Message::Detail::flush
gboolean flush
Definition: bus.h:224
gstreamer::Bus::Message::Detail
Definition: bus.h:186
gstreamer::Bus::Message::Detail::ErrorWarningInfo::debug
gchar * debug
Definition: bus.h:193
gstreamer::Bus::Message::Detail::running_time
guint64 running_time
Definition: bus.h:267
gstreamer::Bus::Message::Detail::Tag::Tag
Tag(const Tag &t)
Definition: bus.h:197
gstreamer::Bus::Message::Detail::qos
struct gstreamer::Bus::Message::Detail::@10 qos
gstreamer::Bus::Message::type
GstMessageType type
Definition: bus.h:182
gstreamer::Bus::Message::Detail::amount
guint64 amount
Definition: bus.h:222
gstreamer::Bus::Message::Detail::clock_new
struct gstreamer::Bus::Message::Detail::@6 clock_new
gstreamer::Bus::Message::Detail::step_start
struct gstreamer::Bus::Message::Detail::@2 step_start
gstreamer::Bus::Message::Detail::segment_done
struct gstreamer::Bus::Message::Detail::@8 segment_done
gstreamer::Bus::Message::Detail::rate
gdouble rate
Definition: bus.h:223
gstreamer::Bus::Message::Detail::StateChanged::pending_state
GstState pending_state
Definition: bus.h:216
gstreamer::Bus::Message::message
GstMessage * message
Definition: bus.h:181
gstreamer::Bus::Message::Message
Message(GstMessage *msg)
Definition: bus.h:45
gstreamer::Bus::Message::Detail::state_changed
struct gstreamer::Bus::Message::Detail::StateChanged state_changed
gstreamer::Bus::Message::Detail::buffering_mode
GstBufferingMode buffering_mode
Definition: bus.h:207
gstreamer::Bus::Message::Detail::StateChanged
Definition: bus.h:212
gstreamer::Bus::Message::Detail::active
gboolean active
Definition: bus.h:220
gstreamer::Bus::set_bus
void set_bus(GstBus *bus)
Definition: bus.h:304
gstreamer::Bus::Message::Detail::Tag
Definition: bus.h:195
gstreamer::Bus::Message::Detail::Tag::~Tag
~Tag()
Definition: bus.h:198