diff options
Diffstat (limited to 'gst/tcp')
-rw-r--r-- | gst/tcp/.gitignore | 5 | ||||
-rw-r--r-- | gst/tcp/Makefile.am | 41 | ||||
-rw-r--r-- | gst/tcp/README | 53 | ||||
-rw-r--r-- | gst/tcp/gstmultifdsink.c | 2974 | ||||
-rw-r--r-- | gst/tcp/gstmultifdsink.h | 286 | ||||
-rw-r--r-- | gst/tcp/gsttcp-marshal.list | 5 | ||||
-rw-r--r-- | gst/tcp/gsttcp.c | 569 | ||||
-rw-r--r-- | gst/tcp/gsttcp.h | 76 | ||||
-rw-r--r-- | gst/tcp/gsttcpclientsink.c | 464 | ||||
-rw-r--r-- | gst/tcp/gsttcpclientsink.h | 91 | ||||
-rw-r--r-- | gst/tcp/gsttcpclientsrc.c | 444 | ||||
-rw-r--r-- | gst/tcp/gsttcpclientsrc.h | 83 | ||||
-rw-r--r-- | gst/tcp/gsttcpplugin.c | 63 | ||||
-rw-r--r-- | gst/tcp/gsttcpplugin.h | 40 | ||||
-rw-r--r-- | gst/tcp/gsttcpserversink.c | 382 | ||||
-rw-r--r-- | gst/tcp/gsttcpserversink.h | 90 | ||||
-rw-r--r-- | gst/tcp/gsttcpserversrc.c | 486 | ||||
-rw-r--r-- | gst/tcp/gsttcpserversrc.h | 89 | ||||
-rw-r--r-- | gst/tcp/tcp.vcproj | 160 |
19 files changed, 0 insertions, 6401 deletions
diff --git a/gst/tcp/.gitignore b/gst/tcp/.gitignore deleted file mode 100644 index 45ededdb..00000000 --- a/gst/tcp/.gitignore +++ /dev/null @@ -1,5 +0,0 @@ -gsttcp-enumtypes.c -gsttcp-enumtypes.h -gsttcp-marshal.c -gsttcp-marshal.h -fdsetstress diff --git a/gst/tcp/Makefile.am b/gst/tcp/Makefile.am deleted file mode 100644 index 02bc526c..00000000 --- a/gst/tcp/Makefile.am +++ /dev/null @@ -1,41 +0,0 @@ -plugin_LTLIBRARIES = libgsttcp.la - -# variables used for enum/marshal generation -glib_enum_headers = gsttcp.h -glib_enum_define = GST_TCP -glib_gen_prefix = gst_tcp -glib_gen_basename = gsttcp - -include $(top_srcdir)/common/gst-glib-gen.mak - -built_sources = gsttcp-enumtypes.c gsttcp-marshal.c -built_headers = gsttcp-enumtypes.h gsttcp-marshal.h - -BUILT_SOURCES = $(built_sources) $(built_headers) - -libgsttcp_la_SOURCES = \ - gsttcpplugin.c \ - gsttcp.c \ - gstmultifdsink.c \ - gsttcpclientsrc.c gsttcpclientsink.c \ - gsttcpserversrc.c gsttcpserversink.c - -nodist_libgsttcp_la_SOURCES = \ - $(built_sources) - -# remove ENABLE_NEW when dataprotocol is stable -libgsttcp_la_CFLAGS = $(GST_PLUGINS_BASE_CFLAGS) $(GST_CFLAGS) -DGST_ENABLE_NEW -libgsttcp_la_LDFLAGS = $(GST_PLUGIN_LDFLAGS) -libgsttcp_la_LIBADD = $(GST_BASE_LIBS) $(GST_GDP_LIBS) $(GST_LIBS) -libgsttcp_la_LIBTOOLFLAGS = --tag=disable-static - -noinst_HEADERS = \ - gsttcpplugin.h \ - gsttcp.h \ - gstmultifdsink.h \ - gsttcpclientsrc.h gsttcpclientsink.h \ - gsttcpserversrc.h gsttcpserversink.h - -CLEANFILES = $(BUILT_SOURCES) - -EXTRA_DIST = gsttcp-marshal.list diff --git a/gst/tcp/README b/gst/tcp/README deleted file mode 100644 index 47e4894b..00000000 --- a/gst/tcp/README +++ /dev/null @@ -1,53 +0,0 @@ -This part of the documentation is for the new tcp elements: -- tcpclientsrc -- tcpclientsink -- tcpserversrc -- tcpserversink - -TESTS ------ -Use these tests to test functionality of the various tcp plugins - -* server: nc -l -p 3000 - client: nc localhost 3000 - everything you type in the server is shown on the client - everything you type in the client is shown on the server - -* server: nc -l -p 3000 - client: gst-launch tcpclientsrc protocol=none port=3000 ! fdsink fd=2 - everything you type in the server is shown on the client - -* server: nc -l -p 3000 - client: gst-launch fdsrc fd=1 ! tcpclientsink protocol=none port=3000 - everything you type in the client is shown on the server - -* server: gst-launch tcpserversrc protocol=none port=3000 ! fdsink fd=2 - client: gst-launch fdsrc fd=1 ! tcpclientsink protocol=none port=3000 - -* server: gst-launch fdsrc fd=1 ! tcpserversink protocol=none port=3000 - client: gst-launch tcpclientsrc protocol=none port=3000 ! fdsink fd=2 - -> Received first buffer without caps set - -TODO ----- -- implement DNS resolution - -multifdsink ------------ -- operation: - - client fd gets added when "add" signal gets emitted on multifdsink - - signal handler creates a GstTCPClient structure, adds it to ->clients, - and adds the fd to ->fd_hash, then emits client-added - - client - - - when a buffer comes in: - - the _render vmethod puts the buffer on the global queue - - and increases bytes_to_serve - - (currently it sets streamheaders, but since this is treated globally - this is wrong - clients can be at different positions in the stream) - - - when a client issues a write (ie requests data): - - when using GDP, if no caps sent yet, send caps first, then set caps_sent - - if streamheader buffers, and we haven't sent yet to this client, - send current streamheader buffers, then set streamheader_sent - - send out buffers diff --git a/gst/tcp/gstmultifdsink.c b/gst/tcp/gstmultifdsink.c deleted file mode 100644 index 4a440e95..00000000 --- a/gst/tcp/gstmultifdsink.c +++ /dev/null @@ -1,2974 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/** - * SECTION:element-multifdsink - * @see_also: tcpserversink - * - * This plugin writes incoming data to a set of file descriptors. The - * file descriptors can be added to multifdsink by emitting the #GstMultiFdSink::add signal. - * For each descriptor added, the #GstMultiFdSink::client-added signal will be called. - * - * As of version 0.10.8, a client can also be added with the #GstMultiFdSink::add-full signal - * that allows for more control over what and how much data a client - * initially receives. - * - * Clients can be removed from multifdsink by emitting the #GstMultiFdSink::remove signal. For - * each descriptor removed, the #GstMultiFdSink::client-removed signal will be called. The - * #GstMultiFdSink::client-removed signal can also be fired when multifdsink decides that a - * client is not active anymore or, depending on the value of the - * #GstMultiFdSink:recover-policy property, if the client is reading too slowly. - * In all cases, multifdsink will never close a file descriptor itself. - * The user of multifdsink is responsible for closing all file descriptors. - * This can for example be done in response to the #GstMultiFdSink::client-fd-removed signal. - * Note that multifdsink still has a reference to the file descriptor when the - * #GstMultiFdSink::client-removed signal is emitted, so that "get-stats" can be performed on - * the descriptor; it is therefore not safe to close the file descriptor in - * the #GstMultiFdSink::client-removed signal handler, and you should use the - * #GstMultiFdSink::client-fd-removed signal to safely close the fd. - * - * Multifdsink internally keeps a queue of the incoming buffers and uses a - * separate thread to send the buffers to the clients. This ensures that no - * client write can block the pipeline and that clients can read with different - * speeds. - * - * When adding a client to multifdsink, the #GstMultiFdSink:sync-method property will define - * which buffer in the queued buffers will be sent first to the client. Clients - * can be sent the most recent buffer (which might not be decodable by the - * client if it is not a keyframe), the next keyframe received in - * multifdsink (which can take some time depending on the keyframe rate), or the - * last received keyframe (which will cause a simple burst-on-connect). - * Multifdsink will always keep at least one keyframe in its internal buffers - * when the sync-mode is set to latest-keyframe. - * - * As of version 0.10.8, there are additional values for the #GstMultiFdSink:sync-method - * property to allow finer control over burst-on-connect behaviour. By selecting - * the 'burst' method a minimum burst size can be chosen, 'burst-keyframe' - * additionally requires that the burst begin with a keyframe, and - * 'burst-with-keyframe' attempts to burst beginning with a keyframe, but will - * prefer a minimum burst size even if it requires not starting with a keyframe. - * - * Multifdsink can be instructed to keep at least a minimum amount of data - * expressed in time or byte units in its internal queues with the the - * #GstMultiFdSink:time-min and #GstMultiFdSink:bytes-min properties respectively. - * These properties are useful if the application adds clients with the - * #GstMultiFdSink::add-full signal to make sure that a burst connect can - * actually be honored. - * - * When streaming data, clients are allowed to read at a different rate than - * the rate at which multifdsink receives data. If the client is reading too - * fast, no data will be send to the client until multifdsink receives more - * data. If the client, however, reads too slowly, data for that client will be - * queued up in multifdsink. Two properties control the amount of data - * (buffers) that is queued in multifdsink: #GstMultiFdSink:buffers-max and - * #GstMultiFdSink:buffers-soft-max. A client that falls behind by - * #GstMultiFdSink:buffers-max is removed from multifdsink forcibly. - * - * A client with a lag of at least #GstMultiFdSink:buffers-soft-max enters the recovery - * procedure which is controlled with the #GstMultiFdSink:recover-policy property. - * A recover policy of NONE will do nothing, RESYNC_LATEST will send the most recently - * received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT - * positions the client to the soft limit in the buffer queue and - * RESYNC_KEYFRAME positions the client at the most recent keyframe in the - * buffer queue. - * - * multifdsink will by default synchronize on the clock before serving the - * buffers to the clients. This behaviour can be disabled by setting the sync - * property to FALSE. Multifdsink will by default not do QoS and will never - * drop late buffers. - * - * Last reviewed on 2006-09-12 (0.10.10) - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif -#include <gst/gst-i18n-plugin.h> - -#include <sys/ioctl.h> - -#ifdef HAVE_UNISTD_H -#include <unistd.h> -#endif - -#include <fcntl.h> -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/stat.h> -#include <netinet/in.h> - -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include <sys/filio.h> -#endif - -#include "gstmultifdsink.h" -#include "gsttcp-marshal.h" - -#define NOT_IMPLEMENTED 0 - -/* elementfactory information */ -static const GstElementDetails gst_multi_fd_sink_details = -GST_ELEMENT_DETAILS ("Multi filedescriptor sink", - "Sink/Network", - "Send data to multiple filedescriptors", - "Thomas Vander Stichele <thomas at apestaart dot org>, " - "Wim Taymans <wim@fluendo.com>"); - -static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - -GST_DEBUG_CATEGORY_STATIC (multifdsink_debug); -#define GST_CAT_DEFAULT (multifdsink_debug) - -/* MultiFdSink signals and args */ -enum -{ - /* methods */ - SIGNAL_ADD, - SIGNAL_ADD_BURST, - SIGNAL_REMOVE, - SIGNAL_REMOVE_FLUSH, - SIGNAL_CLEAR, - SIGNAL_GET_STATS, - - /* signals */ - SIGNAL_CLIENT_ADDED, - SIGNAL_CLIENT_REMOVED, - SIGNAL_CLIENT_FD_REMOVED, - - LAST_SIGNAL -}; - - -/* this is really arbitrarily chosen */ -#define DEFAULT_PROTOCOL GST_TCP_PROTOCOL_NONE -#define DEFAULT_MODE 1 -#define DEFAULT_BUFFERS_MAX -1 -#define DEFAULT_BUFFERS_SOFT_MAX -1 -#define DEFAULT_TIME_MIN -1 -#define DEFAULT_BYTES_MIN -1 -#define DEFAULT_BUFFERS_MIN -1 -#define DEFAULT_UNIT_TYPE GST_TCP_UNIT_TYPE_BUFFERS -#define DEFAULT_UNITS_MAX -1 -#define DEFAULT_UNITS_SOFT_MAX -1 -#define DEFAULT_RECOVER_POLICY GST_RECOVER_POLICY_NONE -#define DEFAULT_TIMEOUT 0 -#define DEFAULT_SYNC_METHOD GST_SYNC_METHOD_LATEST - -#define DEFAULT_BURST_UNIT GST_TCP_UNIT_TYPE_UNDEFINED -#define DEFAULT_BURST_VALUE 0 - -#define DEFAULT_QOS_DSCP -1 -#define DEFAULT_HANDLE_READ TRUE - -#define DEFAULT_RESEND_STREAMHEADER TRUE - -enum -{ - PROP_0, - PROP_PROTOCOL, - PROP_MODE, - PROP_BUFFERS_QUEUED, - PROP_BYTES_QUEUED, - PROP_TIME_QUEUED, - - PROP_UNIT_TYPE, - PROP_UNITS_MAX, - PROP_UNITS_SOFT_MAX, - - PROP_BUFFERS_MAX, - PROP_BUFFERS_SOFT_MAX, - - PROP_TIME_MIN, - PROP_BYTES_MIN, - PROP_BUFFERS_MIN, - - PROP_RECOVER_POLICY, - PROP_TIMEOUT, - PROP_SYNC_METHOD, - PROP_BYTES_TO_SERVE, - PROP_BYTES_SERVED, - - PROP_BURST_UNIT, - PROP_BURST_VALUE, - - PROP_QOS_DSCP, - - PROP_HANDLE_READ, - - PROP_RESEND_STREAMHEADER, - - PROP_NUM_FDS, - - PROP_LAST -}; - -/* For backward compat, we can't really select the poll mode anymore with - * GstPoll. */ -#define GST_TYPE_FDSET_MODE (gst_fdset_mode_get_type()) -static GType -gst_fdset_mode_get_type (void) -{ - static GType fdset_mode_type = 0; - static const GEnumValue fdset_mode[] = { - {0, "Select", "select"}, - {1, "Poll", "poll"}, - {2, "EPoll", "epoll"}, - {0, NULL, NULL}, - }; - - if (!fdset_mode_type) { - fdset_mode_type = g_enum_register_static ("GstFDSetMode", fdset_mode); - } - return fdset_mode_type; -} - -#define GST_TYPE_RECOVER_POLICY (gst_recover_policy_get_type()) -static GType -gst_recover_policy_get_type (void) -{ - static GType recover_policy_type = 0; - static const GEnumValue recover_policy[] = { - {GST_RECOVER_POLICY_NONE, - "Do not try to recover", "none"}, - {GST_RECOVER_POLICY_RESYNC_LATEST, - "Resync client to latest buffer", "latest"}, - {GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT, - "Resync client to soft limit", "soft-limit"}, - {GST_RECOVER_POLICY_RESYNC_KEYFRAME, - "Resync client to most recent keyframe", "keyframe"}, - {0, NULL, NULL}, - }; - - if (!recover_policy_type) { - recover_policy_type = - g_enum_register_static ("GstRecoverPolicy", recover_policy); - } - return recover_policy_type; -} - -#define GST_TYPE_SYNC_METHOD (gst_sync_method_get_type()) -static GType -gst_sync_method_get_type (void) -{ - static GType sync_method_type = 0; - static const GEnumValue sync_method[] = { - {GST_SYNC_METHOD_LATEST, - "Serve starting from the latest buffer", "latest"}, - {GST_SYNC_METHOD_NEXT_KEYFRAME, - "Serve starting from the next keyframe", "next-keyframe"}, - {GST_SYNC_METHOD_LATEST_KEYFRAME, - "Serve everything since the latest keyframe (burst)", - "latest-keyframe"}, - {GST_SYNC_METHOD_BURST, "Serve burst-value data to client", "burst"}, - {GST_SYNC_METHOD_BURST_KEYFRAME, - "Serve burst-value data starting on a keyframe", - "burst-keyframe"}, - {GST_SYNC_METHOD_BURST_WITH_KEYFRAME, - "Serve burst-value data preferably starting on a keyframe", - "burst-with-keyframe"}, - {0, NULL, NULL}, - }; - - if (!sync_method_type) { - sync_method_type = g_enum_register_static ("GstSyncMethod", sync_method); - } - return sync_method_type; -} - -#define GST_TYPE_UNIT_TYPE (gst_unit_type_get_type()) -static GType -gst_unit_type_get_type (void) -{ - static GType unit_type_type = 0; - static const GEnumValue unit_type[] = { - {GST_TCP_UNIT_TYPE_UNDEFINED, "Undefined", "undefined"}, - {GST_TCP_UNIT_TYPE_BUFFERS, "Buffers", "buffers"}, - {GST_TCP_UNIT_TYPE_BYTES, "Bytes", "bytes"}, - {GST_TCP_UNIT_TYPE_TIME, "Time", "time"}, - {0, NULL, NULL}, - }; - - if (!unit_type_type) { - unit_type_type = g_enum_register_static ("GstTCPUnitType", unit_type); - } - return unit_type_type; -} - -#define GST_TYPE_CLIENT_STATUS (gst_client_status_get_type()) -static GType -gst_client_status_get_type (void) -{ - static GType client_status_type = 0; - static const GEnumValue client_status[] = { - {GST_CLIENT_STATUS_OK, "ok", "ok"}, - {GST_CLIENT_STATUS_CLOSED, "Closed", "closed"}, - {GST_CLIENT_STATUS_REMOVED, "Removed", "removed"}, - {GST_CLIENT_STATUS_SLOW, "Too slow", "slow"}, - {GST_CLIENT_STATUS_ERROR, "Error", "error"}, - {GST_CLIENT_STATUS_DUPLICATE, "Duplicate", "duplicate"}, - {GST_CLIENT_STATUS_FLUSHING, "Flushing", "flushing"}, - {0, NULL, NULL}, - }; - - if (!client_status_type) { - client_status_type = - g_enum_register_static ("GstClientStatus", client_status); - } - return client_status_type; -} - -static void gst_multi_fd_sink_finalize (GObject * object); - -static void gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, - GList * link); - -static GstFlowReturn gst_multi_fd_sink_render (GstBaseSink * bsink, - GstBuffer * buf); -static GstStateChangeReturn gst_multi_fd_sink_change_state (GstElement * - element, GstStateChange transition); - -static void gst_multi_fd_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_multi_fd_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); - -GST_BOILERPLATE (GstMultiFdSink, gst_multi_fd_sink, GstBaseSink, - GST_TYPE_BASE_SINK); - -static guint gst_multi_fd_sink_signals[LAST_SIGNAL] = { 0 }; - -static void -gst_multi_fd_sink_base_init (gpointer g_class) -{ - GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - - gst_element_class_add_pad_template (element_class, - gst_static_pad_template_get (&sinktemplate)); - - gst_element_class_set_details (element_class, &gst_multi_fd_sink_details); -} - -static void -gst_multi_fd_sink_class_init (GstMultiFdSinkClass * klass) -{ - GObjectClass *gobject_class; - GstElementClass *gstelement_class; - GstBaseSinkClass *gstbasesink_class; - - gobject_class = (GObjectClass *) klass; - gstelement_class = (GstElementClass *) klass; - gstbasesink_class = (GstBaseSinkClass *) klass; - - gobject_class->set_property = gst_multi_fd_sink_set_property; - gobject_class->get_property = gst_multi_fd_sink_get_property; - gobject_class->finalize = gst_multi_fd_sink_finalize; - - g_object_class_install_property (gobject_class, PROP_PROTOCOL, - g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", - GST_TYPE_TCP_PROTOCOL, DEFAULT_PROTOCOL, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - /** - * GstMultiFdSink::mode - * - * The mode for selecting activity on the fds. - * - * This property is deprecated since 0.10.18, if will now automatically - * select and use the most optimal method. - */ - g_object_class_install_property (gobject_class, PROP_MODE, - g_param_spec_enum ("mode", "Mode", - "The mode for selecting activity on the fds (deprecated)", - GST_TYPE_FDSET_MODE, DEFAULT_MODE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_BUFFERS_MAX, - g_param_spec_int ("buffers-max", "Buffers max", - "max number of buffers to queue for a client (-1 = no limit)", -1, - G_MAXINT, DEFAULT_BUFFERS_MAX, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BUFFERS_SOFT_MAX, - g_param_spec_int ("buffers-soft-max", "Buffers soft max", - "Recover client when going over this limit (-1 = no limit)", -1, - G_MAXINT, DEFAULT_BUFFERS_SOFT_MAX, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_BYTES_MIN, - g_param_spec_int ("bytes-min", "Bytes min", - "min number of bytes to queue (-1 = as little as possible)", -1, - G_MAXINT, DEFAULT_BYTES_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_TIME_MIN, - g_param_spec_int64 ("time-min", "Time min", - "min number of time to queue (-1 = as little as possible)", -1, - G_MAXINT64, DEFAULT_TIME_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BUFFERS_MIN, - g_param_spec_int ("buffers-min", "Buffers min", - "min number of buffers to queue (-1 = as few as possible)", -1, - G_MAXINT, DEFAULT_BUFFERS_MIN, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_UNIT_TYPE, - g_param_spec_enum ("unit-type", "Units type", - "The unit to measure the max/soft-max/queued properties", - GST_TYPE_UNIT_TYPE, DEFAULT_UNIT_TYPE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNITS_MAX, - g_param_spec_int64 ("units-max", "Units max", - "max number of units to queue (-1 = no limit)", -1, G_MAXINT64, - DEFAULT_UNITS_MAX, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_UNITS_SOFT_MAX, - g_param_spec_int64 ("units-soft-max", "Units soft max", - "Recover client when going over this limit (-1 = no limit)", -1, - G_MAXINT64, DEFAULT_UNITS_SOFT_MAX, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_BUFFERS_QUEUED, - g_param_spec_uint ("buffers-queued", "Buffers queued", - "Number of buffers currently queued", 0, G_MAXUINT, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); -#if NOT_IMPLEMENTED - g_object_class_install_property (gobject_class, PROP_BYTES_QUEUED, - g_param_spec_uint ("bytes-queued", "Bytes queued", - "Number of bytes currently queued", 0, G_MAXUINT, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_TIME_QUEUED, - g_param_spec_uint64 ("time-queued", "Time queued", - "Number of time currently queued", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); -#endif - - g_object_class_install_property (gobject_class, PROP_RECOVER_POLICY, - g_param_spec_enum ("recover-policy", "Recover Policy", - "How to recover when client reaches the soft max", - GST_TYPE_RECOVER_POLICY, DEFAULT_RECOVER_POLICY, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_TIMEOUT, - g_param_spec_uint64 ("timeout", "Timeout", - "Maximum inactivity timeout in nanoseconds for a client (0 = no limit)", - 0, G_MAXUINT64, DEFAULT_TIMEOUT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_SYNC_METHOD, - g_param_spec_enum ("sync-method", "Sync Method", - "How to sync new clients to the stream", GST_TYPE_SYNC_METHOD, - DEFAULT_SYNC_METHOD, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BYTES_TO_SERVE, - g_param_spec_uint64 ("bytes-to-serve", "Bytes to serve", - "Number of bytes received to serve to clients", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BYTES_SERVED, - g_param_spec_uint64 ("bytes-served", "Bytes served", - "Total number of bytes send to all clients", 0, G_MAXUINT64, 0, - G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_BURST_UNIT, - g_param_spec_enum ("burst-unit", "Burst unit", - "The format of the burst units (when sync-method is burst[[-with]-keyframe])", - GST_TYPE_UNIT_TYPE, DEFAULT_BURST_UNIT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_BURST_VALUE, - g_param_spec_uint64 ("burst-value", "Burst value", - "The amount of burst expressed in burst-unit", 0, G_MAXUINT64, - DEFAULT_BURST_VALUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_QOS_DSCP, - g_param_spec_int ("qos-dscp", "QoS diff srv code point", - "Quality of Service, differentiated services code point (-1 default)", - -1, 63, DEFAULT_QOS_DSCP, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - /** - * GstMultiFdSink::handle-read - * - * Handle read requests from clients and discard the data. - * - * Since: 0.10.23 - */ - g_object_class_install_property (gobject_class, PROP_HANDLE_READ, - g_param_spec_boolean ("handle-read", "Handle Read", - "Handle client reads and discard the data", - DEFAULT_HANDLE_READ, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - /** - * GstMultiFdSink::resend-streamheader - * - * Resend the streamheaders to existing clients when they change. - * - * Since: 0.10.23 - */ - g_object_class_install_property (gobject_class, PROP_RESEND_STREAMHEADER, - g_param_spec_boolean ("resend-streamheader", "Resend streamheader", - "Resend the streamheader if it changes in the caps", - DEFAULT_RESEND_STREAMHEADER, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - g_object_class_install_property (gobject_class, PROP_NUM_FDS, - g_param_spec_uint ("num-fds", "Number of fds", - "The current number of client file descriptors.", - 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - - /** - * GstMultiFdSink::add: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to add to multifdsink - * - * Hand the given open file descriptor to multifdsink to write to. - */ - gst_multi_fd_sink_signals[SIGNAL_ADD] = - g_signal_new ("add", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - add), NULL, NULL, g_cclosure_marshal_VOID__INT, G_TYPE_NONE, 1, - G_TYPE_INT); - /** - * GstMultiFdSink::add-full: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to add to multifdsink - * @sync: the sync method to use - * @unit_type_min: the unit-type of @value_min - * @value_min: the minimum amount of data to burst expressed in - * @unit_type_min units. - * @unit_type_max: the unit-type of @value_max - * @value_max: the maximum amount of data to burst expressed in - * @unit_type_max units. - * - * Hand the given open file descriptor to multifdsink to write to and - * specify the burst parameters for the new connection. - */ - gst_multi_fd_sink_signals[SIGNAL_ADD_BURST] = - g_signal_new ("add-full", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - add_full), NULL, NULL, - gst_tcp_marshal_VOID__INT_ENUM_INT_UINT64_INT_UINT64, G_TYPE_NONE, 6, - G_TYPE_INT, GST_TYPE_SYNC_METHOD, GST_TYPE_UNIT_TYPE, G_TYPE_UINT64, - GST_TYPE_UNIT_TYPE, G_TYPE_UINT64); - /** - * GstMultiFdSink::remove: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to remove from multifdsink - * - * Remove the given open file descriptor from multifdsink. - */ - gst_multi_fd_sink_signals[SIGNAL_REMOVE] = - g_signal_new ("remove", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - remove), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, - G_TYPE_INT); - /** - * GstMultiFdSink::remove-flush: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to remove from multifdsink - * - * Remove the given open file descriptor from multifdsink after flushing all - * the pending data to the fd. - */ - gst_multi_fd_sink_signals[SIGNAL_REMOVE_FLUSH] = - g_signal_new ("remove-flush", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - remove_flush), NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, - G_TYPE_INT); - /** - * GstMultiFdSink::clear: - * @gstmultifdsink: the multifdsink element to emit this signal on - * - * Remove all file descriptors from multifdsink. Since multifdsink did not - * open fd's itself, it does not explicitly close the fd. The application - * should do so by connecting to the client-fd-removed callback. - */ - gst_multi_fd_sink_signals[SIGNAL_CLEAR] = - g_signal_new ("clear", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - clear), NULL, NULL, g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); - - /** - * GstMultiFdSink::get-stats: - * @gstmultifdsink: the multifdsink element to emit this signal on - * @fd: the file descriptor to get stats of from multifdsink - * - * Get statistics about @fd. This function returns a GValueArray to ease - * automatic wrapping for bindings. - * - * Returns: a GValueArray with the statistics. The array contains guint64 - * values that represent respectively: total number of bytes sent, time - * when the client was added, time when the client was - * disconnected/removed, time the client is/was active, last activity - * time (in epoch seconds), number of buffers dropped. - * All times are expressed in nanoseconds (GstClockTime). - * The array can be 0-length if the client was not found. - */ - gst_multi_fd_sink_signals[SIGNAL_GET_STATS] = - g_signal_new ("get-stats", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstMultiFdSinkClass, - get_stats), NULL, NULL, gst_tcp_marshal_BOXED__INT, - G_TYPE_VALUE_ARRAY, 1, G_TYPE_INT); - - /** - * GstMultiFdSink::client-added: - * @gstmultifdsink: the multifdsink element that emitted this signal - * @fd: the file descriptor that was added to multifdsink - * - * The given file descriptor was added to multifdsink. This signal will - * be emitted from the streaming thread so application should be prepared - * for that. - */ - gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED] = - g_signal_new ("client-added", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, client_added), - NULL, NULL, gst_tcp_marshal_VOID__INT, G_TYPE_NONE, 1, G_TYPE_INT); - /** - * GstMultiFdSink::client-removed: - * @gstmultifdsink: the multifdsink element that emitted this signal - * @fd: the file descriptor that is to be removed from multifdsink - * @status: the reason why the client was removed - * - * The given file descriptor is about to be removed from multifdsink. This - * signal will be emitted from the streaming thread so applications should - * be prepared for that. - * - * @gstmultifdsink still holds a handle to @fd so it is possible to call - * the get-stats signal from this callback. For the same reason it is - * not safe to close() and reuse @fd in this callback. - */ - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED] = - g_signal_new ("client-removed", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, - client_removed), NULL, NULL, gst_tcp_marshal_VOID__INT_BOXED, - G_TYPE_NONE, 2, G_TYPE_INT, GST_TYPE_CLIENT_STATUS); - /** - * GstMultiFdSink::client-fd-removed: - * @gstmultifdsink: the multifdsink element that emitted this signal - * @fd: the file descriptor that was removed from multifdsink - * - * The given file descriptor was removed from multifdsink. This signal will - * be emitted from the streaming thread so applications should be prepared - * for that. - * - * In this callback, @gstmultifdsink has removed all the information - * associated with @fd and it is therefore not possible to call get-stats - * with @fd. It is however safe to close() and reuse @fd in the callback. - * - * Since: 0.10.7 - */ - gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED] = - g_signal_new ("client-fd-removed", G_TYPE_FROM_CLASS (klass), - G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstMultiFdSinkClass, - client_fd_removed), NULL, NULL, gst_tcp_marshal_VOID__INT, - G_TYPE_NONE, 1, G_TYPE_INT); - - gstelement_class->change_state = - GST_DEBUG_FUNCPTR (gst_multi_fd_sink_change_state); - - gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_render); - - klass->add = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add); - klass->add_full = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_add_full); - klass->remove = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove); - klass->remove_flush = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_remove_flush); - klass->clear = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_clear); - klass->get_stats = GST_DEBUG_FUNCPTR (gst_multi_fd_sink_get_stats); - - GST_DEBUG_CATEGORY_INIT (multifdsink_debug, "multifdsink", 0, "FD sink"); -} - -static void -gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass) -{ - GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN); - - this->protocol = DEFAULT_PROTOCOL; - this->mode = DEFAULT_MODE; - - CLIENTS_LOCK_INIT (this); - this->clients = NULL; - this->fd_hash = g_hash_table_new (g_int_hash, g_int_equal); - - this->bufqueue = g_array_new (FALSE, TRUE, sizeof (GstBuffer *)); - this->unit_type = DEFAULT_UNIT_TYPE; - this->units_max = DEFAULT_UNITS_MAX; - this->units_soft_max = DEFAULT_UNITS_SOFT_MAX; - this->time_min = DEFAULT_TIME_MIN; - this->bytes_min = DEFAULT_BYTES_MIN; - this->buffers_min = DEFAULT_BUFFERS_MIN; - this->recover_policy = DEFAULT_RECOVER_POLICY; - - this->timeout = DEFAULT_TIMEOUT; - this->def_sync_method = DEFAULT_SYNC_METHOD; - this->def_burst_unit = DEFAULT_BURST_UNIT; - this->def_burst_value = DEFAULT_BURST_VALUE; - - this->qos_dscp = DEFAULT_QOS_DSCP; - this->handle_read = DEFAULT_HANDLE_READ; - - this->resend_streamheader = DEFAULT_RESEND_STREAMHEADER; - - this->header_flags = 0; -} - -static void -gst_multi_fd_sink_finalize (GObject * object) -{ - GstMultiFdSink *this; - - this = GST_MULTI_FD_SINK (object); - - CLIENTS_LOCK_FREE (this); - g_hash_table_destroy (this->fd_hash); - g_array_free (this->bufqueue, TRUE); - - G_OBJECT_CLASS (parent_class)->finalize (object); -} - -static gint -setup_dscp_client (GstMultiFdSink * sink, GstTCPClient * client) -{ - gint tos; - gint ret; - union gst_sockaddr - { - struct sockaddr sa; - struct sockaddr_in6 sa_in6; - struct sockaddr_storage sa_stor; - } sa; - socklen_t slen = sizeof (sa); - gint af; - - /* don't touch */ - if (sink->qos_dscp < 0) - return 0; - - if ((ret = getsockname (client->fd.fd, &sa.sa, &slen)) < 0) { - GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno)); - return ret; - } - - af = sa.sa.sa_family; - - /* if this is an IPv4-mapped address then do IPv4 QoS */ - if (af == AF_INET6) { - - GST_DEBUG_OBJECT (sink, "check IP6 socket"); - if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) { - GST_DEBUG_OBJECT (sink, "mapped to IPV4"); - af = AF_INET; - } - } - - /* extract and shift 6 bits of the DSCP */ - tos = (sink->qos_dscp & 0x3f) << 2; - - switch (af) { - case AF_INET: - ret = setsockopt (client->fd.fd, IPPROTO_IP, IP_TOS, &tos, sizeof (tos)); - break; - case AF_INET6: -#ifdef IPV6_TCLASS - ret = - setsockopt (client->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, &tos, - sizeof (tos)); - break; -#endif - default: - ret = 0; - GST_ERROR_OBJECT (sink, "unsupported AF"); - break; - } - if (ret) - GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno)); - - return ret; -} - - -static void -setup_dscp (GstMultiFdSink * sink) -{ - GList *clients, *next; - - CLIENTS_LOCK (sink); - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; - - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - - setup_dscp_client (sink, client); - } - CLIENTS_UNLOCK (sink); -} - -/* "add-full" signal implementation */ -void -gst_multi_fd_sink_add_full (GstMultiFdSink * sink, int fd, - GstSyncMethod sync_method, GstTCPUnitType min_unit, guint64 min_value, - GstTCPUnitType max_unit, guint64 max_value) -{ - GstTCPClient *client; - GList *clink; - GTimeVal now; - gint flags, res; - struct stat statbuf; - - GST_DEBUG_OBJECT (sink, "[fd %5d] adding client, sync_method %d, " - "min_unit %d, min_value %" G_GUINT64_FORMAT - ", max_unit %d, max_value %" G_GUINT64_FORMAT, fd, sync_method, - min_unit, min_value, max_unit, max_value); - - /* do limits check if we can */ - if (min_unit == max_unit) { - if (max_value != -1 && min_value != -1 && max_value < min_value) - goto wrong_limits; - } - - /* create client datastructure */ - client = g_new0 (GstTCPClient, 1); - client->fd.fd = fd; - client->status = GST_CLIENT_STATUS_OK; - client->bufpos = -1; - client->flushcount = -1; - client->bufoffset = 0; - client->sending = NULL; - client->bytes_sent = 0; - client->dropped_buffers = 0; - client->avg_queue_size = 0; - client->new_connection = TRUE; - client->burst_min_unit = min_unit; - client->burst_min_value = min_value; - client->burst_max_unit = max_unit; - client->burst_max_value = max_value; - client->sync_method = sync_method; - client->currently_removing = FALSE; - - /* update start time */ - g_get_current_time (&now); - client->connect_time = GST_TIMEVAL_TO_TIME (now); - client->disconnect_time = 0; - /* set last activity time to connect time */ - client->last_activity_time = client->connect_time; - - CLIENTS_LOCK (sink); - - /* check the hash to find a duplicate fd */ - clink = g_hash_table_lookup (sink->fd_hash, &client->fd.fd); - if (clink != NULL) - goto duplicate; - - /* we can add the fd now */ - clink = sink->clients = g_list_prepend (sink->clients, client); - g_hash_table_insert (sink->fd_hash, &client->fd.fd, clink); - sink->clients_cookie++; - - /* set the socket to non blocking */ - res = fcntl (fd, F_SETFL, O_NONBLOCK); - /* we always read from a client */ - gst_poll_add_fd (sink->fdset, &client->fd); - - /* we don't try to read from write only fds */ - if (sink->handle_read) { - flags = fcntl (fd, F_GETFL, 0); - if ((flags & O_ACCMODE) != O_WRONLY) { - gst_poll_fd_ctl_read (sink->fdset, &client->fd, TRUE); - } - } - /* figure out the mode, can't use send() for non sockets */ - res = fstat (fd, &statbuf); - if (S_ISSOCK (statbuf.st_mode)) { - client->is_socket = TRUE; - setup_dscp_client (sink, client); - } - - gst_poll_restart (sink->fdset); - - CLIENTS_UNLOCK (sink); - - g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_ADDED], 0, fd); - - return; - - /* errors */ -wrong_limits: - { - GST_WARNING_OBJECT (sink, - "[fd %5d] wrong values min =%" G_GUINT64_FORMAT ", max=%" - G_GUINT64_FORMAT ", unit %d specified when adding client", fd, - min_value, max_value, min_unit); - return; - } -duplicate: - { - client->status = GST_CLIENT_STATUS_DUPLICATE; - CLIENTS_UNLOCK (sink); - GST_WARNING_OBJECT (sink, "[fd %5d] duplicate client found, refusing", fd); - g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, - client->status); - g_free (client); - return; - } -} - -/* "add" signal implemntation */ -void -gst_multi_fd_sink_add (GstMultiFdSink * sink, int fd) -{ - gst_multi_fd_sink_add_full (sink, fd, sink->def_sync_method, - sink->def_burst_unit, sink->def_burst_value, sink->def_burst_unit, -1); -} - -/* "remove" signal implementation */ -void -gst_multi_fd_sink_remove (GstMultiFdSink * sink, int fd) -{ - GList *clink; - - GST_DEBUG_OBJECT (sink, "[fd %5d] removing client", fd); - - CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); - if (clink != NULL) { - GstTCPClient *client = (GstTCPClient *) clink->data; - - if (client->status != GST_CLIENT_STATUS_OK) { - GST_INFO_OBJECT (sink, - "[fd %5d] Client already disconnecting with status %d", - fd, client->status); - goto done; - } - - client->status = GST_CLIENT_STATUS_REMOVED; - gst_multi_fd_sink_remove_client_link (sink, clink); - gst_poll_restart (sink->fdset); - } else { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); - } - -done: - CLIENTS_UNLOCK (sink); -} - -/* "remove-flush" signal implementation */ -void -gst_multi_fd_sink_remove_flush (GstMultiFdSink * sink, int fd) -{ - GList *clink; - - GST_DEBUG_OBJECT (sink, "[fd %5d] flushing client", fd); - - CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); - if (clink != NULL) { - GstTCPClient *client = (GstTCPClient *) clink->data; - - if (client->status != GST_CLIENT_STATUS_OK) { - GST_INFO_OBJECT (sink, - "[fd %5d] Client already disconnecting with status %d", - fd, client->status); - goto done; - } - - /* take the position of the client as the number of buffers left to flush. - * If the client was at position -1, we flush 0 buffers, 0 == flush 1 - * buffer, etc... */ - client->flushcount = client->bufpos + 1; - /* mark client as flushing. We can not remove the client right away because - * it might have some buffers to flush in the ->sending queue. */ - client->status = GST_CLIENT_STATUS_FLUSHING; - } else { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this fd found!", fd); - } -done: - CLIENTS_UNLOCK (sink); -} - -/* can be called both through the signal (i.e. from any thread) or when - * stopping, after the writing thread has shut down */ -void -gst_multi_fd_sink_clear (GstMultiFdSink * sink) -{ - GList *clients, *next; - guint32 cookie; - - GST_DEBUG_OBJECT (sink, "clearing all clients"); - - CLIENTS_LOCK (sink); -restart: - cookie = sink->clients_cookie; - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; - - if (cookie != sink->clients_cookie) { - GST_DEBUG_OBJECT (sink, "cookie changed while removing all clients"); - goto restart; - } - - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - - client->status = GST_CLIENT_STATUS_REMOVED; - gst_multi_fd_sink_remove_client_link (sink, clients); - } - gst_poll_restart (sink->fdset); - CLIENTS_UNLOCK (sink); -} - -/* "get-stats" signal implementation - * the array returned contains: - * - * guint64 : bytes_sent - * guint64 : connect time (in nanoseconds, since Epoch) - * guint64 : disconnect time (in nanoseconds, since Epoch) - * guint64 : time the client is/was connected (in nanoseconds) - * guint64 : last activity time (in nanoseconds, since Epoch) - * guint64 : buffers dropped due to recovery - */ -GValueArray * -gst_multi_fd_sink_get_stats (GstMultiFdSink * sink, int fd) -{ - GstTCPClient *client; - GValueArray *result = NULL; - GList *clink; - - CLIENTS_LOCK (sink); - clink = g_hash_table_lookup (sink->fd_hash, &fd); - if (clink == NULL) - goto noclient; - - client = (GstTCPClient *) clink->data; - if (client != NULL) { - GValue value = { 0 }; - guint64 interval; - - result = g_value_array_new (5); - - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->bytes_sent); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->connect_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); - if (client->disconnect_time == 0) { - GTimeVal nowtv; - - g_get_current_time (&nowtv); - - interval = GST_TIMEVAL_TO_TIME (nowtv) - client->connect_time; - } else { - interval = client->disconnect_time - client->connect_time; - } - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->disconnect_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, interval); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->last_activity_time); - result = g_value_array_append (result, &value); - g_value_unset (&value); - g_value_init (&value, G_TYPE_UINT64); - g_value_set_uint64 (&value, client->dropped_buffers); - result = g_value_array_append (result, &value); - } - -noclient: - CLIENTS_UNLOCK (sink); - - /* python doesn't like a NULL pointer yet */ - if (result == NULL) { - GST_WARNING_OBJECT (sink, "[fd %5d] no client with this found!", fd); - result = g_value_array_new (0); - } - - return result; -} - -/* should be called with the clientslock helt. - * Note that we don't close the fd as we didn't open it in the first - * place. An application should connect to the client-fd-removed signal and - * close the fd itself. - */ -static void -gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link) -{ - int fd; - GTimeVal now; - GstTCPClient *client = (GstTCPClient *) link->data; - GstMultiFdSinkClass *fclass; - - fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); - - fd = client->fd.fd; - - if (client->currently_removing) { - GST_WARNING_OBJECT (sink, "[fd %5d] client is already being removed", fd); - return; - } else { - client->currently_removing = TRUE; - } - - /* FIXME: if we keep track of ip we can log it here and signal */ - switch (client->status) { - case GST_CLIENT_STATUS_OK: - GST_WARNING_OBJECT (sink, "[fd %5d] removing client %p for no reason", - fd, client); - break; - case GST_CLIENT_STATUS_CLOSED: - GST_DEBUG_OBJECT (sink, "[fd %5d] removing client %p because of close", - fd, client); - break; - case GST_CLIENT_STATUS_REMOVED: - GST_DEBUG_OBJECT (sink, - "[fd %5d] removing client %p because the app removed it", fd, client); - break; - case GST_CLIENT_STATUS_SLOW: - GST_INFO_OBJECT (sink, - "[fd %5d] removing client %p because it was too slow", fd, client); - break; - case GST_CLIENT_STATUS_ERROR: - GST_WARNING_OBJECT (sink, - "[fd %5d] removing client %p because of error", fd, client); - break; - case GST_CLIENT_STATUS_FLUSHING: - default: - GST_WARNING_OBJECT (sink, - "[fd %5d] removing client %p with invalid reason %d", fd, client, - client->status); - break; - } - - gst_poll_remove_fd (sink->fdset, &client->fd); - - g_get_current_time (&now); - client->disconnect_time = GST_TIMEVAL_TO_TIME (now); - - /* free client buffers */ - g_slist_foreach (client->sending, (GFunc) gst_mini_object_unref, NULL); - g_slist_free (client->sending); - client->sending = NULL; - - if (client->caps) - gst_caps_unref (client->caps); - client->caps = NULL; - - /* unlock the mutex before signaling because the signal handler - * might query some properties */ - CLIENTS_UNLOCK (sink); - - g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_REMOVED], 0, fd, client->status); - - /* lock again before we remove the client completely */ - CLIENTS_LOCK (sink); - - /* fd cannot be reused in the above signal callback so we can safely - * remove it from the hashtable here */ - if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) { - GST_WARNING_OBJECT (sink, - "[fd %5d] error removing client %p from hash", client->fd.fd, client); - } - /* after releasing the lock above, the link could be invalid, more - * precisely, the next and prev pointers could point to invalid list - * links. One optimisation could be to add a cookie to the linked list - * and take a shortcut when it did not change between unlocking and locking - * our mutex. For now we just walk the list again. */ - sink->clients = g_list_remove (sink->clients, client); - sink->clients_cookie++; - - if (fclass->removed) - fclass->removed (sink, client->fd.fd); - - g_free (client); - CLIENTS_UNLOCK (sink); - - /* and the fd is really gone now */ - g_signal_emit (G_OBJECT (sink), - gst_multi_fd_sink_signals[SIGNAL_CLIENT_FD_REMOVED], 0, fd); - - CLIENTS_LOCK (sink); -} - -/* handle a read on a client fd, - * which either indicates a close or should be ignored - * returns FALSE if some error occured or the client closed. */ -static gboolean -gst_multi_fd_sink_handle_client_read (GstMultiFdSink * sink, - GstTCPClient * client) -{ - int avail, fd; - gboolean ret; - - fd = client->fd.fd; - - if (ioctl (fd, FIONREAD, &avail) < 0) - goto ioctl_failed; - - GST_DEBUG_OBJECT (sink, "[fd %5d] select reports client read of %d bytes", - fd, avail); - - ret = TRUE; - - if (avail == 0) { - /* client sent close, so remove it */ - GST_DEBUG_OBJECT (sink, "[fd %5d] client asked for close, removing", fd); - client->status = GST_CLIENT_STATUS_CLOSED; - ret = FALSE; - } else if (avail < 0) { - GST_WARNING_OBJECT (sink, "[fd %5d] avail < 0, removing", fd); - client->status = GST_CLIENT_STATUS_ERROR; - ret = FALSE; - } else { - guint8 dummy[512]; - gint nread; - - /* just Read 'n' Drop, could also just drop the client as it's not supposed - * to write to us except for closing the socket, I guess it's because we - * like to listen to our customers. */ - do { - /* this is the maximum we can read */ - gint to_read = MIN (avail, 512); - - GST_DEBUG_OBJECT (sink, "[fd %5d] client wants us to read %d bytes", - fd, to_read); - - nread = read (fd, dummy, to_read); - if (nread < -1) { - GST_WARNING_OBJECT (sink, "[fd %5d] could not read %d bytes: %s (%d)", - fd, to_read, g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; - ret = FALSE; - break; - } else if (nread == 0) { - GST_WARNING_OBJECT (sink, "[fd %5d] 0 bytes in read, removing", fd); - client->status = GST_CLIENT_STATUS_ERROR; - ret = FALSE; - break; - } - avail -= nread; - } - while (avail > 0); - } - return ret; - - /* ERRORS */ -ioctl_failed: - { - GST_WARNING_OBJECT (sink, "[fd %5d] ioctl failed: %s (%d)", - fd, g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; - return FALSE; - } -} - -/* Queue raw data for this client, creating a new buffer. - * This takes ownership of the data by - * setting it as GST_BUFFER_MALLOCDATA() on the created buffer so - * be sure to pass g_free()-able @data. - */ -static gboolean -gst_multi_fd_sink_client_queue_data (GstMultiFdSink * sink, - GstTCPClient * client, gchar * data, gint len) -{ - GstBuffer *buf; - - buf = gst_buffer_new (); - GST_BUFFER_DATA (buf) = (guint8 *) data; - GST_BUFFER_MALLOCDATA (buf) = (guint8 *) data; - GST_BUFFER_SIZE (buf) = len; - - GST_LOG_OBJECT (sink, "[fd %5d] queueing data of length %d", - client->fd.fd, len); - - client->sending = g_slist_append (client->sending, buf); - - return TRUE; -} - -/* GDP-encode given caps and queue them for sending */ -static gboolean -gst_multi_fd_sink_client_queue_caps (GstMultiFdSink * sink, - GstTCPClient * client, const GstCaps * caps) -{ - guint8 *header; - guint8 *payload; - guint length; - gchar *string; - - g_return_val_if_fail (caps != NULL, FALSE); - - string = gst_caps_to_string (caps); - GST_DEBUG_OBJECT (sink, "[fd %5d] Queueing caps %s through GDP", - client->fd.fd, string); - g_free (string); - - if (!gst_dp_packet_from_caps (caps, sink->header_flags, &length, &header, - &payload)) { - GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps"); - return FALSE; - } - gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, length); - - length = gst_dp_header_payload_length (header); - gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) payload, length); - - return TRUE; -} - -static gboolean -is_sync_frame (GstMultiFdSink * sink, GstBuffer * buffer) -{ - if (GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT)) { - return FALSE; - } else if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_IN_CAPS)) { - return TRUE; - } - - return FALSE; -} - -/* queue the given buffer for the given client, possibly adding the GDP - * header if GDP is being used */ -static gboolean -gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink, - GstTCPClient * client, GstBuffer * buffer) -{ - GstCaps *caps; - - /* TRUE: send them if the new caps have them */ - gboolean send_streamheader = FALSE; - GstStructure *s; - - /* before we queue the buffer, we check if we need to queue streamheader - * buffers (because it's a new client, or because they changed) */ - caps = gst_buffer_get_caps (buffer); /* cleaned up after streamheader */ - if (!client->caps) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] no previous caps for this client, send streamheader", - client->fd.fd); - send_streamheader = TRUE; - client->caps = gst_caps_ref (caps); - } else { - /* there were previous caps recorded, so compare */ - if (!gst_caps_is_equal (caps, client->caps)) { - const GValue *sh1, *sh2; - - /* caps are not equal, but could still have the same streamheader */ - s = gst_caps_get_structure (caps, 0); - if (!gst_structure_has_field (s, "streamheader")) { - /* no new streamheader, so nothing new to send */ - GST_DEBUG_OBJECT (sink, - "[fd %5d] new caps do not have streamheader, not sending", - client->fd.fd); - } else { - /* there is a new streamheader */ - s = gst_caps_get_structure (client->caps, 0); - if (!gst_structure_has_field (s, "streamheader")) { - /* no previous streamheader, so send the new one */ - GST_DEBUG_OBJECT (sink, - "[fd %5d] previous caps did not have streamheader, sending", - client->fd.fd); - send_streamheader = TRUE; - } else { - /* both old and new caps have streamheader set */ - if (!sink->resend_streamheader) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] asked to not resend the streamheader, not sending", - client->fd.fd); - send_streamheader = FALSE; - } else { - sh1 = gst_structure_get_value (s, "streamheader"); - s = gst_caps_get_structure (caps, 0); - sh2 = gst_structure_get_value (s, "streamheader"); - if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] new streamheader different from old, sending", - client->fd.fd); - send_streamheader = TRUE; - } - } - } - } - } - /* Replace the old caps */ - gst_caps_unref (client->caps); - client->caps = gst_caps_ref (caps); - } - - if (G_UNLIKELY (send_streamheader)) { - const GValue *sh; - GArray *buffers; - int i; - - GST_LOG_OBJECT (sink, - "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT, - client->fd.fd, caps); - s = gst_caps_get_structure (caps, 0); - if (!gst_structure_has_field (s, "streamheader")) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] no new streamheader, so nothing to send", client->fd.fd); - } else { - GST_LOG_OBJECT (sink, - "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT, - client->fd.fd, caps); - sh = gst_structure_get_value (s, "streamheader"); - g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY); - buffers = g_value_peek_pointer (sh); - GST_DEBUG_OBJECT (sink, "%d streamheader buffers", buffers->len); - for (i = 0; i < buffers->len; ++i) { - GValue *bufval; - GstBuffer *buffer; - - bufval = &g_array_index (buffers, GValue, i); - g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER); - buffer = g_value_peek_pointer (bufval); - GST_DEBUG_OBJECT (sink, - "[fd %5d] queueing streamheader buffer of length %d", - client->fd.fd, GST_BUFFER_SIZE (buffer)); - gst_buffer_ref (buffer); - - if (sink->protocol == GST_TCP_PROTOCOL_GDP) { - guint8 *header; - guint len; - - if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, - &header)) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] could not create header, removing client", - client->fd.fd); - return FALSE; - } - gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, - len); - } - - client->sending = g_slist_append (client->sending, buffer); - } - } - } - - gst_caps_unref (caps); - caps = NULL; - /* now we can send the buffer, possibly sending a GDP header first */ - if (sink->protocol == GST_TCP_PROTOCOL_GDP) { - guint8 *header; - guint len; - - if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, &header)) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] could not create header, removing client", client->fd.fd); - return FALSE; - } - gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header, len); - } - - GST_LOG_OBJECT (sink, "[fd %5d] queueing buffer of length %d", - client->fd.fd, GST_BUFFER_SIZE (buffer)); - - gst_buffer_ref (buffer); - client->sending = g_slist_append (client->sending, buffer); - - return TRUE; -} - -/* find the keyframe in the list of buffers starting the - * search from @idx. @direction as -1 will search backwards, - * 1 will search forwards. - * Returns: the index or -1 if there is no keyframe after idx. - */ -static gint -find_syncframe (GstMultiFdSink * sink, gint idx, gint direction) -{ - gint i, len, result; - - /* take length of queued buffers */ - len = sink->bufqueue->len; - - /* assume we don't find a keyframe */ - result = -1; - - /* then loop over all buffers to find the first keyframe */ - for (i = idx; i >= 0 && i < len; i += direction) { - GstBuffer *buf; - - buf = g_array_index (sink->bufqueue, GstBuffer *, i); - if (is_sync_frame (sink, buf)) { - GST_LOG_OBJECT (sink, "found keyframe at %d from %d, direction %d", - i, idx, direction); - result = i; - break; - } - } - return result; -} - -#define find_next_syncframe(s,i) find_syncframe(s,i,1) -#define find_prev_syncframe(s,i) find_syncframe(s,i,-1) - -/* Get the number of buffers from the buffer queue needed to satisfy - * the maximum max in the configured units. - * If units are not BUFFERS, and there are insufficient buffers in the - * queue to satify the limit, return len(queue) + 1 */ -static gint -get_buffers_max (GstMultiFdSink * sink, gint64 max) -{ - switch (sink->unit_type) { - case GST_TCP_UNIT_TYPE_BUFFERS: - return max; - case GST_TCP_UNIT_TYPE_TIME: - { - GstBuffer *buf; - int i; - int len; - gint64 diff; - GstClockTime first = GST_CLOCK_TIME_NONE; - - len = sink->bufqueue->len; - - for (i = 0; i < len; i++) { - buf = g_array_index (sink->bufqueue, GstBuffer *, i); - if (GST_BUFFER_TIMESTAMP_IS_VALID (buf)) { - if (first == -1) - first = GST_BUFFER_TIMESTAMP (buf); - - diff = first - GST_BUFFER_TIMESTAMP (buf); - - if (diff > max) - return i + 1; - } - } - return len + 1; - } - case GST_TCP_UNIT_TYPE_BYTES: - { - GstBuffer *buf; - int i; - int len; - gint acc = 0; - - len = sink->bufqueue->len; - - for (i = 0; i < len; i++) { - buf = g_array_index (sink->bufqueue, GstBuffer *, i); - acc += GST_BUFFER_SIZE (buf); - - if (acc > max) - return i + 1; - } - return len + 1; - } - default: - return max; - } -} - -/* find the positions in the buffer queue where *_min and *_max - * is satisfied - */ -/* count the amount of data in the buffers and return the index - * that satifies the given limits. - * - * Returns: index @idx in the buffer queue so that the given limits are - * satisfied. TRUE if all the limits could be satisfied, FALSE if not - * enough data was in the queue. - * - * FIXME, this code might now work if any of the units is in buffers... - */ -static gboolean -find_limits (GstMultiFdSink * sink, - gint * min_idx, gint bytes_min, gint buffers_min, gint64 time_min, - gint * max_idx, gint bytes_max, gint buffers_max, gint64 time_max) -{ - GstClockTime first, time; - gint i, len, bytes; - gboolean result, max_hit; - - /* take length of queue */ - len = sink->bufqueue->len; - - /* this must hold */ - g_assert (len > 0); - - GST_LOG_OBJECT (sink, - "bytes_min %d, buffers_min %d, time_min %" GST_TIME_FORMAT - ", bytes_max %d, buffers_max %d, time_max %" GST_TIME_FORMAT, bytes_min, - buffers_min, GST_TIME_ARGS (time_min), bytes_max, buffers_max, - GST_TIME_ARGS (time_max)); - - /* do the trivial buffer limit test */ - if (buffers_min != -1 && len < buffers_min) { - *min_idx = len - 1; - *max_idx = len - 1; - return FALSE; - } - - result = FALSE; - /* else count bytes and time */ - first = -1; - bytes = 0; - /* unset limits */ - *min_idx = -1; - *max_idx = -1; - max_hit = FALSE; - - i = 0; - /* loop through the buffers, when a limit is ok, mark it - * as -1, we have at least one buffer in the queue. */ - do { - GstBuffer *buf; - - /* if we checked all min limits, update result */ - if (bytes_min == -1 && time_min == -1 && *min_idx == -1) { - /* don't go below 0 */ - *min_idx = MAX (i - 1, 0); - } - /* if we reached one max limit break out */ - if (max_hit) { - /* i > 0 when we get here, we subtract one to get the position - * of the previous buffer. */ - *max_idx = i - 1; - /* we have valid complete result if we found a min_idx too */ - result = *min_idx != -1; - break; - } - buf = g_array_index (sink->bufqueue, GstBuffer *, i); - - bytes += GST_BUFFER_SIZE (buf); - - /* take timestamp and save for the base first timestamp */ - if ((time = GST_BUFFER_TIMESTAMP (buf)) != -1) { - GST_LOG_OBJECT (sink, "Ts %" GST_TIME_FORMAT " on buffer", - GST_TIME_ARGS (time)); - if (first == -1) - first = time; - - /* increase max usage if we did not fill enough. Note that - * buffers are sorted from new to old, so the first timestamp is - * bigger than the next one. */ - if (time_min != -1 && first - time >= time_min) - time_min = -1; - if (time_max != -1 && first - time >= time_max) - max_hit = TRUE; - } else { - GST_LOG_OBJECT (sink, "No timestamp on buffer"); - } - /* time is OK or unknown, check and increase if not enough bytes */ - if (bytes_min != -1) { - if (bytes >= bytes_min) - bytes_min = -1; - } - if (bytes_max != -1) { - if (bytes >= bytes_max) { - max_hit = TRUE; - } - } - i++; - } - while (i < len); - - /* if we did not hit the max or min limit, set to buffer size */ - if (*max_idx == -1) - *max_idx = len - 1; - /* make sure min does not exceed max */ - if (*min_idx == -1) - *min_idx = *max_idx; - - return result; -} - -/* parse the unit/value pair and assign it to the result value of the - * right type, leave the other values untouched - * - * Returns: FALSE if the unit is unknown or undefined. TRUE otherwise. - */ -static gboolean -assign_value (GstTCPUnitType unit, guint64 value, gint * bytes, gint * buffers, - GstClockTime * time) -{ - gboolean res = TRUE; - - /* set only the limit of the given format to the given value */ - switch (unit) { - case GST_TCP_UNIT_TYPE_BUFFERS: - *buffers = (gint) value; - break; - case GST_TCP_UNIT_TYPE_TIME: - *time = value; - break; - case GST_TCP_UNIT_TYPE_BYTES: - *bytes = (gint) value; - break; - case GST_TCP_UNIT_TYPE_UNDEFINED: - default: - res = FALSE; - break; - } - return res; -} - -/* count the index in the buffer queue to satisfy the given unit - * and value pair starting from buffer at index 0. - * - * Returns: TRUE if there was enough data in the queue to satisfy the - * burst values. @idx contains the index in the buffer that contains enough - * data to satisfy the limits or the last buffer in the queue when the - * function returns FALSE. - */ -static gboolean -count_burst_unit (GstMultiFdSink * sink, gint * min_idx, - GstTCPUnitType min_unit, guint64 min_value, gint * max_idx, - GstTCPUnitType max_unit, guint64 max_value) -{ - gint bytes_min = -1, buffers_min = -1; - gint bytes_max = -1, buffers_max = -1; - GstClockTime time_min = GST_CLOCK_TIME_NONE, time_max = GST_CLOCK_TIME_NONE; - - assign_value (min_unit, min_value, &bytes_min, &buffers_min, &time_min); - assign_value (max_unit, max_value, &bytes_max, &buffers_max, &time_max); - - return find_limits (sink, min_idx, bytes_min, buffers_min, time_min, - max_idx, bytes_max, buffers_max, time_max); -} - -/* decide where in the current buffer queue this new client should start - * receiving buffers from. - * This function is called whenever a client is connected and has not yet - * received a buffer. - * If this returns -1, it means that we haven't found a good point to - * start streaming from yet, and this function should be called again later - * when more buffers have arrived. - */ -static gint -gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client) -{ - gint result; - - GST_DEBUG_OBJECT (sink, - "[fd %5d] new client, deciding where to start in queue", client->fd.fd); - GST_DEBUG_OBJECT (sink, "queue is currently %d buffers long", - sink->bufqueue->len); - switch (client->sync_method) { - case GST_SYNC_METHOD_LATEST: - /* no syncing, we are happy with whatever the client is going to get */ - result = client->bufpos; - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST, position %d", client->fd.fd, result); - break; - case GST_SYNC_METHOD_NEXT_KEYFRAME: - { - /* if one of the new buffers (between client->bufpos and 0) in the queue - * is a sync point, we can proceed, otherwise we need to keep waiting */ - GST_LOG_OBJECT (sink, - "[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd, - client->bufpos); - - result = find_prev_syncframe (sink, client->bufpos); - if (result != -1) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_NEXT_KEYFRAME: result %d", - client->fd.fd, result); - break; - } - - /* client is not on a syncbuffer, need to skip these buffers and - * wait some more */ - GST_LOG_OBJECT (sink, - "[fd %5d] new client, skipping buffer(s), no syncpoint found", - client->fd.fd); - client->bufpos = -1; - break; - } - case GST_SYNC_METHOD_LATEST_KEYFRAME: - { - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME", client->fd.fd); - - /* for new clients we initially scan the complete buffer queue for - * a sync point when a buffer is added. If we don't find a keyframe, - * we need to wait for the next keyframe and so we change the client's - * sync method to GST_SYNC_METHOD_NEXT_KEYFRAME. - */ - result = find_next_syncframe (sink, 0); - if (result != -1) { - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: result %d", client->fd.fd, - result); - break; - } - - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_LATEST_KEYFRAME: no keyframe found, " - "switching to SYNC_METHOD_NEXT_KEYFRAME", client->fd.fd); - /* throw client to the waiting state */ - client->bufpos = -1; - /* and make client sync to next keyframe */ - client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; - break; - } - case GST_SYNC_METHOD_BURST: - { - gboolean ok; - gint max; - - /* move to the position where we satisfy the client's burst - * parameters. If we could not satisfy the parameters because there - * is not enough data, we just send what we have (which is in result). - * We use the max value to limit the search - */ - ok = count_burst_unit (sink, &result, client->burst_min_unit, - client->burst_min_value, &max, client->burst_max_unit, - client->burst_max_value); - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_BURST: burst_unit returned %d, result %d", - client->fd.fd, ok, result); - - GST_LOG_OBJECT (sink, "min %d, max %d", result, max); - - /* we hit the max and it is below the min, use that then */ - if (max != -1 && max <= result) { - result = MAX (max - 1, 0); - GST_DEBUG_OBJECT (sink, - "[fd %5d] SYNC_METHOD_BURST: result above max, taken down to %d", - client->fd.fd, result); - } - break; - } - case GST_SYNC_METHOD_BURST_KEYFRAME: - { - gboolean ok; - gint min_idx, max_idx; - gint next_syncframe, prev_syncframe; - - /* BURST_KEYFRAME: - * - * _always_ start sending a keyframe to the client. We first search - * a keyframe between min/max limits. If there is none, we send it the - * last keyframe before min. If there is none, the behaviour is like - * NEXT_KEYFRAME. - */ - /* gather burst limits */ - ok = count_burst_unit (sink, &min_idx, client->burst_min_unit, - client->burst_min_value, &max_idx, client->burst_max_unit, - client->burst_max_value); - - GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); - - /* first find a keyframe after min_idx */ - next_syncframe = find_next_syncframe (sink, min_idx); - if (next_syncframe != -1 && next_syncframe < max_idx) { - /* we have a valid keyframe and it's below the max */ - GST_LOG_OBJECT (sink, "found keyframe in min/max limits"); - result = next_syncframe; - break; - } - - /* no valid keyframe, try to find one below min */ - prev_syncframe = find_prev_syncframe (sink, min_idx); - if (prev_syncframe != -1) { - GST_WARNING_OBJECT (sink, - "using keyframe below min in BURST_KEYFRAME sync mode"); - result = prev_syncframe; - break; - } - - /* no prev keyframe or not enough data */ - GST_WARNING_OBJECT (sink, - "no prev keyframe found in BURST_KEYFRAME sync mode, waiting for next"); - - /* throw client to the waiting state */ - client->bufpos = -1; - /* and make client sync to next keyframe */ - client->sync_method = GST_SYNC_METHOD_NEXT_KEYFRAME; - result = -1; - break; - } - case GST_SYNC_METHOD_BURST_WITH_KEYFRAME: - { - gboolean ok; - gint min_idx, max_idx; - gint next_syncframe; - - /* BURST_WITH_KEYFRAME: - * - * try to start sending a keyframe to the client. We first search - * a keyframe between min/max limits. If there is none, we send it the - * amount of data up 'till min. - */ - /* gather enough data to burst */ - ok = count_burst_unit (sink, &min_idx, client->burst_min_unit, - client->burst_min_value, &max_idx, client->burst_max_unit, - client->burst_max_value); - - GST_LOG_OBJECT (sink, "min %d, max %d", min_idx, max_idx); - - /* first find a keyframe after min_idx */ - next_syncframe = find_next_syncframe (sink, min_idx); - if (next_syncframe != -1 && next_syncframe < max_idx) { - /* we have a valid keyframe and it's below the max */ - GST_LOG_OBJECT (sink, "found keyframe in min/max limits"); - result = next_syncframe; - break; - } - - /* no keyframe, send data from min_idx */ - GST_WARNING_OBJECT (sink, "using min in BURST_WITH_KEYFRAME sync mode"); - - /* make sure we don't go over the max limit */ - if (max_idx != -1 && max_idx <= min_idx) { - result = MAX (max_idx - 1, 0); - } else { - result = min_idx; - } - - break; - } - default: - g_warning ("unknown sync method %d", client->sync_method); - result = client->bufpos; - break; - } - return result; -} - -/* Handle a write on a client, - * which indicates a read request from a client. - * - * For each client we maintain a queue of GstBuffers that contain the raw bytes - * we need to send to the client. In the case of the GDP protocol, we create - * buffers out of the header bytes so that we can focus only on sending - * buffers. - * - * We first check to see if we need to send caps (in GDP) and streamheaders. - * If so, we queue them. - * - * Then we run into the main loop that tries to send as many buffers as - * possible. It will first exhaust the client->sending queue and if the queue - * is empty, it will pick a buffer from the global queue. - * - * Sending the buffers from the client->sending queue is basically writing - * the bytes to the socket and maintaining a count of the bytes that were - * sent. When the buffer is completely sent, it is removed from the - * client->sending queue and we try to pick a new buffer for sending. - * - * When the sending returns a partial buffer we stop sending more data as - * the next send operation could block. - * - * This functions returns FALSE if some error occured. - */ -static gboolean -gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink, - GstTCPClient * client) -{ - int fd = client->fd.fd; - gboolean more; - gboolean res; - gboolean flushing; - GstClockTime now; - GTimeVal nowtv; - - g_get_current_time (&nowtv); - now = GST_TIMEVAL_TO_TIME (nowtv); - - flushing = client->status == GST_CLIENT_STATUS_FLUSHING; - - /* when using GDP, first check if we have queued caps yet */ - if (sink->protocol == GST_TCP_PROTOCOL_GDP) { - /* don't need to do anything when the client is flushing */ - if (!client->caps_sent && !flushing) { - GstPad *peer; - GstCaps *caps; - - peer = gst_pad_get_peer (GST_BASE_SINK_PAD (sink)); - if (!peer) { - GST_WARNING_OBJECT (sink, "pad has no peer"); - return FALSE; - } - gst_object_unref (peer); - - caps = gst_pad_get_negotiated_caps (GST_BASE_SINK_PAD (sink)); - if (!caps) { - GST_WARNING_OBJECT (sink, "pad caps not yet negotiated"); - return FALSE; - } - - /* queue caps for sending */ - res = gst_multi_fd_sink_client_queue_caps (sink, client, caps); - - gst_caps_unref (caps); - - if (!res) { - GST_DEBUG_OBJECT (sink, "Failed queueing caps, removing client"); - return FALSE; - } - client->caps_sent = TRUE; - } - } - - more = TRUE; - do { - gint maxsize; - - if (!client->sending) { - /* client is not working on a buffer */ - if (client->bufpos == -1) { - /* client is too fast, remove from write queue until new buffer is - * available */ - gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); - /* if we flushed out all of the client buffers, we can stop */ - if (client->flushcount == 0) - goto flushed; - - return TRUE; - } else { - /* client can pick a buffer from the global queue */ - GstBuffer *buf; - - /* for new connections, we need to find a good spot in the - * bufqueue to start streaming from */ - if (client->new_connection && !flushing) { - gint position = gst_multi_fd_sink_new_client (sink, client); - - if (position >= 0) { - /* we got a valid spot in the queue */ - client->new_connection = FALSE; - client->bufpos = position; - } else { - /* cannot send data to this client yet */ - gst_poll_fd_ctl_write (sink->fdset, &client->fd, FALSE); - return TRUE; - } - } - - /* we flushed all remaining buffers, no need to get a new one */ - if (client->flushcount == 0) - goto flushed; - - /* grab buffer */ - buf = g_array_index (sink->bufqueue, GstBuffer *, client->bufpos); - client->bufpos--; - - /* decrease flushcount */ - if (client->flushcount != -1) - client->flushcount--; - - GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", - fd, client, client->bufpos); - - /* queueing a buffer will ref it */ - gst_multi_fd_sink_client_queue_buffer (sink, client, buf); - - /* need to start from the first byte for this new buffer */ - client->bufoffset = 0; - } - } - - /* see if we need to send something */ - if (client->sending) { - ssize_t wrote; - GstBuffer *head; - - /* pick first buffer from list */ - head = GST_BUFFER (client->sending->data); - maxsize = GST_BUFFER_SIZE (head) - client->bufoffset; - - /* try to write the complete buffer */ -#ifdef MSG_NOSIGNAL -#define FLAGS MSG_NOSIGNAL -#else -#define FLAGS 0 -#endif - if (client->is_socket) { - wrote = - send (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize, - FLAGS); - } else { - wrote = write (fd, GST_BUFFER_DATA (head) + client->bufoffset, maxsize); - } - - if (wrote < 0) { - /* hmm error.. */ - if (errno == EAGAIN) { - /* nothing serious, resource was unavailable, try again later */ - more = FALSE; - } else if (errno == ECONNRESET) { - goto connection_reset; - } else { - goto write_error; - } - } else { - if (wrote < maxsize) { - /* partial write means that the client cannot read more and we should - * stop sending more */ - GST_LOG_OBJECT (sink, - "partial write on %d of %" G_GSSIZE_FORMAT " bytes", fd, wrote); - client->bufoffset += wrote; - more = FALSE; - } else { - /* complete buffer was written, we can proceed to the next one */ - client->sending = g_slist_remove (client->sending, head); - gst_buffer_unref (head); - /* make sure we start from byte 0 for the next buffer */ - client->bufoffset = 0; - } - /* update stats */ - client->bytes_sent += wrote; - client->last_activity_time = now; - sink->bytes_served += wrote; - } - } - } while (more); - - return TRUE; - - /* ERRORS */ -flushed: - { - GST_DEBUG_OBJECT (sink, "[fd %5d] flushed, removing", fd); - client->status = GST_CLIENT_STATUS_REMOVED; - return FALSE; - } -connection_reset: - { - GST_DEBUG_OBJECT (sink, "[fd %5d] connection reset by peer, removing", fd); - client->status = GST_CLIENT_STATUS_CLOSED; - return FALSE; - } -write_error: - { - GST_WARNING_OBJECT (sink, - "[fd %5d] could not write, removing client: %s (%d)", fd, - g_strerror (errno), errno); - client->status = GST_CLIENT_STATUS_ERROR; - return FALSE; - } -} - -/* calculate the new position for a client after recovery. This function - * does not update the client position but merely returns the required - * position. - */ -static gint -gst_multi_fd_sink_recover_client (GstMultiFdSink * sink, GstTCPClient * client) -{ - gint newbufpos; - - GST_WARNING_OBJECT (sink, - "[fd %5d] client %p is lagging at %d, recover using policy %d", - client->fd.fd, client, client->bufpos, sink->recover_policy); - - switch (sink->recover_policy) { - case GST_RECOVER_POLICY_NONE: - /* do nothing, client will catch up or get kicked out when it reaches - * the hard max */ - newbufpos = client->bufpos; - break; - case GST_RECOVER_POLICY_RESYNC_LATEST: - /* move to beginning of queue */ - newbufpos = -1; - break; - case GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: - /* move to beginning of soft max */ - newbufpos = get_buffers_max (sink, sink->units_soft_max); - break; - case GST_RECOVER_POLICY_RESYNC_KEYFRAME: - /* find keyframe in buffers, we search backwards to find the - * closest keyframe relative to what this client already received. */ - newbufpos = MIN (sink->bufqueue->len - 1, - get_buffers_max (sink, sink->units_soft_max) - 1); - - while (newbufpos >= 0) { - GstBuffer *buf; - - buf = g_array_index (sink->bufqueue, GstBuffer *, newbufpos); - if (is_sync_frame (sink, buf)) { - /* found a buffer that is not a delta unit */ - break; - } - newbufpos--; - } - break; - default: - /* unknown recovery procedure */ - newbufpos = get_buffers_max (sink, sink->units_soft_max); - break; - } - return newbufpos; -} - -/* Queue a buffer on the global queue. - * - * This function adds the buffer to the front of a GArray. It removes the - * tail buffer if the max queue size is exceeded, unreffing the queued buffer. - * Note that unreffing the buffer is not a problem as clients who - * started writing out this buffer will still have a reference to it in the - * client->sending queue. - * - * After adding the buffer, we update all client positions in the queue. If - * a client moves over the soft max, we start the recovery procedure for this - * slow client. If it goes over the hard max, it is put into the slow list - * and removed. - * - * Special care is taken of clients that were waiting for a new buffer (they - * had a position of -1) because they can proceed after adding this new buffer. - * This is done by adding the client back into the write fd_set and signalling - * the select thread that the fd_set changed. - */ -static void -gst_multi_fd_sink_queue_buffer (GstMultiFdSink * sink, GstBuffer * buf) -{ - GList *clients, *next; - gint queuelen; - gboolean need_signal = FALSE; - gint max_buffer_usage; - gint i; - GTimeVal nowtv; - GstClockTime now; - gint max_buffers, soft_max_buffers; - guint cookie; - - g_get_current_time (&nowtv); - now = GST_TIMEVAL_TO_TIME (nowtv); - - CLIENTS_LOCK (sink); - /* add buffer to queue */ - g_array_prepend_val (sink->bufqueue, buf); - queuelen = sink->bufqueue->len; - - if (sink->units_max > 0) - max_buffers = get_buffers_max (sink, sink->units_max); - else - max_buffers = -1; - - if (sink->units_soft_max > 0) - soft_max_buffers = get_buffers_max (sink, sink->units_soft_max); - else - soft_max_buffers = -1; - GST_LOG_OBJECT (sink, "Using max %d, softmax %d", max_buffers, - soft_max_buffers); - - /* then loop over the clients and update the positions */ - max_buffer_usage = 0; - -restart: - cookie = sink->clients_cookie; - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; - - if (cookie != sink->clients_cookie) { - GST_DEBUG_OBJECT (sink, "Clients cookie outdated, restarting"); - goto restart; - } - - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - - client->bufpos++; - GST_LOG_OBJECT (sink, "[fd %5d] client %p at position %d", - client->fd.fd, client, client->bufpos); - /* check soft max if needed, recover client */ - if (soft_max_buffers > 0 && client->bufpos >= soft_max_buffers) { - gint newpos; - - newpos = gst_multi_fd_sink_recover_client (sink, client); - if (newpos != client->bufpos) { - client->dropped_buffers += client->bufpos - newpos; - client->bufpos = newpos; - client->discont = TRUE; - GST_INFO_OBJECT (sink, "[fd %5d] client %p position reset to %d", - client->fd.fd, client, client->bufpos); - } else { - GST_INFO_OBJECT (sink, - "[fd %5d] client %p not recovering position", - client->fd.fd, client); - } - } - /* check hard max and timeout, remove client */ - if ((max_buffers > 0 && client->bufpos >= max_buffers) || - (sink->timeout > 0 - && now - client->last_activity_time > sink->timeout)) { - /* remove client */ - GST_WARNING_OBJECT (sink, "[fd %5d] client %p is too slow, removing", - client->fd.fd, client); - /* remove the client, the fd set will be cleared and the select thread - * will be signaled */ - client->status = GST_CLIENT_STATUS_SLOW; - /* set client to invalid position while being removed */ - client->bufpos = -1; - gst_multi_fd_sink_remove_client_link (sink, clients); - need_signal = TRUE; - continue; - } else if (client->bufpos == 0 || client->new_connection) { - /* can send data to this client now. need to signal the select thread that - * the fd_set changed */ - gst_poll_fd_ctl_write (sink->fdset, &client->fd, TRUE); - need_signal = TRUE; - } - /* keep track of maximum buffer usage */ - if (client->bufpos > max_buffer_usage) { - max_buffer_usage = client->bufpos; - } - } - - /* make sure we respect bytes-min, buffers-min and time-min when they are set */ - { - gint usage, max; - - GST_LOG_OBJECT (sink, - "extending queue %d to respect time_min %" GST_TIME_FORMAT - ", bytes_min %d, buffers_min %d", max_buffer_usage, - GST_TIME_ARGS (sink->time_min), sink->bytes_min, sink->buffers_min); - - /* get index where the limits are ok, we don't really care if all limits - * are ok, we just queue as much as we need. We also don't compare against - * the max limits. */ - find_limits (sink, &usage, sink->bytes_min, sink->buffers_min, - sink->time_min, &max, -1, -1, -1); - - max_buffer_usage = MAX (max_buffer_usage, usage + 1); - GST_LOG_OBJECT (sink, "extended queue to %d", max_buffer_usage); - } - - /* now look for sync points and make sure there is at least one - * sync point in the queue. We only do this if the LATEST_KEYFRAME or - * BURST_KEYFRAME mode is selected */ - if (sink->def_sync_method == GST_SYNC_METHOD_LATEST_KEYFRAME || - sink->def_sync_method == GST_SYNC_METHOD_BURST_KEYFRAME) { - /* no point in searching beyond the queue length */ - gint limit = queuelen; - GstBuffer *buf; - - /* no point in searching beyond the soft-max if any. */ - if (soft_max_buffers > 0) { - limit = MIN (limit, soft_max_buffers); - } - GST_LOG_OBJECT (sink, - "extending queue to include sync point, now at %d, limit is %d", - max_buffer_usage, limit); - for (i = 0; i < limit; i++) { - buf = g_array_index (sink->bufqueue, GstBuffer *, i); - if (is_sync_frame (sink, buf)) { - /* found a sync frame, now extend the buffer usage to - * include at least this frame. */ - max_buffer_usage = MAX (max_buffer_usage, i); - break; - } - } - GST_LOG_OBJECT (sink, "max buffer usage is now %d", max_buffer_usage); - } - - GST_LOG_OBJECT (sink, "len %d, usage %d", queuelen, max_buffer_usage); - - /* nobody is referencing units after max_buffer_usage so we can - * remove them from the queue. We remove them in reverse order as - * this is the most optimal for GArray. */ - for (i = queuelen - 1; i > max_buffer_usage; i--) { - GstBuffer *old; - - /* queue exceeded max size */ - queuelen--; - old = g_array_index (sink->bufqueue, GstBuffer *, i); - sink->bufqueue = g_array_remove_index (sink->bufqueue, i); - - /* unref tail buffer */ - gst_buffer_unref (old); - } - /* save for stats */ - sink->buffers_queued = max_buffer_usage; - CLIENTS_UNLOCK (sink); - - /* and send a signal to thread if fd_set changed */ - if (need_signal) { - gst_poll_restart (sink->fdset); - } -} - -/* Handle the clients. Basically does a blocking select for one - * of the client fds to become read or writable. We also have a - * filedescriptor to receive commands on that we need to check. - * - * After going out of the select call, we read and write to all - * clients that can do so. Badly behaving clients are put on a - * garbage list and removed. - */ -static void -gst_multi_fd_sink_handle_clients (GstMultiFdSink * sink) -{ - int result; - GList *clients, *next; - gboolean try_again; - GstMultiFdSinkClass *fclass; - guint cookie; - - fclass = GST_MULTI_FD_SINK_GET_CLASS (sink); - - do { - try_again = FALSE; - - /* check for: - * - server socket input (ie, new client connections) - * - client socket input (ie, clients saying goodbye) - * - client socket output (ie, client reads) */ - GST_LOG_OBJECT (sink, "waiting on action on fdset"); - result = gst_poll_wait (sink->fdset, GST_CLOCK_TIME_NONE); - - /* < 0 is an error, 0 just means a timeout happened, which is impossible */ - if (result < 0) { - GST_WARNING_OBJECT (sink, "wait failed: %s (%d)", g_strerror (errno), - errno); - if (errno == EBADF) { - /* ok, so one or more of the fds is invalid. We loop over them to find - * the ones that give an error to the F_GETFL fcntl. */ - CLIENTS_LOCK (sink); - restart: - cookie = sink->clients_cookie; - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; - int fd; - long flags; - int res; - - if (cookie != sink->clients_cookie) { - GST_DEBUG_OBJECT (sink, "Cookie changed finding bad fd"); - goto restart; - } - - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - - fd = client->fd.fd; - - res = fcntl (fd, F_GETFL, &flags); - if (res == -1) { - GST_WARNING_OBJECT (sink, "fnctl failed for %d, removing: %s (%d)", - fd, g_strerror (errno), errno); - if (errno == EBADF) { - client->status = GST_CLIENT_STATUS_ERROR; - /* releases the CLIENTS lock */ - gst_multi_fd_sink_remove_client_link (sink, clients); - } - } - } - CLIENTS_UNLOCK (sink); - /* after this, go back in the select loop as the read/writefds - * are not valid */ - try_again = TRUE; - } else if (errno == EINTR) { - /* interrupted system call, just redo the wait */ - try_again = TRUE; - } else if (errno == EBUSY) { - /* the call to gst_poll_wait() was flushed */ - return; - } else { - /* this is quite bad... */ - GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL), - ("select failed: %s (%d)", g_strerror (errno), errno)); - return; - } - } else { - GST_LOG_OBJECT (sink, "wait done: %d sockets with events", result); - } - } while (try_again); - - /* subclasses can check fdset with this virtual function */ - if (fclass->wait) - fclass->wait (sink, sink->fdset); - - /* Check the clients */ - CLIENTS_LOCK (sink); - -restart2: - cookie = sink->clients_cookie; - for (clients = sink->clients; clients; clients = next) { - GstTCPClient *client; - - if (sink->clients_cookie != cookie) { - GST_DEBUG_OBJECT (sink, "Restarting loop, cookie out of date"); - goto restart2; - } - - client = (GstTCPClient *) clients->data; - next = g_list_next (clients); - - if (client->status != GST_CLIENT_STATUS_FLUSHING - && client->status != GST_CLIENT_STATUS_OK) { - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } - - if (gst_poll_fd_has_closed (sink->fdset, &client->fd)) { - client->status = GST_CLIENT_STATUS_CLOSED; - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } - if (gst_poll_fd_has_error (sink->fdset, &client->fd)) { - GST_WARNING_OBJECT (sink, "gst_poll_fd_has_error for %d", client->fd.fd); - client->status = GST_CLIENT_STATUS_ERROR; - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } - if (gst_poll_fd_can_read (sink->fdset, &client->fd)) { - /* handle client read */ - if (!gst_multi_fd_sink_handle_client_read (sink, client)) { - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } - } - if (gst_poll_fd_can_write (sink->fdset, &client->fd)) { - /* handle client write */ - if (!gst_multi_fd_sink_handle_client_write (sink, client)) { - gst_multi_fd_sink_remove_client_link (sink, clients); - continue; - } - } - } - CLIENTS_UNLOCK (sink); -} - -/* we handle the client communication in another thread so that we do not block - * the gstreamer thread while we select() on the client fds */ -static gpointer -gst_multi_fd_sink_thread (GstMultiFdSink * sink) -{ - while (sink->running) { - gst_multi_fd_sink_handle_clients (sink); - } - return NULL; -} - -static GstFlowReturn -gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf) -{ - GstMultiFdSink *sink; - gboolean in_caps; - GstCaps *bufcaps, *padcaps; - - sink = GST_MULTI_FD_SINK (bsink); - - g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_MULTI_FD_SINK_OPEN), - GST_FLOW_WRONG_STATE); - - /* since we check every buffer for streamheader caps, we need to make - * sure every buffer has caps set */ - bufcaps = gst_buffer_get_caps (buf); - padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink)); - - /* make sure we have caps on the pad */ - if (!padcaps && !bufcaps) - goto no_caps; - - /* get IN_CAPS first, code below might mess with the flags */ - in_caps = GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS); - - /* stamp the buffer with previous caps if no caps set */ - if (!bufcaps) { - if (!gst_buffer_is_metadata_writable (buf)) { - /* metadata is not writable, copy will be made and original buffer - * will be unreffed so we need to ref so that we don't lose the - * buffer in the render method. */ - gst_buffer_ref (buf); - /* the new buffer is ours only, we keep it out of the scope of this - * function */ - buf = gst_buffer_make_metadata_writable (buf); - } else { - /* else the metadata is writable, we ref because we keep the buffer - * out of the scope of this method */ - gst_buffer_ref (buf); - } - /* buffer metadata is writable now, set the caps */ - gst_buffer_set_caps (buf, padcaps); - } else { - gst_caps_unref (bufcaps); - - /* since we keep this buffer out of the scope of this method */ - gst_buffer_ref (buf); - } - - GST_LOG_OBJECT (sink, "received buffer %p, in_caps: %s, offset %" - G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT - ", timestamp %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT, - buf, in_caps ? "yes" : "no", GST_BUFFER_OFFSET (buf), - GST_BUFFER_OFFSET_END (buf), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (buf))); - - /* if we get IN_CAPS buffers, but the previous buffer was not IN_CAPS, - * it means we're getting new streamheader buffers, and we should clear - * the old ones */ - if (in_caps && sink->previous_buffer_in_caps == FALSE) { - GST_DEBUG_OBJECT (sink, - "receiving new IN_CAPS buffers, clearing old streamheader"); - g_slist_foreach (sink->streamheader, (GFunc) gst_mini_object_unref, NULL); - g_slist_free (sink->streamheader); - sink->streamheader = NULL; - } - - /* save the current in_caps */ - sink->previous_buffer_in_caps = in_caps; - - /* if the incoming buffer is marked as IN CAPS, then we assume for now - * it's a streamheader that needs to be sent to each new client, so we - * put it on our internal list of streamheader buffers. - * FIXME: we could check if the buffer's contents are in fact part of the - * current streamheader. - * - * We don't send the buffer to the client, since streamheaders are sent - * separately when necessary. */ - if (in_caps) { - GST_DEBUG_OBJECT (sink, - "appending IN_CAPS buffer with length %d to streamheader", - GST_BUFFER_SIZE (buf)); - sink->streamheader = g_slist_append (sink->streamheader, buf); - } else { - /* queue the buffer, this is a regular data buffer. */ - gst_multi_fd_sink_queue_buffer (sink, buf); - - sink->bytes_to_serve += GST_BUFFER_SIZE (buf); - } - return GST_FLOW_OK; - - /* ERRORS */ -no_caps: - { - GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL), - ("Received first buffer without caps set")); - return GST_FLOW_NOT_NEGOTIATED; - } -} - -static void -gst_multi_fd_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) -{ - GstMultiFdSink *multifdsink; - - multifdsink = GST_MULTI_FD_SINK (object); - - switch (prop_id) { - case PROP_PROTOCOL: - multifdsink->protocol = g_value_get_enum (value); - break; - case PROP_MODE: - multifdsink->mode = g_value_get_enum (value); - break; - case PROP_BUFFERS_MAX: - multifdsink->units_max = g_value_get_int (value); - break; - case PROP_BUFFERS_SOFT_MAX: - multifdsink->units_soft_max = g_value_get_int (value); - break; - case PROP_TIME_MIN: - multifdsink->time_min = g_value_get_int64 (value); - break; - case PROP_BYTES_MIN: - multifdsink->bytes_min = g_value_get_int (value); - break; - case PROP_BUFFERS_MIN: - multifdsink->buffers_min = g_value_get_int (value); - break; - case PROP_UNIT_TYPE: - multifdsink->unit_type = g_value_get_enum (value); - break; - case PROP_UNITS_MAX: - multifdsink->units_max = g_value_get_int64 (value); - break; - case PROP_UNITS_SOFT_MAX: - multifdsink->units_soft_max = g_value_get_int64 (value); - break; - case PROP_RECOVER_POLICY: - multifdsink->recover_policy = g_value_get_enum (value); - break; - case PROP_TIMEOUT: - multifdsink->timeout = g_value_get_uint64 (value); - break; - case PROP_SYNC_METHOD: - multifdsink->def_sync_method = g_value_get_enum (value); - break; - case PROP_BURST_UNIT: - multifdsink->def_burst_unit = g_value_get_enum (value); - break; - case PROP_BURST_VALUE: - multifdsink->def_burst_value = g_value_get_uint64 (value); - break; - case PROP_QOS_DSCP: - multifdsink->qos_dscp = g_value_get_int (value); - setup_dscp (multifdsink); - break; - case PROP_HANDLE_READ: - multifdsink->handle_read = g_value_get_boolean (value); - break; - case PROP_RESEND_STREAMHEADER: - multifdsink->resend_streamheader = g_value_get_boolean (value); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_multi_fd_sink_get_property (GObject * object, guint prop_id, GValue * value, - GParamSpec * pspec) -{ - GstMultiFdSink *multifdsink; - - multifdsink = GST_MULTI_FD_SINK (object); - - switch (prop_id) { - case PROP_PROTOCOL: - g_value_set_enum (value, multifdsink->protocol); - break; - case PROP_MODE: - g_value_set_enum (value, multifdsink->mode); - break; - case PROP_BUFFERS_MAX: - g_value_set_int (value, multifdsink->units_max); - break; - case PROP_BUFFERS_SOFT_MAX: - g_value_set_int (value, multifdsink->units_soft_max); - break; - case PROP_TIME_MIN: - g_value_set_int64 (value, multifdsink->time_min); - break; - case PROP_BYTES_MIN: - g_value_set_int (value, multifdsink->bytes_min); - break; - case PROP_BUFFERS_MIN: - g_value_set_int (value, multifdsink->buffers_min); - break; - case PROP_BUFFERS_QUEUED: - g_value_set_uint (value, multifdsink->buffers_queued); - break; - case PROP_BYTES_QUEUED: - g_value_set_uint (value, multifdsink->bytes_queued); - break; - case PROP_TIME_QUEUED: - g_value_set_uint64 (value, multifdsink->time_queued); - break; - case PROP_UNIT_TYPE: - g_value_set_enum (value, multifdsink->unit_type); - break; - case PROP_UNITS_MAX: - g_value_set_int64 (value, multifdsink->units_max); - break; - case PROP_UNITS_SOFT_MAX: - g_value_set_int64 (value, multifdsink->units_soft_max); - break; - case PROP_RECOVER_POLICY: - g_value_set_enum (value, multifdsink->recover_policy); - break; - case PROP_TIMEOUT: - g_value_set_uint64 (value, multifdsink->timeout); - break; - case PROP_SYNC_METHOD: - g_value_set_enum (value, multifdsink->def_sync_method); - break; - case PROP_BYTES_TO_SERVE: - g_value_set_uint64 (value, multifdsink->bytes_to_serve); - break; - case PROP_BYTES_SERVED: - g_value_set_uint64 (value, multifdsink->bytes_served); - break; - case PROP_BURST_UNIT: - g_value_set_enum (value, multifdsink->def_burst_unit); - break; - case PROP_BURST_VALUE: - g_value_set_uint64 (value, multifdsink->def_burst_value); - break; - case PROP_QOS_DSCP: - g_value_set_int (value, multifdsink->qos_dscp); - break; - case PROP_HANDLE_READ: - g_value_set_boolean (value, multifdsink->handle_read); - break; - case PROP_RESEND_STREAMHEADER: - g_value_set_boolean (value, multifdsink->resend_streamheader); - break; - case PROP_NUM_FDS: - g_value_set_uint (value, g_hash_table_size (multifdsink->fd_hash)); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - - -/* create a socket for sending to remote machine */ -static gboolean -gst_multi_fd_sink_start (GstBaseSink * bsink) -{ - GstMultiFdSinkClass *fclass; - GstMultiFdSink *this; - - if (GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN)) - return TRUE; - - this = GST_MULTI_FD_SINK (bsink); - fclass = GST_MULTI_FD_SINK_GET_CLASS (this); - - GST_INFO_OBJECT (this, "starting in mode %d", this->mode); - if ((this->fdset = gst_poll_new (TRUE)) == NULL) - goto socket_pair; - - this->streamheader = NULL; - this->bytes_to_serve = 0; - this->bytes_served = 0; - - if (fclass->init) { - fclass->init (this); - } - - this->running = TRUE; - this->thread = g_thread_create ((GThreadFunc) gst_multi_fd_sink_thread, - this, TRUE, NULL); - - GST_OBJECT_FLAG_SET (this, GST_MULTI_FD_SINK_OPEN); - - return TRUE; - - /* ERRORS */ -socket_pair: - { - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ_WRITE, (NULL), - GST_ERROR_SYSTEM); - return FALSE; - } -} - -static gboolean -multifdsink_hash_remove (gpointer key, gpointer value, gpointer data) -{ - return TRUE; -} - -static gboolean -gst_multi_fd_sink_stop (GstBaseSink * bsink) -{ - GstMultiFdSinkClass *fclass; - GstMultiFdSink *this; - GstBuffer *buf; - int i; - - this = GST_MULTI_FD_SINK (bsink); - fclass = GST_MULTI_FD_SINK_GET_CLASS (this); - - if (!GST_OBJECT_FLAG_IS_SET (bsink, GST_MULTI_FD_SINK_OPEN)) - return TRUE; - - this->running = FALSE; - - gst_poll_set_flushing (this->fdset, TRUE); - if (this->thread) { - GST_DEBUG_OBJECT (this, "joining thread"); - g_thread_join (this->thread); - GST_DEBUG_OBJECT (this, "joined thread"); - this->thread = NULL; - } - - /* free the clients */ - gst_multi_fd_sink_clear (this); - - if (this->streamheader) { - g_slist_foreach (this->streamheader, (GFunc) gst_mini_object_unref, NULL); - g_slist_free (this->streamheader); - this->streamheader = NULL; - } - - if (fclass->close) - fclass->close (this); - - if (this->fdset) { - gst_poll_free (this->fdset); - this->fdset = NULL; - } - g_hash_table_foreach_remove (this->fd_hash, multifdsink_hash_remove, this); - - /* remove all queued buffers */ - if (this->bufqueue) { - GST_DEBUG_OBJECT (this, "Emptying bufqueue with %d buffers", - this->bufqueue->len); - for (i = this->bufqueue->len - 1; i >= 0; --i) { - buf = g_array_index (this->bufqueue, GstBuffer *, i); - GST_LOG_OBJECT (this, "Removing buffer %p (%d) with refcount %d", buf, i, - GST_MINI_OBJECT_REFCOUNT (buf)); - gst_buffer_unref (buf); - this->bufqueue = g_array_remove_index (this->bufqueue, i); - } - /* freeing the array is done in _finalize */ - } - GST_OBJECT_FLAG_UNSET (this, GST_MULTI_FD_SINK_OPEN); - - return TRUE; -} - -static GstStateChangeReturn -gst_multi_fd_sink_change_state (GstElement * element, GstStateChange transition) -{ - GstMultiFdSink *sink; - GstStateChangeReturn ret; - - sink = GST_MULTI_FD_SINK (element); - - /* we disallow changing the state from the streaming thread */ - if (g_thread_self () == sink->thread) - return GST_STATE_CHANGE_FAILURE; - - - switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - if (!gst_multi_fd_sink_start (GST_BASE_SINK (sink))) - goto start_failed; - break; - case GST_STATE_CHANGE_READY_TO_PAUSED: - break; - case GST_STATE_CHANGE_PAUSED_TO_PLAYING: - break; - default: - break; - } - - ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - switch (transition) { - case GST_STATE_CHANGE_PLAYING_TO_PAUSED: - break; - case GST_STATE_CHANGE_PAUSED_TO_READY: - break; - case GST_STATE_CHANGE_READY_TO_NULL: - gst_multi_fd_sink_stop (GST_BASE_SINK (sink)); - break; - default: - break; - } - return ret; - - /* ERRORS */ -start_failed: - { - /* error message was posted */ - return GST_STATE_CHANGE_FAILURE; - } -} diff --git a/gst/tcp/gstmultifdsink.h b/gst/tcp/gstmultifdsink.h deleted file mode 100644 index d2d9ce4e..00000000 --- a/gst/tcp/gstmultifdsink.h +++ /dev/null @@ -1,286 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_MULTI_FD_SINK_H__ -#define __GST_MULTI_FD_SINK_H__ - -#include <gst/gst.h> -#include <gst/base/gstbasesink.h> - -G_BEGIN_DECLS - -#include "gsttcp.h" - -#define GST_TYPE_MULTI_FD_SINK \ - (gst_multi_fd_sink_get_type()) -#define GST_MULTI_FD_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_MULTI_FD_SINK,GstMultiFdSink)) -#define GST_MULTI_FD_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_MULTI_FD_SINK,GstMultiFdSinkClass)) -#define GST_IS_MULTI_FD_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_MULTI_FD_SINK)) -#define GST_IS_MULTI_FD_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_MULTI_FD_SINK)) -#define GST_MULTI_FD_SINK_GET_CLASS(klass) \ - (G_TYPE_INSTANCE_GET_CLASS ((klass), GST_TYPE_MULTI_FD_SINK, GstMultiFdSinkClass)) - - -typedef struct _GstMultiFdSink GstMultiFdSink; -typedef struct _GstMultiFdSinkClass GstMultiFdSinkClass; - -typedef enum { - GST_MULTI_FD_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - - GST_MULTI_FD_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) -} GstMultiFdSinkFlags; - -/** - * GstRecoverPolicy: - * @GST_RECOVER_POLICY_NONE : no recovering is done - * @GST_RECOVER_POLICY_RESYNC_LATEST : client is moved to last buffer - * @GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT: client is moved to the soft limit - * @GST_RECOVER_POLICY_RESYNC_KEYFRAME : client is moved to latest keyframe - * - * Possible values for the recovery procedure to use when a client consumes - * data too slow and has a backlag of more that soft-limit buffers. - */ -typedef enum -{ - GST_RECOVER_POLICY_NONE, - GST_RECOVER_POLICY_RESYNC_LATEST, - GST_RECOVER_POLICY_RESYNC_SOFT_LIMIT, - GST_RECOVER_POLICY_RESYNC_KEYFRAME -} GstRecoverPolicy; - -/** - * GstSyncMethod: - * @GST_SYNC_METHOD_LATEST : client receives most recent buffer - * @GST_SYNC_METHOD_NEXT_KEYFRAME : client receives next keyframe - * @GST_SYNC_METHOD_LATEST_KEYFRAME : client receives latest keyframe (burst) - * @GST_SYNC_METHOD_BURST : client receives specific amount of data - * @GST_SYNC_METHOD_BURST_KEYFRAME : client receives specific amount of data - * starting from latest keyframe - * @GST_SYNC_METHOD_BURST_WITH_KEYFRAME : client receives specific amount of data from - * a keyframe, or if there is not enough data after - * the keyframe, starting before the keyframe - * - * This enum defines the selection of the first buffer that is sent - * to a new client. - */ -typedef enum -{ - GST_SYNC_METHOD_LATEST, - GST_SYNC_METHOD_NEXT_KEYFRAME, - GST_SYNC_METHOD_LATEST_KEYFRAME, - GST_SYNC_METHOD_BURST, - GST_SYNC_METHOD_BURST_KEYFRAME, - GST_SYNC_METHOD_BURST_WITH_KEYFRAME -} GstSyncMethod; - -/** - * GstTCPUnitType: - * @GST_TCP_UNIT_TYPE_UNDEFINED: undefined - * @GST_TCP_UNIT_TYPE_BUFFERS : buffers - * @GST_TCP_UNIT_TYPE_TIME : timeunits (in nanoseconds) - * @GST_TCP_UNIT_TYPE_BYTES : bytes - * - * The units used to specify limits. - */ -typedef enum -{ - GST_TCP_UNIT_TYPE_UNDEFINED, - GST_TCP_UNIT_TYPE_BUFFERS, - GST_TCP_UNIT_TYPE_TIME, - GST_TCP_UNIT_TYPE_BYTES -} GstTCPUnitType; - -/** - * GstClientStatus: - * @GST_CLIENT_STATUS_OK : client is ok - * @GST_CLIENT_STATUS_CLOSED : client closed the socket - * @GST_CLIENT_STATUS_REMOVED : client is removed - * @GST_CLIENT_STATUS_SLOW : client is too slow - * @GST_CLIENT_STATUS_ERROR : client is in error - * @GST_CLIENT_STATUS_DUPLICATE: same client added twice - * @GST_CLIENT_STATUS_FLUSHING : client is flushing out the remaining buffers. - * - * This specifies the reason why a client was removed from - * multifdsink and is received in the "client-removed" signal. - */ -typedef enum -{ - GST_CLIENT_STATUS_OK = 0, - GST_CLIENT_STATUS_CLOSED = 1, - GST_CLIENT_STATUS_REMOVED = 2, - GST_CLIENT_STATUS_SLOW = 3, - GST_CLIENT_STATUS_ERROR = 4, - GST_CLIENT_STATUS_DUPLICATE = 5, - GST_CLIENT_STATUS_FLUSHING = 6 -} GstClientStatus; - -/* structure for a client - */ -typedef struct { - GstPollFD fd; - - gint bufpos; /* position of this client in the global queue */ - gint flushcount; /* the remaining number of buffers to flush out or -1 if the - client is not flushing. */ - - GstClientStatus status; - gboolean is_socket; - - GSList *sending; /* the buffers we need to send */ - gint bufoffset; /* offset in the first buffer */ - - gboolean discont; - - GstTCPProtocol protocol; - - gboolean caps_sent; - gboolean new_connection; - - gboolean currently_removing; - - /* method to sync client when connecting */ - GstSyncMethod sync_method; - GstTCPUnitType burst_min_unit; - guint64 burst_min_value; - GstTCPUnitType burst_max_unit; - guint64 burst_max_value; - - GstCaps *caps; /* caps of last queued buffer */ - - /* stats */ - guint64 bytes_sent; - guint64 connect_time; - guint64 disconnect_time; - guint64 last_activity_time; - guint64 dropped_buffers; - guint64 avg_queue_size; -} GstTCPClient; - -#define CLIENTS_LOCK_INIT(fdsink) (g_static_rec_mutex_init(&fdsink->clientslock)) -#define CLIENTS_LOCK_FREE(fdsink) (g_static_rec_mutex_free(&fdsink->clientslock)) -#define CLIENTS_LOCK(fdsink) (g_static_rec_mutex_lock(&fdsink->clientslock)) -#define CLIENTS_UNLOCK(fdsink) (g_static_rec_mutex_unlock(&fdsink->clientslock)) - -/** - * GstMultiFdSink: - * - * The multifdsink object structure. - */ -struct _GstMultiFdSink { - GstBaseSink element; - - /*< private >*/ - guint64 bytes_to_serve; /* how much bytes we must serve */ - guint64 bytes_served; /* how much bytes have we served */ - - GStaticRecMutex clientslock; /* lock to protect the clients list */ - GList *clients; /* list of clients we are serving */ - GHashTable *fd_hash; /* index on fd to client */ - guint clients_cookie; /* Cookie to detect changes to the clients list */ - - gint mode; - GstPoll *fdset; - - GSList *streamheader; /* GSList of GstBuffers to use as streamheader */ - gboolean previous_buffer_in_caps; - - GstTCPProtocol protocol; - guint mtu; - gint qos_dscp; - gboolean handle_read; - - GArray *bufqueue; /* global queue of buffers */ - - gboolean running; /* the thread state */ - GThread *thread; /* the sender thread */ - - /* these values are used to check if a client is reading fast - * enough and to control receovery */ - GstTCPUnitType unit_type;/* the type of the units */ - gint64 units_max; /* max units to queue for a client */ - gint64 units_soft_max; /* max units a client can lag before recovery starts */ - GstRecoverPolicy recover_policy; - GstClockTime timeout; /* max amount of nanoseconds to remain idle */ - - GstSyncMethod def_sync_method; /* what method to use for connecting clients */ - GstTCPUnitType def_burst_unit; - guint64 def_burst_value; - - /* these values are used to control the amount of data - * kept in the queues. It allows clients to perform a burst - * on connect. */ - gint bytes_min; /* min number of bytes to queue */ - gint64 time_min; /* min time to queue */ - gint buffers_min; /* min number of buffers to queue */ - - gboolean resend_streamheader; /* resend streamheader if it changes */ - - /* stats */ - gint buffers_queued; /* number of queued buffers */ - gint bytes_queued; /* number of queued bytes */ - gint time_queued; /* number of queued time */ - - guint8 header_flags; -}; - -struct _GstMultiFdSinkClass { - GstBaseSinkClass parent_class; - - /* element methods */ - void (*add) (GstMultiFdSink *sink, int fd); - void (*add_full) (GstMultiFdSink *sink, int fd, GstSyncMethod sync, - GstTCPUnitType format, guint64 value, - GstTCPUnitType max_unit, guint64 max_value); - void (*remove) (GstMultiFdSink *sink, int fd); - void (*remove_flush) (GstMultiFdSink *sink, int fd); - void (*clear) (GstMultiFdSink *sink); - GValueArray* (*get_stats) (GstMultiFdSink *sink, int fd); - - /* vtable */ - gboolean (*init) (GstMultiFdSink *sink); - gboolean (*wait) (GstMultiFdSink *sink, GstPoll *set); - gboolean (*close) (GstMultiFdSink *sink); - void (*removed) (GstMultiFdSink *sink, int fd); - - /* signals */ - void (*client_added) (GstElement *element, gint fd); - void (*client_removed) (GstElement *element, gint fd, GstClientStatus status); - void (*client_fd_removed) (GstElement *element, gint fd); -}; - -GType gst_multi_fd_sink_get_type (void); - -void gst_multi_fd_sink_add (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_add_full (GstMultiFdSink *sink, int fd, GstSyncMethod sync, - GstTCPUnitType min_unit, guint64 min_value, - GstTCPUnitType max_unit, guint64 max_value); -void gst_multi_fd_sink_remove (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_remove_flush (GstMultiFdSink *sink, int fd); -void gst_multi_fd_sink_clear (GstMultiFdSink *sink); -GValueArray* gst_multi_fd_sink_get_stats (GstMultiFdSink *sink, int fd); - -G_END_DECLS - -#endif /* __GST_MULTI_FD_SINK_H__ */ diff --git a/gst/tcp/gsttcp-marshal.list b/gst/tcp/gsttcp-marshal.list deleted file mode 100644 index 0d7208e1..00000000 --- a/gst/tcp/gsttcp-marshal.list +++ /dev/null @@ -1,5 +0,0 @@ -VOID:STRING,UINT -VOID:INT -VOID:INT,BOXED -VOID:INT,ENUM,INT,UINT64,INT,UINT64 -BOXED:INT diff --git a/gst/tcp/gsttcp.c b/gst/tcp/gsttcp.c deleted file mode 100644 index 5e8aae7c..00000000 --- a/gst/tcp/gsttcp.c +++ /dev/null @@ -1,569 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * gsttcp.c: TCP functions - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <netdb.h> -#include <unistd.h> -#include <sys/ioctl.h> - -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include <sys/filio.h> -#endif - -#include "gsttcp.h" -#include <gst/gst-i18n-plugin.h> - -GST_DEBUG_CATEGORY_EXTERN (tcp_debug); -#define GST_CAT_DEFAULT tcp_debug - -#ifndef MSG_NOSIGNAL -#define MSG_NOSIGNAL 0 -#endif - -/* resolve host to IP address, throwing errors if it fails */ -/* host can already be an IP address */ -/* returns a newly allocated gchar * with the dotted ip address, - or NULL, in which case it already fired an error. */ -gchar * -gst_tcp_host_to_ip (GstElement * element, const gchar * host) -{ - struct hostent *hostinfo; - char **addrs; - gchar *ip; - struct in_addr addr; - - GST_DEBUG_OBJECT (element, "resolving host %s", host); - - /* first check if it already is an IP address */ - if (inet_aton (host, &addr)) { - ip = g_strdup (host); - goto beach; - } - /* FIXME: could do a localhost check here */ - - /* perform a name lookup */ - if (!(hostinfo = gethostbyname (host))) - goto resolve_error; - - if (hostinfo->h_addrtype != AF_INET) - goto not_ip; - - addrs = hostinfo->h_addr_list; - - /* There could be more than one IP address, but we just return the first */ - ip = g_strdup (inet_ntoa (*(struct in_addr *) *addrs)); - -beach: - GST_DEBUG_OBJECT (element, "resolved to IP %s", ip); - return ip; - -resolve_error: - { - GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL), - ("Could not find IP address for host \"%s\".", host)); - return NULL; - } -not_ip: - { - GST_ELEMENT_ERROR (element, RESOURCE, NOT_FOUND, (NULL), - ("host \"%s\" is not an IP host", host)); - return NULL; - } -} - -/* write buffer to given socket incrementally. - * Returns number of bytes written. - */ -gint -gst_tcp_socket_write (int socket, const void *buf, size_t count) -{ - size_t bytes_written = 0; - - while (bytes_written < count) { - ssize_t wrote = send (socket, (const char *) buf + bytes_written, - count - bytes_written, MSG_NOSIGNAL); - - if (wrote <= 0) { - GST_WARNING ("error while writing"); - return bytes_written; - } - bytes_written += wrote; - } - - GST_LOG ("wrote %" G_GSIZE_FORMAT " bytes succesfully", bytes_written); - return bytes_written; -} - -/* atomically read count bytes into buf, cancellable. return val of GST_FLOW_OK - * indicates success, anything else is failure. - */ -static GstFlowReturn -gst_tcp_socket_read (GstElement * this, int socket, void *buf, size_t count, - GstPoll * fdset) -{ - ssize_t n; - size_t bytes_read; - int num_to_read; - int ret; - - bytes_read = 0; - - while (bytes_read < count) { - /* do a blocking select on the socket */ - /* no action (0) is an error too in our case */ - if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) { - if (ret == -1 && errno == EBUSY) - goto cancelled; - else - goto select_error; - } - - /* ask how much is available for reading on the socket */ - if (ioctl (socket, FIONREAD, &num_to_read) < 0) - goto ioctl_error; - - if (num_to_read == 0) - goto got_eos; - - /* sizeof(ssize_t) >= sizeof(int), so I know num_to_read <= SSIZE_MAX */ - - num_to_read = MIN (num_to_read, count - bytes_read); - - n = read (socket, ((guint8 *) buf) + bytes_read, num_to_read); - - if (n < 0) - goto read_error; - - if (n < num_to_read) - goto short_read; - - bytes_read += num_to_read; - } - - return GST_FLOW_OK; - - /* ERRORS */ -select_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("select failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -cancelled: - { - GST_DEBUG_OBJECT (this, "Select was cancelled"); - return GST_FLOW_WRONG_STATE; - } -ioctl_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("ioctl failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -got_eos: - { - GST_DEBUG_OBJECT (this, "Got EOS on socket stream"); - return GST_FLOW_UNEXPECTED; - } -read_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("read failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -short_read: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("short read: wanted %d bytes, got %" G_GSSIZE_FORMAT, num_to_read, n)); - return GST_FLOW_ERROR; - } -} - -/* close the socket and reset the fd. Used to clean up after errors. */ -void -gst_tcp_socket_close (GstPollFD * socket) -{ - if (socket->fd >= 0) { - close (socket->fd); - socket->fd = -1; - } -} - -/* read a buffer from the given socket - * returns: - * - a GstBuffer in which data should be read - * - NULL, indicating a connection close or an error, to be handled with - * EOS - */ -GstFlowReturn -gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset, - GstBuffer ** buf) -{ - int ret; - ssize_t bytes_read; - int readsize; - - *buf = NULL; - - /* do a blocking select on the socket */ - /* no action (0) is an error too in our case */ - if ((ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE)) <= 0) { - if (ret == -1 && errno == EBUSY) - goto cancelled; - else - goto select_error; - } - - /* ask how much is available for reading on the socket */ - if ((ret = ioctl (socket, FIONREAD, &readsize)) < 0) - goto ioctl_error; - - if (readsize == 0) - goto got_eos; - - /* sizeof(ssize_t) >= sizeof(int), so I know readsize <= SSIZE_MAX */ - - *buf = gst_buffer_new_and_alloc (readsize); - - bytes_read = read (socket, GST_BUFFER_DATA (*buf), readsize); - - if (bytes_read < 0) - goto read_error; - - if (bytes_read < readsize) - /* but mom, you promised to give me readsize bytes! */ - goto short_read; - - GST_LOG_OBJECT (this, "returning buffer of size %d", GST_BUFFER_SIZE (*buf)); - return GST_FLOW_OK; - - /* ERRORS */ -select_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("select failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -cancelled: - { - GST_DEBUG_OBJECT (this, "Select was cancelled"); - return GST_FLOW_WRONG_STATE; - } -ioctl_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("ioctl failed: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -got_eos: - { - GST_DEBUG_OBJECT (this, "Got EOS on socket stream"); - return GST_FLOW_UNEXPECTED; - } -read_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("read failed: %s", g_strerror (errno))); - gst_buffer_unref (*buf); - *buf = NULL; - return GST_FLOW_ERROR; - } -short_read: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("short read: wanted %d bytes, got %" G_GSSIZE_FORMAT, readsize, - bytes_read)); - gst_buffer_unref (*buf); - *buf = NULL; - return GST_FLOW_ERROR; - } -} - -/* read a buffer from the given socket - * returns: - * - a GstBuffer in which data should be read - * - NULL, indicating a connection close or an error, to be handled with - * EOS - */ -GstFlowReturn -gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, - GstBuffer ** buf) -{ - GstFlowReturn ret; - guint8 *header = NULL; - - GST_LOG_OBJECT (this, "Reading %d bytes for buffer packet header", - GST_DP_HEADER_LENGTH); - - *buf = NULL; - header = g_malloc (GST_DP_HEADER_LENGTH); - - ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset); - - if (ret != GST_FLOW_OK) - goto header_read_error; - - if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) - goto validate_error; - - if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_BUFFER) - goto is_not_buffer; - - GST_LOG_OBJECT (this, "validated buffer packet header"); - - *buf = gst_dp_buffer_from_header (GST_DP_HEADER_LENGTH, header); - - g_free (header); - - ret = gst_tcp_socket_read (this, socket, GST_BUFFER_DATA (*buf), - GST_BUFFER_SIZE (*buf), fdset); - - if (ret != GST_FLOW_OK) - goto data_read_error; - - return GST_FLOW_OK; - - /* ERRORS */ -header_read_error: - { - g_free (header); - return ret; - } -validate_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("GDP buffer packet header does not validate")); - g_free (header); - return GST_FLOW_ERROR; - } -is_not_buffer: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("GDP packet contains something that is not a buffer (type %d)", - gst_dp_header_payload_type (header))); - g_free (header); - return GST_FLOW_ERROR; - } -data_read_error: - { - gst_buffer_unref (*buf); - *buf = NULL; - return ret; - } -} - -GstFlowReturn -gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, - GstCaps ** caps) -{ - GstFlowReturn ret; - guint8 *header = NULL; - guint8 *payload = NULL; - size_t payload_length; - - GST_LOG_OBJECT (this, "Reading %d bytes for caps packet header", - GST_DP_HEADER_LENGTH); - - *caps = NULL; - header = g_malloc (GST_DP_HEADER_LENGTH); - - ret = gst_tcp_socket_read (this, socket, header, GST_DP_HEADER_LENGTH, fdset); - - if (ret != GST_FLOW_OK) - goto header_read_error; - - if (!gst_dp_validate_header (GST_DP_HEADER_LENGTH, header)) - goto header_validate_error; - - if (gst_dp_header_payload_type (header) != GST_DP_PAYLOAD_CAPS) - goto is_not_caps; - - GST_LOG_OBJECT (this, "validated caps packet header"); - - payload_length = gst_dp_header_payload_length (header); - payload = g_malloc (payload_length); - - GST_LOG_OBJECT (this, - "Reading %" G_GSIZE_FORMAT " bytes for caps packet payload", - payload_length); - - ret = gst_tcp_socket_read (this, socket, payload, payload_length, fdset); - - if (ret != GST_FLOW_OK) - goto payload_read_error; - - if (!gst_dp_validate_payload (GST_DP_HEADER_LENGTH, header, payload)) - goto payload_validate_error; - - *caps = gst_dp_caps_from_packet (GST_DP_HEADER_LENGTH, header, payload); - - GST_DEBUG_OBJECT (this, "Got caps over GDP: %" GST_PTR_FORMAT, *caps); - - g_free (header); - g_free (payload); - - return GST_FLOW_OK; - - /* ERRORS */ -header_read_error: - { - g_free (header); - return ret; - } -header_validate_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("GDP caps packet header does not validate")); - g_free (header); - return GST_FLOW_ERROR; - } -is_not_caps: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("GDP packet contains something that is not a caps (type %d)", - gst_dp_header_payload_type (header))); - g_free (header); - return GST_FLOW_ERROR; - } -payload_read_error: - { - g_free (header); - g_free (payload); - return ret; - } -payload_validate_error: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("GDP caps packet payload does not validate")); - g_free (header); - g_free (payload); - return GST_FLOW_ERROR; - } -} - -/* write a GDP header to the socket. Return false if fails. */ -gboolean -gst_tcp_gdp_write_buffer (GstElement * this, int socket, GstBuffer * buffer, - gboolean fatal, const gchar * host, int port) -{ - guint length; - guint8 *header; - size_t wrote; - - if (!gst_dp_header_from_buffer (buffer, 0, &length, &header)) - goto create_error; - - GST_LOG_OBJECT (this, "writing %d bytes for GDP buffer header", length); - wrote = gst_tcp_socket_write (socket, header, length); - g_free (header); - - if (wrote != length) - goto write_error; - - return TRUE; - - /* ERRORS */ -create_error: - { - if (fatal) - GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL), - ("Could not create GDP header from buffer")); - return FALSE; - } -write_error: - { - if (fatal) - GST_ELEMENT_ERROR (this, RESOURCE, WRITE, - (_("Error while sending data to \"%s:%d\"."), host, port), - ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s", - wrote, GST_BUFFER_SIZE (buffer), g_strerror (errno))); - return FALSE; - } -} - -/* write GDP header and payload to the given socket for the given caps. - * Return false if fails. */ -gboolean -gst_tcp_gdp_write_caps (GstElement * this, int socket, const GstCaps * caps, - gboolean fatal, const char *host, int port) -{ - guint length; - guint8 *header; - guint8 *payload; - size_t wrote; - - if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) - goto create_error; - - GST_LOG_OBJECT (this, "writing %d bytes for GDP caps header", length); - wrote = gst_tcp_socket_write (socket, header, length); - if (wrote != length) - goto write_header_error; - - length = gst_dp_header_payload_length (header); - g_free (header); - - GST_LOG_OBJECT (this, "writing %d bytes for GDP caps payload", length); - wrote = gst_tcp_socket_write (socket, payload, length); - g_free (payload); - - if (wrote != length) - goto write_payload_error; - - return TRUE; - - /* ERRORS */ -create_error: - { - if (fatal) - GST_ELEMENT_ERROR (this, CORE, TOO_LAZY, (NULL), - ("Could not create GDP packet from caps")); - return FALSE; - } -write_header_error: - { - g_free (header); - g_free (payload); - if (fatal) - GST_ELEMENT_ERROR (this, RESOURCE, WRITE, - (_("Error while sending gdp header data to \"%s:%d\"."), host, port), - ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s", - wrote, length, g_strerror (errno))); - return FALSE; - } -write_payload_error: - { - if (fatal) - GST_ELEMENT_ERROR (this, RESOURCE, WRITE, - (_("Error while sending gdp payload data to \"%s:%d\"."), host, port), - ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s", - wrote, length, g_strerror (errno))); - return FALSE; - } -} diff --git a/gst/tcp/gsttcp.h b/gst/tcp/gsttcp.h deleted file mode 100644 index 533c1622..00000000 --- a/gst/tcp/gsttcp.h +++ /dev/null @@ -1,76 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * gsttcp.h: helper functions - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#ifndef __GST_TCP_HELP_H__ -#define __GST_TCP_HELP_H__ - -#include "gsttcp-enumtypes.h" -#include <gst/gst.h> -#undef GST_DISABLE_DEPRECATED -#include <gst/dataprotocol/dataprotocol.h> - -#define TCP_HIGHEST_PORT 65535 -#define TCP_DEFAULT_HOST "localhost" -#define TCP_DEFAULT_PORT 4953 - -G_BEGIN_DECLS - -/** - * GstTCPProtocol: - * @GST_TCP_PROTOCOL_NONE: Raw data transmission - * @GST_TCP_PROTOCOL_GDP: #GstBuffers are wrapped and sent/received using the - * GDP protocol. - * - * This enum is provided by the tcp/multifd elements to configure the format of - * data transmission/reception. - * - * The GDP protocol wraps data buffers in a header that also carries format - * information and timestamps. The None value indicates the data is - * sent/received as-is. In that case, format information and timestamping - * must be transmitted separately, or implicit in the bytestream itself. - */ -typedef enum -{ - GST_TCP_PROTOCOL_NONE, - GST_TCP_PROTOCOL_GDP -} GstTCPProtocol; - -gchar * gst_tcp_host_to_ip (GstElement *element, const gchar *host); - -gint gst_tcp_socket_write (int socket, const void *buf, size_t count); - -void gst_tcp_socket_close (GstPollFD *socket); - -GstFlowReturn gst_tcp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf); - -GstFlowReturn gst_tcp_gdp_read_buffer (GstElement * this, int socket, GstPoll * fdset, GstBuffer **buf); -GstFlowReturn gst_tcp_gdp_read_caps (GstElement * this, int socket, GstPoll * fdset, GstCaps **caps); - -GstEvent * gst_tcp_gdp_read_event (GstElement *elem, int socket, GstPoll * fdset); - -gboolean gst_tcp_gdp_write_buffer (GstElement *elem, int socket, GstBuffer *buffer, gboolean fatal, const gchar *host, int port); -gboolean gst_tcp_gdp_write_event (GstElement *elem, int socket, GstEvent *event, gboolean fatal, const gchar *host, int port); -gboolean gst_tcp_gdp_write_caps (GstElement *elem, int socket, const GstCaps *caps, gboolean fatal, const gchar *host, int port); - -G_END_DECLS - -#endif /* __GST_TCP_HELP_H__ */ diff --git a/gst/tcp/gsttcpclientsink.c b/gst/tcp/gsttcpclientsink.c deleted file mode 100644 index 53394442..00000000 --- a/gst/tcp/gsttcpclientsink.c +++ /dev/null @@ -1,464 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/** - * SECTION:element-tcpclientsink - * @see_also: #tcpclientsrc - * - * <refsect2> - * <title>Example launch line</title> - * |[ - * # server: - * nc -l -p 3000 - * # client: - * gst-launch fdsrc fd=1 ! tcpclientsink protocol=none port=3000 - * ]| everything you type in the client is shown on the server - * </refsect2> - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif -#include <gst/gst-i18n-plugin.h> -#include <gst/dataprotocol/dataprotocol.h> -#include "gsttcp.h" -#include "gsttcpclientsink.h" -#include <string.h> /* memset */ - -/* elementfactory information */ -static const GstElementDetails gst_tcp_client_sink_details = -GST_ELEMENT_DETAILS ("TCP client sink", - "Sink/Network", - "Send data as a client over the network via TCP", - "Thomas Vander Stichele <thomas at apestaart dot org>"); - -/* TCPClientSink signals and args */ -enum -{ - FRAME_ENCODED, - /* FILL ME */ - LAST_SIGNAL -}; - -GST_DEBUG_CATEGORY_STATIC (tcpclientsink_debug); -#define GST_CAT_DEFAULT (tcpclientsink_debug) - -enum -{ - ARG_0, - ARG_HOST, - ARG_PORT, - ARG_PROTOCOL - /* FILL ME */ -}; - -static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", - GST_PAD_SINK, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - -static void gst_tcp_client_sink_base_init (gpointer g_class); -static void gst_tcp_client_sink_class_init (GstTCPClientSink * klass); -static void gst_tcp_client_sink_init (GstTCPClientSink * tcpclientsink); -static void gst_tcp_client_sink_finalize (GObject * gobject); - -static gboolean gst_tcp_client_sink_setcaps (GstBaseSink * bsink, - GstCaps * caps); -static GstFlowReturn gst_tcp_client_sink_render (GstBaseSink * bsink, - GstBuffer * buf); -static GstStateChangeReturn gst_tcp_client_sink_change_state (GstElement * - element, GstStateChange transition); - -static void gst_tcp_client_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_tcp_client_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); - - -static GstElementClass *parent_class = NULL; - -/*static guint gst_tcp_client_sink_signals[LAST_SIGNAL] = { 0 }; */ - -GType -gst_tcp_client_sink_get_type (void) -{ - static GType tcpclientsink_type = 0; - - - if (!tcpclientsink_type) { - static const GTypeInfo tcpclientsink_info = { - sizeof (GstTCPClientSinkClass), - gst_tcp_client_sink_base_init, - NULL, - (GClassInitFunc) gst_tcp_client_sink_class_init, - NULL, - NULL, - sizeof (GstTCPClientSink), - 0, - (GInstanceInitFunc) gst_tcp_client_sink_init, - NULL - }; - - tcpclientsink_type = - g_type_register_static (GST_TYPE_BASE_SINK, "GstTCPClientSink", - &tcpclientsink_info, 0); - } - return tcpclientsink_type; -} - -static void -gst_tcp_client_sink_base_init (gpointer g_class) -{ - GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - - gst_element_class_add_pad_template (element_class, - gst_static_pad_template_get (&sinktemplate)); - - gst_element_class_set_details (element_class, &gst_tcp_client_sink_details); -} - -static void -gst_tcp_client_sink_class_init (GstTCPClientSink * klass) -{ - GObjectClass *gobject_class; - GstElementClass *gstelement_class; - GstBaseSinkClass *gstbasesink_class; - - gobject_class = (GObjectClass *) klass; - gstelement_class = (GstElementClass *) klass; - gstbasesink_class = (GstBaseSinkClass *) klass; - - parent_class = g_type_class_peek_parent (klass); - - gobject_class->set_property = gst_tcp_client_sink_set_property; - gobject_class->get_property = gst_tcp_client_sink_get_property; - gobject_class->finalize = gst_tcp_client_sink_finalize; - - g_object_class_install_property (gobject_class, ARG_HOST, - g_param_spec_string ("host", "Host", "The host/IP to send the packets to", - TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, ARG_PORT, - g_param_spec_int ("port", "Port", "The port to send the packets to", - 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, ARG_PROTOCOL, - g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", - GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - gstelement_class->change_state = gst_tcp_client_sink_change_state; - - gstbasesink_class->set_caps = gst_tcp_client_sink_setcaps; - gstbasesink_class->render = gst_tcp_client_sink_render; - - GST_DEBUG_CATEGORY_INIT (tcpclientsink_debug, "tcpclientsink", 0, "TCP sink"); -} - -static void -gst_tcp_client_sink_init (GstTCPClientSink * this) -{ - this->host = g_strdup (TCP_DEFAULT_HOST); - this->port = TCP_DEFAULT_PORT; - - this->sock_fd.fd = -1; - this->protocol = GST_TCP_PROTOCOL_NONE; - GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN); -} - -static void -gst_tcp_client_sink_finalize (GObject * gobject) -{ - GstTCPClientSink *this = GST_TCP_CLIENT_SINK (gobject); - - g_free (this->host); - - G_OBJECT_CLASS (parent_class)->finalize (gobject); -} - -static gboolean -gst_tcp_client_sink_setcaps (GstBaseSink * bsink, GstCaps * caps) -{ - GstTCPClientSink *sink; - - sink = GST_TCP_CLIENT_SINK (bsink); - - /* write the buffer header if we have one */ - switch (sink->protocol) { - case GST_TCP_PROTOCOL_NONE: - break; - - case GST_TCP_PROTOCOL_GDP: - /* if we haven't send caps yet, send them first */ - if (!sink->caps_sent) { - const GstCaps *caps; - gchar *string; - - caps = GST_PAD_CAPS (GST_PAD_PEER (GST_BASE_SINK_PAD (bsink))); - string = gst_caps_to_string (caps); - GST_DEBUG_OBJECT (sink, "Sending caps %s through GDP", string); - g_free (string); - - if (!gst_tcp_gdp_write_caps (GST_ELEMENT (sink), sink->sock_fd.fd, - caps, TRUE, sink->host, sink->port)) - goto gdp_write_error; - - sink->caps_sent = TRUE; - } - break; - default: - g_warning ("Unhandled protocol type"); - break; - } - - return TRUE; - - /* ERRORS */ -gdp_write_error: - { - return FALSE; - } -} - -static GstFlowReturn -gst_tcp_client_sink_render (GstBaseSink * bsink, GstBuffer * buf) -{ - size_t wrote = 0; - GstTCPClientSink *sink; - gint size; - - sink = GST_TCP_CLIENT_SINK (bsink); - - g_return_val_if_fail (GST_OBJECT_FLAG_IS_SET (sink, GST_TCP_CLIENT_SINK_OPEN), - GST_FLOW_WRONG_STATE); - - size = GST_BUFFER_SIZE (buf); - - GST_LOG_OBJECT (sink, "writing %d bytes for buffer data", size); - - /* write the buffer header if we have one */ - switch (sink->protocol) { - case GST_TCP_PROTOCOL_NONE: - break; - case GST_TCP_PROTOCOL_GDP: - GST_LOG_OBJECT (sink, "Sending buffer header through GDP"); - if (!gst_tcp_gdp_write_buffer (GST_ELEMENT (sink), sink->sock_fd.fd, buf, - TRUE, sink->host, sink->port)) - goto gdp_write_error; - break; - default: - break; - } - - /* write buffer data */ - wrote = gst_tcp_socket_write (sink->sock_fd.fd, GST_BUFFER_DATA (buf), size); - - if (wrote < size) - goto write_error; - - sink->data_written += wrote; - - return GST_FLOW_OK; - - /* ERRORS */ -gdp_write_error: - { - return FALSE; - } -write_error: - { - GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, - (_("Error while sending data to \"%s:%d\"."), sink->host, sink->port), - ("Only %" G_GSIZE_FORMAT " of %u bytes written: %s", - wrote, GST_BUFFER_SIZE (buf), g_strerror (errno))); - return GST_FLOW_ERROR; - } -} - -static void -gst_tcp_client_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) -{ - GstTCPClientSink *tcpclientsink; - - g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object)); - tcpclientsink = GST_TCP_CLIENT_SINK (object); - - switch (prop_id) { - case ARG_HOST: - if (!g_value_get_string (value)) { - g_warning ("host property cannot be NULL"); - break; - } - g_free (tcpclientsink->host); - tcpclientsink->host = g_strdup (g_value_get_string (value)); - break; - case ARG_PORT: - tcpclientsink->port = g_value_get_int (value); - break; - case ARG_PROTOCOL: - tcpclientsink->protocol = g_value_get_enum (value); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_tcp_client_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) -{ - GstTCPClientSink *tcpclientsink; - - g_return_if_fail (GST_IS_TCP_CLIENT_SINK (object)); - tcpclientsink = GST_TCP_CLIENT_SINK (object); - - switch (prop_id) { - case ARG_HOST: - g_value_set_string (value, tcpclientsink->host); - break; - case ARG_PORT: - g_value_set_int (value, tcpclientsink->port); - break; - case ARG_PROTOCOL: - g_value_set_enum (value, tcpclientsink->protocol); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - - -/* create a socket for sending to remote machine */ -static gboolean -gst_tcp_client_sink_start (GstTCPClientSink * this) -{ - int ret; - gchar *ip; - - if (GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN)) - return TRUE; - - /* reset caps_sent flag */ - this->caps_sent = FALSE; - - /* create sending client socket */ - GST_DEBUG_OBJECT (this, "opening sending client socket to %s:%d", this->host, - this->port); - if ((this->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) { - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM); - return FALSE; - } - GST_DEBUG_OBJECT (this, "opened sending client socket with fd %d", - this->sock_fd.fd); - - /* look up name if we need to */ - ip = gst_tcp_host_to_ip (GST_ELEMENT (this), this->host); - if (!ip) { - gst_tcp_socket_close (&this->sock_fd); - return FALSE; - } - GST_DEBUG_OBJECT (this, "IP address for host %s is %s", this->host, ip); - - /* connect to server */ - memset (&this->server_sin, 0, sizeof (this->server_sin)); - this->server_sin.sin_family = AF_INET; /* network socket */ - this->server_sin.sin_port = htons (this->port); /* on port */ - this->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */ - g_free (ip); - - GST_DEBUG_OBJECT (this, "connecting to server"); - ret = connect (this->sock_fd.fd, (struct sockaddr *) &this->server_sin, - sizeof (this->server_sin)); - - if (ret) { - gst_tcp_socket_close (&this->sock_fd); - switch (errno) { - case ECONNREFUSED: - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, - (_("Connection to %s:%d refused."), this->host, this->port), - (NULL)); - return FALSE; - break; - default: - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), - ("connect to %s:%d failed: %s", this->host, this->port, - g_strerror (errno))); - return FALSE; - break; - } - } - - GST_OBJECT_FLAG_SET (this, GST_TCP_CLIENT_SINK_OPEN); - - this->data_written = 0; - - return TRUE; -} - -static gboolean -gst_tcp_client_sink_stop (GstTCPClientSink * this) -{ - if (!GST_OBJECT_FLAG_IS_SET (this, GST_TCP_CLIENT_SINK_OPEN)) - return TRUE; - - gst_tcp_socket_close (&this->sock_fd); - - GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SINK_OPEN); - - return TRUE; -} - -static GstStateChangeReturn -gst_tcp_client_sink_change_state (GstElement * element, - GstStateChange transition) -{ - GstTCPClientSink *sink; - GstStateChangeReturn res; - - sink = GST_TCP_CLIENT_SINK (element); - - switch (transition) { - case GST_STATE_CHANGE_NULL_TO_READY: - case GST_STATE_CHANGE_READY_TO_PAUSED: - if (!gst_tcp_client_sink_start (sink)) - goto start_failure; - break; - default: - break; - } - res = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); - - switch (transition) { - case GST_STATE_CHANGE_READY_TO_NULL: - gst_tcp_client_sink_stop (sink); - default: - break; - } - return res; - -start_failure: - { - return GST_STATE_CHANGE_FAILURE; - } -} diff --git a/gst/tcp/gsttcpclientsink.h b/gst/tcp/gsttcpclientsink.h deleted file mode 100644 index 43998704..00000000 --- a/gst/tcp/gsttcpclientsink.h +++ /dev/null @@ -1,91 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_TCP_CLIENT_SINK_H__ -#define __GST_TCP_CLIENT_SINK_H__ - - -#include <gst/gst.h> -#include <gst/base/gstbasesink.h> - -#include "gsttcp.h" - -G_BEGIN_DECLS - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <errno.h> -#include <string.h> -#include <sys/time.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <netdb.h> -#include <sys/socket.h> -#include <sys/wait.h> -#include <fcntl.h> -#include <arpa/inet.h> -#include "gsttcp.h" - -#define GST_TYPE_TCP_CLIENT_SINK \ - (gst_tcp_client_sink_get_type()) -#define GST_TCP_CLIENT_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCP_CLIENT_SINK,GstTCPClientSink)) -#define GST_TCP_CLIENT_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCP_CLIENT_SINK,GstTCPClientSinkClass)) -#define GST_IS_TCP_CLIENT_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCP_CLIENT_SINK)) -#define GST_IS_TCP_CLIENT_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCP_CLIENT_SINK)) - -typedef struct _GstTCPClientSink GstTCPClientSink; -typedef struct _GstTCPClientSinkClass GstTCPClientSinkClass; - -typedef enum { - GST_TCP_CLIENT_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - - GST_TCP_CLIENT_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2), -} GstTCPClientSinkFlags; - -struct _GstTCPClientSink { - GstBaseSink element; - - /* server information */ - int port; - gchar *host; - struct sockaddr_in server_sin; - - /* socket */ - GstPollFD sock_fd; - - size_t data_written; /* how much bytes have we written ? */ - GstTCPProtocol protocol; /* used with the protocol enum */ - gboolean caps_sent; /* whether or not we sent caps already */ -}; - -struct _GstTCPClientSinkClass { - GstBaseSinkClass parent_class; -}; - -GType gst_tcp_client_sink_get_type(void); - -G_END_DECLS - -#endif /* __GST_TCP_CLIENT_SINK_H__ */ diff --git a/gst/tcp/gsttcpclientsrc.c b/gst/tcp/gsttcpclientsrc.c deleted file mode 100644 index a0548b99..00000000 --- a/gst/tcp/gsttcpclientsrc.c +++ /dev/null @@ -1,444 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/** - * SECTION:element-tcpclientsrc - * @see_also: #tcpclientsink - * - * <refsect2> - * <title>Example launch line</title> - * |[ - * # server: - * nc -l -p 3000 - * # client: - * gst-launch tcpclientsrc protocol=none port=3000 ! fdsink fd=2 - * ]| everything you type in the server is shown on the client - * </refsect2> - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <gst/gst-i18n-plugin.h> -#include "gsttcp.h" -#include "gsttcpclientsrc.h" -#include <string.h> /* memset */ -#include <unistd.h> -#include <arpa/inet.h> -#include <fcntl.h> - - -GST_DEBUG_CATEGORY_STATIC (tcpclientsrc_debug); -#define GST_CAT_DEFAULT tcpclientsrc_debug - -#define MAX_READ_SIZE 4 * 1024 - - -static const GstElementDetails gst_tcp_client_src_details = -GST_ELEMENT_DETAILS ("TCP client source", - "Source/Network", - "Receive data as a client over the network via TCP", - "Thomas Vander Stichele <thomas at apestaart dot org>"); - -static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", - GST_PAD_SRC, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - - -enum -{ - PROP_0, - PROP_HOST, - PROP_PORT, - PROP_PROTOCOL -}; - - -GST_BOILERPLATE (GstTCPClientSrc, gst_tcp_client_src, GstPushSrc, - GST_TYPE_PUSH_SRC); - - -static void gst_tcp_client_src_finalize (GObject * gobject); - -static GstCaps *gst_tcp_client_src_getcaps (GstBaseSrc * psrc); - -static GstFlowReturn gst_tcp_client_src_create (GstPushSrc * psrc, - GstBuffer ** outbuf); -static gboolean gst_tcp_client_src_stop (GstBaseSrc * bsrc); -static gboolean gst_tcp_client_src_start (GstBaseSrc * bsrc); -static gboolean gst_tcp_client_src_unlock (GstBaseSrc * bsrc); -static gboolean gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc); - -static void gst_tcp_client_src_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_tcp_client_src_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); - - -static void -gst_tcp_client_src_base_init (gpointer g_class) -{ - GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - - gst_element_class_add_pad_template (element_class, - gst_static_pad_template_get (&srctemplate)); - - gst_element_class_set_details (element_class, &gst_tcp_client_src_details); -} - -static void -gst_tcp_client_src_class_init (GstTCPClientSrcClass * klass) -{ - GObjectClass *gobject_class; - GstBaseSrcClass *gstbasesrc_class; - GstPushSrcClass *gstpush_src_class; - - gobject_class = (GObjectClass *) klass; - gstbasesrc_class = (GstBaseSrcClass *) klass; - gstpush_src_class = (GstPushSrcClass *) klass; - - gobject_class->set_property = gst_tcp_client_src_set_property; - gobject_class->get_property = gst_tcp_client_src_get_property; - gobject_class->finalize = gst_tcp_client_src_finalize; - - g_object_class_install_property (gobject_class, PROP_HOST, - g_param_spec_string ("host", "Host", - "The host IP address to receive packets from", TCP_DEFAULT_HOST, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_PORT, - g_param_spec_int ("port", "Port", "The port to receive packets from", 0, - TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_PROTOCOL, - g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", - GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - gstbasesrc_class->get_caps = gst_tcp_client_src_getcaps; - gstbasesrc_class->start = gst_tcp_client_src_start; - gstbasesrc_class->stop = gst_tcp_client_src_stop; - gstbasesrc_class->unlock = gst_tcp_client_src_unlock; - gstbasesrc_class->unlock_stop = gst_tcp_client_src_unlock_stop; - - gstpush_src_class->create = gst_tcp_client_src_create; - - GST_DEBUG_CATEGORY_INIT (tcpclientsrc_debug, "tcpclientsrc", 0, - "TCP Client Source"); -} - -static void -gst_tcp_client_src_init (GstTCPClientSrc * this, GstTCPClientSrcClass * g_class) -{ - this->port = TCP_DEFAULT_PORT; - this->host = g_strdup (TCP_DEFAULT_HOST); - this->sock_fd.fd = -1; - this->protocol = GST_TCP_PROTOCOL_NONE; - this->caps = NULL; - - GST_OBJECT_FLAG_UNSET (this, GST_TCP_CLIENT_SRC_OPEN); -} - -static void -gst_tcp_client_src_finalize (GObject * gobject) -{ - GstTCPClientSrc *this = GST_TCP_CLIENT_SRC (gobject); - - g_free (this->host); - - G_OBJECT_CLASS (parent_class)->finalize (gobject); -} - -static GstCaps * -gst_tcp_client_src_getcaps (GstBaseSrc * bsrc) -{ - GstTCPClientSrc *src; - GstCaps *caps = NULL; - - src = GST_TCP_CLIENT_SRC (bsrc); - - if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN)) - caps = gst_caps_new_any (); - else if (src->caps) - caps = gst_caps_copy (src->caps); - else - caps = gst_caps_new_any (); - GST_DEBUG_OBJECT (src, "returning caps %" GST_PTR_FORMAT, caps); - g_assert (GST_IS_CAPS (caps)); - return caps; -} - -static GstFlowReturn -gst_tcp_client_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) -{ - GstTCPClientSrc *src; - GstFlowReturn ret = GST_FLOW_OK; - - src = GST_TCP_CLIENT_SRC (psrc); - - if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_CLIENT_SRC_OPEN)) - goto wrong_state; - - GST_LOG_OBJECT (src, "asked for a buffer"); - - /* read the buffer header if we're using a protocol */ - switch (src->protocol) { - case GST_TCP_PROTOCOL_NONE: - ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd, - src->fdset, outbuf); - break; - - case GST_TCP_PROTOCOL_GDP: - /* get the caps if we're using GDP */ - if (!src->caps_received) { - GstCaps *caps; - - GST_DEBUG_OBJECT (src, "getting caps through GDP"); - ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->sock_fd.fd, - src->fdset, &caps); - - if (ret != GST_FLOW_OK) - goto no_caps; - - src->caps_received = TRUE; - src->caps = caps; - } - - ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->sock_fd.fd, - src->fdset, outbuf); - break; - default: - /* need to assert as buf == NULL */ - g_assert ("Unhandled protocol type"); - break; - } - - if (ret == GST_FLOW_OK) { - GST_LOG_OBJECT (src, - "Returning buffer from _get of size %d, ts %" - GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT - ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, - GST_BUFFER_SIZE (*outbuf), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), - GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); - - gst_buffer_set_caps (*outbuf, src->caps); - } - - return ret; - -wrong_state: - { - GST_DEBUG_OBJECT (src, "connection to closed, cannot read data"); - return GST_FLOW_WRONG_STATE; - } -no_caps: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Could not read caps through GDP")); - return ret; - } -} - -static void -gst_tcp_client_src_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) -{ - GstTCPClientSrc *tcpclientsrc = GST_TCP_CLIENT_SRC (object); - - switch (prop_id) { - case PROP_HOST: - if (!g_value_get_string (value)) { - g_warning ("host property cannot be NULL"); - break; - } - g_free (tcpclientsrc->host); - tcpclientsrc->host = g_strdup (g_value_get_string (value)); - break; - case PROP_PORT: - tcpclientsrc->port = g_value_get_int (value); - break; - case PROP_PROTOCOL: - tcpclientsrc->protocol = g_value_get_enum (value); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_tcp_client_src_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) -{ - GstTCPClientSrc *tcpclientsrc = GST_TCP_CLIENT_SRC (object); - - switch (prop_id) { - case PROP_HOST: - g_value_set_string (value, tcpclientsrc->host); - break; - case PROP_PORT: - g_value_set_int (value, tcpclientsrc->port); - break; - case PROP_PROTOCOL: - g_value_set_enum (value, tcpclientsrc->protocol); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -/* create a socket for connecting to remote server */ -static gboolean -gst_tcp_client_src_start (GstBaseSrc * bsrc) -{ - int ret; - gchar *ip; - GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc); - - if ((src->fdset = gst_poll_new (TRUE)) == NULL) - goto socket_pair; - - /* create receiving client socket */ - GST_DEBUG_OBJECT (src, "opening receiving client socket to %s:%d", - src->host, src->port); - - if ((src->sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) - goto no_socket; - - GST_DEBUG_OBJECT (src, "opened receiving client socket with fd %d", - src->sock_fd.fd); - GST_OBJECT_FLAG_SET (src, GST_TCP_CLIENT_SRC_OPEN); - - /* look up name if we need to */ - if (!(ip = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host))) - goto name_resolv; - - GST_DEBUG_OBJECT (src, "IP address for host %s is %s", src->host, ip); - - /* connect to server */ - memset (&src->server_sin, 0, sizeof (src->server_sin)); - src->server_sin.sin_family = AF_INET; /* network socket */ - src->server_sin.sin_port = htons (src->port); /* on port */ - src->server_sin.sin_addr.s_addr = inet_addr (ip); /* on host ip */ - g_free (ip); - - GST_DEBUG_OBJECT (src, "connecting to server"); - ret = connect (src->sock_fd.fd, (struct sockaddr *) &src->server_sin, - sizeof (src->server_sin)); - if (ret) - goto connect_failed; - - /* add the socket to the poll */ - gst_poll_add_fd (src->fdset, &src->sock_fd); - gst_poll_fd_ctl_read (src->fdset, &src->sock_fd, TRUE); - - return TRUE; - -socket_pair: - { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - GST_ERROR_SYSTEM); - return FALSE; - } -no_socket: - { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); - return FALSE; - } -name_resolv: - { - gst_tcp_client_src_stop (GST_BASE_SRC (src)); - return FALSE; - } -connect_failed: - { - gst_tcp_client_src_stop (GST_BASE_SRC (src)); - switch (errno) { - case ECONNREFUSED: - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, - (_("Connection to %s:%d refused."), src->host, src->port), (NULL)); - break; - default: - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), - ("connect to %s:%d failed: %s", src->host, src->port, - g_strerror (errno))); - break; - } - return FALSE; - } -} - -/* close the socket and associated resources - * unset OPEN flag - * used both to recover from errors and go to NULL state */ -static gboolean -gst_tcp_client_src_stop (GstBaseSrc * bsrc) -{ - GstTCPClientSrc *src; - - src = GST_TCP_CLIENT_SRC (bsrc); - - GST_DEBUG_OBJECT (src, "closing socket"); - - if (src->fdset != NULL) { - gst_poll_free (src->fdset); - src->fdset = NULL; - } - - gst_tcp_socket_close (&src->sock_fd); - src->caps_received = FALSE; - if (src->caps) { - gst_caps_unref (src->caps); - src->caps = NULL; - } - GST_OBJECT_FLAG_UNSET (src, GST_TCP_CLIENT_SRC_OPEN); - - return TRUE; -} - -/* will be called only between calls to start() and stop() */ -static gboolean -gst_tcp_client_src_unlock (GstBaseSrc * bsrc) -{ - GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc); - - GST_DEBUG_OBJECT (src, "set to flushing"); - gst_poll_set_flushing (src->fdset, TRUE); - - return TRUE; -} - -/* will be called only between calls to start() and stop() */ -static gboolean -gst_tcp_client_src_unlock_stop (GstBaseSrc * bsrc) -{ - GstTCPClientSrc *src = GST_TCP_CLIENT_SRC (bsrc); - - GST_DEBUG_OBJECT (src, "unset flushing"); - gst_poll_set_flushing (src->fdset, FALSE); - - return TRUE; -} diff --git a/gst/tcp/gsttcpclientsrc.h b/gst/tcp/gsttcpclientsrc.h deleted file mode 100644 index 24d31e8a..00000000 --- a/gst/tcp/gsttcpclientsrc.h +++ /dev/null @@ -1,83 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_TCP_CLIENT_SRC_H__ -#define __GST_TCP_CLIENT_SRC_H__ - -#include <gst/gst.h> -#include <gst/base/gstpushsrc.h> - -G_BEGIN_DECLS - -#include <netdb.h> /* sockaddr_in */ -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> /* sockaddr_in */ -#include <unistd.h> - -#include "gsttcp.h" - -#define GST_TYPE_TCP_CLIENT_SRC \ - (gst_tcp_client_src_get_type()) -#define GST_TCP_CLIENT_SRC(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCP_CLIENT_SRC,GstTCPClientSrc)) -#define GST_TCP_CLIENT_SRC_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCP_CLIENT_SRC,GstTCPClientSrcClass)) -#define GST_IS_TCP_CLIENT_SRC(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCP_CLIENT_SRC)) -#define GST_IS_TCP_CLIENT_SRC_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCP_CLIENT_SRC)) - -typedef struct _GstTCPClientSrc GstTCPClientSrc; -typedef struct _GstTCPClientSrcClass GstTCPClientSrcClass; - -typedef enum { - GST_TCP_CLIENT_SRC_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - - GST_TCP_CLIENT_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) -} GstTCPClientSrcFlags; - -struct _GstTCPClientSrc { - GstPushSrc element; - - /* server information */ - int port; - gchar *host; - struct sockaddr_in server_sin; - - /* socket */ - GstPollFD sock_fd; - GstPoll *fdset; - - GstTCPProtocol protocol; /* protocol used for reading data */ - gboolean caps_received; /* if we have received caps yet */ - GstCaps *caps; -}; - -struct _GstTCPClientSrcClass { - GstPushSrcClass parent_class; -}; - -GType gst_tcp_client_src_get_type (void); - -G_END_DECLS - -#endif /* __GST_TCP_CLIENT_SRC_H__ */ diff --git a/gst/tcp/gsttcpplugin.c b/gst/tcp/gsttcpplugin.c deleted file mode 100644 index bb2cf489..00000000 --- a/gst/tcp/gsttcpplugin.c +++ /dev/null @@ -1,63 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <gst/dataprotocol/dataprotocol.h> -#include "gsttcpclientsrc.h" -#include "gsttcpclientsink.h" -#include "gsttcpserversrc.h" -#include "gsttcpserversink.h" -#include "gstmultifdsink.h" - -GST_DEBUG_CATEGORY (tcp_debug); - -static gboolean -plugin_init (GstPlugin * plugin) -{ - gst_dp_init (); - - if (!gst_element_register (plugin, "tcpclientsink", GST_RANK_NONE, - GST_TYPE_TCP_CLIENT_SINK)) - return FALSE; - if (!gst_element_register (plugin, "tcpclientsrc", GST_RANK_NONE, - GST_TYPE_TCP_CLIENT_SRC)) - return FALSE; - if (!gst_element_register (plugin, "tcpserversink", GST_RANK_NONE, - GST_TYPE_TCP_SERVER_SINK)) - return FALSE; - if (!gst_element_register (plugin, "tcpserversrc", GST_RANK_NONE, - GST_TYPE_TCP_SERVER_SRC)) - return FALSE; - if (!gst_element_register (plugin, "multifdsink", GST_RANK_NONE, - GST_TYPE_MULTI_FD_SINK)) - return FALSE; - - GST_DEBUG_CATEGORY_INIT (tcp_debug, "tcp", 0, "TCP calls"); - - return TRUE; -} - -GST_PLUGIN_DEFINE (GST_VERSION_MAJOR, - GST_VERSION_MINOR, - "tcp", - "transfer data over the network via TCP", - plugin_init, VERSION, GST_LICENSE, GST_PACKAGE_NAME, GST_PACKAGE_ORIGIN) diff --git a/gst/tcp/gsttcpplugin.h b/gst/tcp/gsttcpplugin.h deleted file mode 100644 index 38b91be1..00000000 --- a/gst/tcp/gsttcpplugin.h +++ /dev/null @@ -1,40 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_TCP_H__ -#define __GST_TCP_H__ - -#ifdef __cplusplus -extern "C" -{ -#endif /* __cplusplus */ - - typedef enum - { - CONTROL_ZERO, - CONTROL_NONE, - CONTROL_TCP - } Gst_TCP_Control; - -#ifdef __cplusplus -} -#endif /* __cplusplus */ - -#endif /* __GST_TCP_H__ */ diff --git a/gst/tcp/gsttcpserversink.c b/gst/tcp/gsttcpserversink.c deleted file mode 100644 index fcec3159..00000000 --- a/gst/tcp/gsttcpserversink.c +++ /dev/null @@ -1,382 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/** - * SECTION:element-tcpserversink - * @see_also: #multifdsink - * - * <refsect2> - * <title>Example launch line</title> - * |[ - * # server: - * gst-launch fdsrc fd=1 ! tcpserversink protocol=none port=3000 - * # client: - * gst-launch tcpclientsrc protocol=none port=3000 ! fdsink fd=2 - * ]| - * </refsect2> - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif -#include <gst/gst-i18n-plugin.h> -#include <string.h> /* memset */ - -#include <sys/ioctl.h> - -#ifdef HAVE_FIONREAD_IN_SYS_FILIO -#include <sys/filio.h> -#endif - -#include "gsttcp.h" -#include "gsttcpserversink.h" -#include "gsttcp-marshal.h" - -#define TCP_BACKLOG 5 - -/* elementfactory information */ -static const GstElementDetails gst_tcp_server_sink_details = -GST_ELEMENT_DETAILS ("TCP server sink", - "Sink/Network", - "Send data as a server over the network via TCP", - "Thomas Vander Stichele <thomas at apestaart dot org>"); - -GST_DEBUG_CATEGORY_STATIC (tcpserversink_debug); -#define GST_CAT_DEFAULT (tcpserversink_debug) - -enum -{ - ARG_0, - ARG_HOST, - ARG_PORT, -}; - -static void gst_tcp_server_sink_finalize (GObject * gobject); - -static gboolean gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, - GstPoll * set); -static gboolean gst_tcp_server_sink_init_send (GstMultiFdSink * this); -static gboolean gst_tcp_server_sink_close (GstMultiFdSink * this); -static void gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd); - -static void gst_tcp_server_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_tcp_server_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); - - -GST_BOILERPLATE (GstTCPServerSink, gst_tcp_server_sink, GstMultiFdSink, - GST_TYPE_MULTI_FD_SINK); - - -static void -gst_tcp_server_sink_base_init (gpointer g_class) -{ - GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - - gst_element_class_set_details (element_class, &gst_tcp_server_sink_details); -} - -static void -gst_tcp_server_sink_class_init (GstTCPServerSinkClass * klass) -{ - GObjectClass *gobject_class; - GstMultiFdSinkClass *gstmultifdsink_class; - - gobject_class = (GObjectClass *) klass; - gstmultifdsink_class = (GstMultiFdSinkClass *) klass; - - gobject_class->set_property = gst_tcp_server_sink_set_property; - gobject_class->get_property = gst_tcp_server_sink_get_property; - gobject_class->finalize = gst_tcp_server_sink_finalize; - - g_object_class_install_property (gobject_class, ARG_HOST, - g_param_spec_string ("host", "host", "The host/IP to send the packets to", - TCP_DEFAULT_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, ARG_PORT, - g_param_spec_int ("port", "port", "The port to send the packets to", - 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - gstmultifdsink_class->init = gst_tcp_server_sink_init_send; - gstmultifdsink_class->wait = gst_tcp_server_sink_handle_wait; - gstmultifdsink_class->close = gst_tcp_server_sink_close; - gstmultifdsink_class->removed = gst_tcp_server_sink_removed; - - GST_DEBUG_CATEGORY_INIT (tcpserversink_debug, "tcpserversink", 0, "TCP sink"); -} - -static void -gst_tcp_server_sink_init (GstTCPServerSink * this, - GstTCPServerSinkClass * klass) -{ - this->server_port = TCP_DEFAULT_PORT; - /* should support as minimum 576 for IPV4 and 1500 for IPV6 */ - /* this->mtu = 1500; */ - this->host = g_strdup (TCP_DEFAULT_HOST); - - this->server_sock.fd = -1; -} - -static void -gst_tcp_server_sink_finalize (GObject * gobject) -{ - GstTCPServerSink *this = GST_TCP_SERVER_SINK (gobject); - - g_free (this->host); - - G_OBJECT_CLASS (parent_class)->finalize (gobject); -} - -/* handle a read request on the server, - * which indicates a new client connection */ -static gboolean -gst_tcp_server_sink_handle_server_read (GstTCPServerSink * sink) -{ - /* new client */ - int client_sock_fd; - struct sockaddr_in client_address; - unsigned int client_address_len; - - /* For some stupid reason, client_address and client_address_len has to be - * zeroed */ - memset (&client_address, 0, sizeof (client_address)); - client_address_len = 0; - - client_sock_fd = - accept (sink->server_sock.fd, (struct sockaddr *) &client_address, - &client_address_len); - if (client_sock_fd == -1) - goto accept_failed; - - gst_multi_fd_sink_add (GST_MULTI_FD_SINK (sink), client_sock_fd); - - GST_DEBUG_OBJECT (sink, "added new client ip %s with fd %d", - inet_ntoa (client_address.sin_addr), client_sock_fd); - - return TRUE; - - /* ERRORS */ -accept_failed: - { - GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL), - ("Could not accept client on server socket %d: %s (%d)", - sink->server_sock.fd, g_strerror (errno), errno)); - return FALSE; - } -} - -static void -gst_tcp_server_sink_removed (GstMultiFdSink * sink, int fd) -{ -#ifndef GST_DISABLE_GST_DEBUG - GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink); -#endif - - GST_LOG_OBJECT (this, "closing fd %d", fd); - if (close (fd) < 0) { - GST_WARNING_OBJECT (this, "error closing fd %d: %s", fd, - g_strerror (errno)); - } -} - -static gboolean -gst_tcp_server_sink_handle_wait (GstMultiFdSink * sink, GstPoll * set) -{ - GstTCPServerSink *this = GST_TCP_SERVER_SINK (sink); - - if (gst_poll_fd_can_read (set, &this->server_sock)) { - /* handle new client connection on server socket */ - if (!gst_tcp_server_sink_handle_server_read (this)) - goto connection_failed; - } - return TRUE; - - /* ERRORS */ -connection_failed: - { - GST_ELEMENT_ERROR (this, RESOURCE, READ, (NULL), - ("client connection failed: %s", g_strerror (errno))); - return FALSE; - } -} - -static void -gst_tcp_server_sink_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) -{ - GstTCPServerSink *sink; - - g_return_if_fail (GST_IS_TCP_SERVER_SINK (object)); - sink = GST_TCP_SERVER_SINK (object); - - switch (prop_id) { - case ARG_HOST: - if (!g_value_get_string (value)) { - g_warning ("host property cannot be NULL"); - break; - } - g_free (sink->host); - sink->host = g_strdup (g_value_get_string (value)); - break; - case ARG_PORT: - sink->server_port = g_value_get_int (value); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_tcp_server_sink_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) -{ - GstTCPServerSink *sink; - - g_return_if_fail (GST_IS_TCP_SERVER_SINK (object)); - sink = GST_TCP_SERVER_SINK (object); - - switch (prop_id) { - case ARG_HOST: - g_value_set_string (value, sink->host); - break; - case ARG_PORT: - g_value_set_int (value, sink->server_port); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - - -/* create a socket for sending to remote machine */ -static gboolean -gst_tcp_server_sink_init_send (GstMultiFdSink * parent) -{ - int ret; - GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent); - - /* create sending server socket */ - if ((this->server_sock.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) - goto no_socket; - - GST_DEBUG_OBJECT (this, "opened sending server socket with fd %d", - this->server_sock.fd); - - /* make address reusable */ - ret = 1; - if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_REUSEADDR, - (void *) &ret, sizeof (ret)) < 0) - goto reuse_failed; - - /* keep connection alive; avoids SIGPIPE during write */ - ret = 1; - if (setsockopt (this->server_sock.fd, SOL_SOCKET, SO_KEEPALIVE, - (void *) &ret, sizeof (ret)) < 0) - goto keepalive_failed; - - /* name the socket */ - memset (&this->server_sin, 0, sizeof (this->server_sin)); - this->server_sin.sin_family = AF_INET; /* network socket */ - this->server_sin.sin_port = htons (this->server_port); /* on port */ - this->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); /* for hosts */ - - /* bind it */ - GST_DEBUG_OBJECT (this, "binding server socket to address"); - ret = bind (this->server_sock.fd, (struct sockaddr *) &this->server_sin, - sizeof (this->server_sin)); - if (ret) - goto bind_failed; - - /* set the server socket to nonblocking */ - fcntl (this->server_sock.fd, F_SETFL, O_NONBLOCK); - - GST_DEBUG_OBJECT (this, "listening on server socket %d with queue of %d", - this->server_sock.fd, TCP_BACKLOG); - if (listen (this->server_sock.fd, TCP_BACKLOG) == -1) - goto listen_failed; - - GST_DEBUG_OBJECT (this, - "listened on server socket %d, returning from connection setup", - this->server_sock.fd); - - gst_poll_add_fd (parent->fdset, &this->server_sock); - gst_poll_fd_ctl_read (parent->fdset, &this->server_sock, TRUE); - - return TRUE; - - /* ERRORS */ -no_socket: - { - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_WRITE, (NULL), GST_ERROR_SYSTEM); - return FALSE; - } -reuse_failed: - { - gst_tcp_socket_close (&this->server_sock); - GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), - ("Could not setsockopt: %s", g_strerror (errno))); - return FALSE; - } -keepalive_failed: - { - gst_tcp_socket_close (&this->server_sock); - GST_ELEMENT_ERROR (this, RESOURCE, SETTINGS, (NULL), - ("Could not setsockopt: %s", g_strerror (errno))); - return FALSE; - } -listen_failed: - { - gst_tcp_socket_close (&this->server_sock); - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), - ("Could not listen on server socket: %s", g_strerror (errno))); - return FALSE; - } -bind_failed: - { - gst_tcp_socket_close (&this->server_sock); - switch (errno) { - default: - GST_ELEMENT_ERROR (this, RESOURCE, OPEN_READ, (NULL), - ("bind on port %d failed: %s", this->server_port, - g_strerror (errno))); - break; - } - return FALSE; - } -} - -static gboolean -gst_tcp_server_sink_close (GstMultiFdSink * parent) -{ - GstTCPServerSink *this = GST_TCP_SERVER_SINK (parent); - - if (this->server_sock.fd != -1) { - gst_poll_remove_fd (parent->fdset, &this->server_sock); - - close (this->server_sock.fd); - this->server_sock.fd = -1; - } - return TRUE; -} diff --git a/gst/tcp/gsttcpserversink.h b/gst/tcp/gsttcpserversink.h deleted file mode 100644 index ac8846da..00000000 --- a/gst/tcp/gsttcpserversink.h +++ /dev/null @@ -1,90 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_TCP_SERVER_SINK_H__ -#define __GST_TCP_SERVER_SINK_H__ - - -#include <gst/gst.h> - -G_BEGIN_DECLS - -#include <stdio.h> -#include <stdlib.h> -#include <unistd.h> -#include <errno.h> -#include <string.h> -#include <sys/time.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <netdb.h> -#include <sys/socket.h> -#include <sys/wait.h> -#include <fcntl.h> -#include <arpa/inet.h> -#include "gstmultifdsink.h" - -#define GST_TYPE_TCP_SERVER_SINK \ - (gst_tcp_server_sink_get_type()) -#define GST_TCP_SERVER_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCP_SERVER_SINK,GstTCPServerSink)) -#define GST_TCP_SERVER_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCP_SERVER_SINK,GstTCPServerSinkClass)) -#define GST_IS_TCP_SERVER_SINK(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCP_SERVER_SINK)) -#define GST_IS_TCP_SERVER_SINK_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCP_SERVER_SINK)) - -typedef struct _GstTCPServerSink GstTCPServerSink; -typedef struct _GstTCPServerSinkClass GstTCPServerSinkClass; - -typedef enum { - GST_TCP_SERVER_SINK_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - - GST_TCP_SERVER_SINK_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) -} GstTCPServerSinkFlags; - -/** - * GstTCPServerSink: - * - * Opaque data structure. - */ -struct _GstTCPServerSink { - GstMultiFdSink element; - - /* server information */ - int server_port; - gchar *host; - struct sockaddr_in server_sin; - - /* socket */ - GstPollFD server_sock; -}; - -struct _GstTCPServerSinkClass { - GstMultiFdSinkClass parent_class; -}; - -GType gst_tcp_server_sink_get_type (void); - -G_END_DECLS - -#endif /* __GST_TCP_SERVER_SINK_H__ */ diff --git a/gst/tcp/gsttcpserversrc.c b/gst/tcp/gsttcpserversrc.c deleted file mode 100644 index 1273e0a8..00000000 --- a/gst/tcp/gsttcpserversrc.c +++ /dev/null @@ -1,486 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -/** - * SECTION:element-tcpserversrc - * @see_also: #tcpserversink - * - * <refsect2> - * <title>Example launch line</title> - * |[ - * # server: - * gst-launch tcpserversrc protocol=none port=3000 ! fdsink fd=2 - * # client: - * gst-launch fdsrc fd=1 ! tcpclientsink protocol=none port=3000 - * ]| - * </refsect2> - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <gst/gst-i18n-plugin.h> -#include "gsttcp.h" -#include "gsttcpserversrc.h" -#include <string.h> /* memset */ -#include <unistd.h> -#include <sys/ioctl.h> -#include <fcntl.h> - - -GST_DEBUG_CATEGORY_STATIC (tcpserversrc_debug); -#define GST_CAT_DEFAULT tcpserversrc_debug - -#define TCP_DEFAULT_LISTEN_HOST NULL /* listen on all interfaces */ -#define TCP_BACKLOG 1 /* client connection queue */ - - -static const GstElementDetails gst_tcp_server_src_details = -GST_ELEMENT_DETAILS ("TCP server source", - "Source/Network", - "Receive data as a server over the network via TCP", - "Thomas Vander Stichele <thomas at apestaart dot org>"); - -static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src", - GST_PAD_SRC, - GST_PAD_ALWAYS, - GST_STATIC_CAPS_ANY); - - -enum -{ - PROP_0, - PROP_HOST, - PROP_PORT, - PROP_PROTOCOL -}; - - -GST_BOILERPLATE (GstTCPServerSrc, gst_tcp_server_src, GstPushSrc, - GST_TYPE_PUSH_SRC); - - -static void gst_tcp_server_src_finalize (GObject * gobject); - -static gboolean gst_tcp_server_src_start (GstBaseSrc * bsrc); -static gboolean gst_tcp_server_src_stop (GstBaseSrc * bsrc); -static gboolean gst_tcp_server_src_unlock (GstBaseSrc * bsrc); -static GstFlowReturn gst_tcp_server_src_create (GstPushSrc * psrc, - GstBuffer ** buf); - -static void gst_tcp_server_src_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec); -static void gst_tcp_server_src_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec); - - -static void -gst_tcp_server_src_base_init (gpointer g_class) -{ - GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); - - gst_element_class_add_pad_template (element_class, - gst_static_pad_template_get (&srctemplate)); - - gst_element_class_set_details (element_class, &gst_tcp_server_src_details); -} - -static void -gst_tcp_server_src_class_init (GstTCPServerSrcClass * klass) -{ - GObjectClass *gobject_class; - GstBaseSrcClass *gstbasesrc_class; - GstPushSrcClass *gstpush_src_class; - - gobject_class = (GObjectClass *) klass; - gstbasesrc_class = (GstBaseSrcClass *) klass; - gstpush_src_class = (GstPushSrcClass *) klass; - - gobject_class->set_property = gst_tcp_server_src_set_property; - gobject_class->get_property = gst_tcp_server_src_get_property; - gobject_class->finalize = gst_tcp_server_src_finalize; - - g_object_class_install_property (gobject_class, PROP_HOST, - g_param_spec_string ("host", "Host", "The hostname to listen as", - TCP_DEFAULT_LISTEN_HOST, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_PORT, - g_param_spec_int ("port", "Port", "The port to listen to", - 0, TCP_HIGHEST_PORT, TCP_DEFAULT_PORT, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, PROP_PROTOCOL, - g_param_spec_enum ("protocol", "Protocol", "The protocol to wrap data in", - GST_TYPE_TCP_PROTOCOL, GST_TCP_PROTOCOL_NONE, - G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); - - gstbasesrc_class->start = gst_tcp_server_src_start; - gstbasesrc_class->stop = gst_tcp_server_src_stop; - gstbasesrc_class->unlock = gst_tcp_server_src_unlock; - - gstpush_src_class->create = gst_tcp_server_src_create; - - GST_DEBUG_CATEGORY_INIT (tcpserversrc_debug, "tcpserversrc", 0, - "TCP Server Source"); -} - -static void -gst_tcp_server_src_init (GstTCPServerSrc * src, GstTCPServerSrcClass * g_class) -{ - src->server_port = TCP_DEFAULT_PORT; - src->host = g_strdup (TCP_DEFAULT_HOST); - src->server_sock_fd.fd = -1; - src->client_sock_fd.fd = -1; - src->protocol = GST_TCP_PROTOCOL_NONE; - - GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); -} - -static void -gst_tcp_server_src_finalize (GObject * gobject) -{ - GstTCPServerSrc *src = GST_TCP_SERVER_SRC (gobject); - - g_free (src->host); - - G_OBJECT_CLASS (parent_class)->finalize (gobject); -} - -static GstFlowReturn -gst_tcp_server_src_create (GstPushSrc * psrc, GstBuffer ** outbuf) -{ - GstTCPServerSrc *src; - GstFlowReturn ret = GST_FLOW_OK; - - src = GST_TCP_SERVER_SRC (psrc); - - if (!GST_OBJECT_FLAG_IS_SET (src, GST_TCP_SERVER_SRC_OPEN)) - goto wrong_state; - -restart: - if (src->client_sock_fd.fd >= 0) { - /* if we have a client, wait for read */ - gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, FALSE); - gst_poll_fd_ctl_read (src->fdset, &src->client_sock_fd, TRUE); - } else { - /* else wait on server socket for connections */ - gst_poll_fd_ctl_read (src->fdset, &src->server_sock_fd, TRUE); - } - - /* no action (0) is an error too in our case */ - if ((ret = gst_poll_wait (src->fdset, GST_CLOCK_TIME_NONE)) <= 0) { - if (ret == -1 && errno == EBUSY) - goto select_cancelled; - else - goto select_error; - } - - /* if we have no client socket we can accept one now */ - if (src->client_sock_fd.fd < 0) { - if (gst_poll_fd_can_read (src->fdset, &src->server_sock_fd)) { - if ((src->client_sock_fd.fd = - accept (src->server_sock_fd.fd, - (struct sockaddr *) &src->client_sin, - &src->client_sin_len)) == -1) - goto accept_error; - - gst_poll_add_fd (src->fdset, &src->client_sock_fd); - } - /* and restart now to poll the socket. */ - goto restart; - } - - GST_LOG_OBJECT (src, "asked for a buffer"); - - switch (src->protocol) { - case GST_TCP_PROTOCOL_NONE: - ret = gst_tcp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd, - src->fdset, outbuf); - break; - - case GST_TCP_PROTOCOL_GDP: - if (!src->caps_received) { - GstCaps *caps; - gchar *string; - - ret = gst_tcp_gdp_read_caps (GST_ELEMENT (src), src->client_sock_fd.fd, - src->fdset, &caps); - - if (ret == GST_FLOW_WRONG_STATE) - goto gdp_cancelled; - - if (ret != GST_FLOW_OK) - goto gdp_caps_read_error; - - src->caps_received = TRUE; - string = gst_caps_to_string (caps); - GST_DEBUG_OBJECT (src, "Received caps through GDP: %s", string); - g_free (string); - - gst_pad_set_caps (GST_BASE_SRC_PAD (psrc), caps); - } - - ret = gst_tcp_gdp_read_buffer (GST_ELEMENT (src), src->client_sock_fd.fd, - src->fdset, outbuf); - - if (ret == GST_FLOW_OK) - gst_buffer_set_caps (*outbuf, GST_PAD_CAPS (GST_BASE_SRC_PAD (src))); - - break; - - default: - /* need to assert as buf == NULL */ - g_assert ("Unhandled protocol type"); - break; - } - - if (ret == GST_FLOW_OK) { - GST_LOG_OBJECT (src, - "Returning buffer from _get of size %d, ts %" - GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT - ", offset %" G_GINT64_FORMAT ", offset_end %" G_GINT64_FORMAT, - GST_BUFFER_SIZE (*outbuf), - GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (*outbuf)), - GST_TIME_ARGS (GST_BUFFER_DURATION (*outbuf)), - GST_BUFFER_OFFSET (*outbuf), GST_BUFFER_OFFSET_END (*outbuf)); - } - - return ret; - -wrong_state: - { - GST_DEBUG_OBJECT (src, "connection to closed, cannot read data"); - return GST_FLOW_WRONG_STATE; - } -select_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Select error: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -select_cancelled: - { - GST_DEBUG_OBJECT (src, "select canceled"); - return GST_FLOW_WRONG_STATE; - } -accept_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), - ("Could not accept client on server socket: %s", g_strerror (errno))); - return GST_FLOW_ERROR; - } -gdp_cancelled: - { - GST_DEBUG_OBJECT (src, "reading gdp canceled"); - return GST_FLOW_WRONG_STATE; - } -gdp_caps_read_error: - { - /* if we did not get canceled, report an error */ - if (ret != GST_FLOW_WRONG_STATE) { - GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), - ("Could not read caps through GDP")); - } - return ret; - } -} - -static void -gst_tcp_server_src_set_property (GObject * object, guint prop_id, - const GValue * value, GParamSpec * pspec) -{ - GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object); - - switch (prop_id) { - case PROP_HOST: - if (!g_value_get_string (value)) { - g_warning ("host property cannot be NULL"); - break; - } - g_free (tcpserversrc->host); - tcpserversrc->host = g_strdup (g_value_get_string (value)); - break; - case PROP_PORT: - tcpserversrc->server_port = g_value_get_int (value); - break; - case PROP_PROTOCOL: - tcpserversrc->protocol = g_value_get_enum (value); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -static void -gst_tcp_server_src_get_property (GObject * object, guint prop_id, - GValue * value, GParamSpec * pspec) -{ - GstTCPServerSrc *tcpserversrc = GST_TCP_SERVER_SRC (object); - - switch (prop_id) { - case PROP_HOST: - g_value_set_string (value, tcpserversrc->host); - break; - case PROP_PORT: - g_value_set_int (value, tcpserversrc->server_port); - break; - case PROP_PROTOCOL: - g_value_set_enum (value, tcpserversrc->protocol); - break; - - default: - G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); - break; - } -} - -/* set up server */ -static gboolean -gst_tcp_server_src_start (GstBaseSrc * bsrc) -{ - int ret; - GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); - - /* reset caps_received flag */ - src->caps_received = FALSE; - - /* create the server listener socket */ - if ((src->server_sock_fd.fd = socket (AF_INET, SOCK_STREAM, 0)) == -1) - goto socket_error; - - GST_DEBUG_OBJECT (src, "opened receiving server socket with fd %d", - src->server_sock_fd.fd); - - /* make address reusable */ - ret = 1; - if (setsockopt (src->server_sock_fd.fd, SOL_SOCKET, SO_REUSEADDR, &ret, - sizeof (int)) < 0) - goto sock_opt; - - /* name the socket */ - memset (&src->server_sin, 0, sizeof (src->server_sin)); - src->server_sin.sin_family = AF_INET; /* network socket */ - src->server_sin.sin_port = htons (src->server_port); /* on port */ - if (src->host) { - gchar *host; - - if (!(host = gst_tcp_host_to_ip (GST_ELEMENT (src), src->host))) - goto host_error; - src->server_sin.sin_addr.s_addr = inet_addr (host); - g_free (host); - } else - src->server_sin.sin_addr.s_addr = htonl (INADDR_ANY); - - /* bind it */ - GST_DEBUG_OBJECT (src, "binding server socket to address"); - if ((ret = bind (src->server_sock_fd.fd, (struct sockaddr *) &src->server_sin, - sizeof (src->server_sin))) < 0) - goto bind_error; - - GST_DEBUG_OBJECT (src, "listening on server socket %d with queue of %d", - src->server_sock_fd.fd, TCP_BACKLOG); - - if (listen (src->server_sock_fd.fd, TCP_BACKLOG) == -1) - goto listen_error; - - /* create an fdset to keep track of our file descriptors */ - if ((src->fdset = gst_poll_new (TRUE)) == NULL) - goto socket_pair; - - gst_poll_add_fd (src->fdset, &src->server_sock_fd); - - GST_DEBUG_OBJECT (src, "received client"); - - GST_OBJECT_FLAG_SET (src, GST_TCP_SERVER_SRC_OPEN); - - return TRUE; - - /* ERRORS */ -socket_error: - { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), GST_ERROR_SYSTEM); - return FALSE; - } -sock_opt: - { - GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, (NULL), - ("Could not setsockopt: %s", g_strerror (errno))); - gst_tcp_socket_close (&src->server_sock_fd); - return FALSE; - } -host_error: - { - gst_tcp_socket_close (&src->server_sock_fd); - return FALSE; - } -bind_error: - { - gst_tcp_socket_close (&src->server_sock_fd); - switch (errno) { - default: - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), - ("bind failed: %s", g_strerror (errno))); - break; - } - return FALSE; - } -listen_error: - { - gst_tcp_socket_close (&src->server_sock_fd); - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), - ("Could not listen on server socket: %s", g_strerror (errno))); - return FALSE; - } -socket_pair: - { - GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL), - GST_ERROR_SYSTEM); - gst_tcp_socket_close (&src->server_sock_fd); - return FALSE; - } -} - -static gboolean -gst_tcp_server_src_stop (GstBaseSrc * bsrc) -{ - GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); - - gst_poll_free (src->fdset); - src->fdset = NULL; - - gst_tcp_socket_close (&src->server_sock_fd); - gst_tcp_socket_close (&src->client_sock_fd); - - GST_OBJECT_FLAG_UNSET (src, GST_TCP_SERVER_SRC_OPEN); - - return TRUE; -} - -/* will be called only between calls to start() and stop() */ -static gboolean -gst_tcp_server_src_unlock (GstBaseSrc * bsrc) -{ - GstTCPServerSrc *src = GST_TCP_SERVER_SRC (bsrc); - - gst_poll_set_flushing (src->fdset, TRUE); - - return TRUE; -} diff --git a/gst/tcp/gsttcpserversrc.h b/gst/tcp/gsttcpserversrc.h deleted file mode 100644 index 22c7afe2..00000000 --- a/gst/tcp/gsttcpserversrc.h +++ /dev/null @@ -1,89 +0,0 @@ -/* GStreamer - * Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu> - * Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org> - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - - -#ifndef __GST_TCP_SERVER_SRC_H__ -#define __GST_TCP_SERVER_SRC_H__ - -#include <gst/gst.h> -#include <gst/base/gstpushsrc.h> - -G_END_DECLS - -#include <errno.h> -#include <string.h> -#include <sys/types.h> -#include <netdb.h> -#include <sys/socket.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include "gsttcp.h" - -#include <fcntl.h> - -#define GST_TYPE_TCP_SERVER_SRC \ - (gst_tcp_server_src_get_type()) -#define GST_TCP_SERVER_SRC(obj) \ - (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_TCP_SERVER_SRC,GstTCPServerSrc)) -#define GST_TCP_SERVER_SRC_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_TCP_SERVER_SRC,GstTCPServerSrcClass)) -#define GST_IS_TCP_SERVER_SRC(obj) \ - (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_TCP_SERVER_SRC)) -#define GST_IS_TCP_SERVER_SRC_CLASS(klass) \ - (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_TCP_SERVER_SRC)) - -typedef struct _GstTCPServerSrc GstTCPServerSrc; -typedef struct _GstTCPServerSrcClass GstTCPServerSrcClass; - -typedef enum { - GST_TCP_SERVER_SRC_OPEN = (GST_ELEMENT_FLAG_LAST << 0), - - GST_TCP_SERVER_SRC_FLAG_LAST = (GST_ELEMENT_FLAG_LAST << 2) -} GstTCPServerSrcFlags; - -struct _GstTCPServerSrc { - GstPushSrc element; - - /* server information */ - int server_port; - gchar *host; - struct sockaddr_in server_sin; - GstPollFD server_sock_fd; - - /* client information */ - struct sockaddr_in client_sin; - socklen_t client_sin_len; - GstPollFD client_sock_fd; - - GstPoll *fdset; - - GstTCPProtocol protocol; /* protocol used for reading data */ - gboolean caps_received; /* if we have received caps yet */ -}; - -struct _GstTCPServerSrcClass { - GstPushSrcClass parent_class; -}; - -GType gst_tcp_server_src_get_type (void); - -G_BEGIN_DECLS - -#endif /* __GST_TCP_SERVER_SRC_H__ */ diff --git a/gst/tcp/tcp.vcproj b/gst/tcp/tcp.vcproj deleted file mode 100644 index 4573f336..00000000 --- a/gst/tcp/tcp.vcproj +++ /dev/null @@ -1,160 +0,0 @@ -<?xml version="1.0" encoding="Windows-1252"?> -<VisualStudioProject - ProjectType="Visual C++" - Version="7.10" - Name="avi" - ProjectGUID="{979C216F-0ACF-4956-AE00-055A42D678D1}" - Keyword="Win32Proj"> - <Platforms> - <Platform - Name="Win32"/> - </Platforms> - <Configurations> - <Configuration - Name="Debug|Win32" - OutputDirectory="../../win32/Debug" - IntermediateDirectory="../../win32/Debug" - ConfigurationType="2" - CharacterSet="2"> - <Tool - Name="VCCLCompilerTool" - Optimization="0" - AdditionalIncludeDirectories="../../../gstreamer/win32;../../../gstreamer;../../../gstreamer/libs;../../../glib;../../../glib/glib;../../../glib/gmodule;"../../gst-libs";../../../popt/include;../../../libxml2/include/libxml2" - PreprocessorDefinitions="WIN32;_DEBUG;_WINDOWS;_USRDLL;AVI_EXPORTS;HAVE_CONFIG_H;_USE_MATH_DEFINES" - MinimalRebuild="TRUE" - BasicRuntimeChecks="3" - RuntimeLibrary="3" - UsePrecompiledHeader="0" - WarningLevel="3" - Detect64BitPortabilityProblems="TRUE" - DebugInformationFormat="4"/> - <Tool - Name="VCCustomBuildTool"/> - <Tool - Name="VCLinkerTool" - AdditionalDependencies="glib-2.0.lib gmodule-2.0.lib gthread-2.0.lib gobject-2.0.lib libgstreamer.lib gstbytestream.lib iconv.lib intl.lib" - OutputFile="$(OutDir)/gstavi.dll" - LinkIncremental="2" - AdditionalLibraryDirectories="../../../gstreamer/win32/Debug;../../../glib/glib;../../../glib/gmodule;../../../glib/gthread;../../../glib/gobject;../../../gettext/lib;../../../libiconv/lib" - ModuleDefinitionFile="" - GenerateDebugInformation="TRUE" - ProgramDatabaseFile="$(OutDir)/avi.pdb" - SubSystem="2" - OptimizeReferences="2" - ImportLibrary="$(OutDir)/gstavi.lib" - TargetMachine="1"/> - <Tool - Name="VCMIDLTool"/> - <Tool - Name="VCPostBuildEventTool" - CommandLine="copy /Y $(TargetPath) c:\gstreamer\plugins"/> - <Tool - Name="VCPreBuildEventTool"/> - <Tool - Name="VCPreLinkEventTool"/> - <Tool - Name="VCResourceCompilerTool"/> - <Tool - Name="VCWebServiceProxyGeneratorTool"/> - <Tool - Name="VCXMLDataGeneratorTool"/> - <Tool - Name="VCWebDeploymentTool"/> - <Tool - Name="VCManagedWrapperGeneratorTool"/> - <Tool - Name="VCAuxiliaryManagedWrapperGeneratorTool"/> - </Configuration> - <Configuration - Name="Release|Win32" - OutputDirectory="../../win32/Release" - IntermediateDirectory="../../win32/Release" - ConfigurationType="2" - CharacterSet="2"> - <Tool - Name="VCCLCompilerTool" - AdditionalIncludeDirectories="../../../gstreamer/win32;../../../gstreamer;../../../gstreamer/libs;../../../glib;../../../glib/glib;../../../glib/gmodule;"../../gst-libs";../../../popt/include;../../../libxml2/include/libxml2" - PreprocessorDefinitions="WIN32;NDEBUG;GST_DISABLE_GST_DEBUG;_WINDOWS;_USRDLL;AVI_EXPORTS;HAVE_CONFIG_H;_USE_MATH_DEFINES" - RuntimeLibrary="2" - UsePrecompiledHeader="0" - WarningLevel="3" - Detect64BitPortabilityProblems="TRUE" - DebugInformationFormat="3"/> - <Tool - Name="VCCustomBuildTool"/> - <Tool - Name="VCLinkerTool" - AdditionalDependencies="glib-2.0.lib gmodule-2.0.lib gthread-2.0.lib gobject-2.0.lib libgstreamer.lib gstbytestream.lib iconv.lib intl.lib" - OutputFile="$(OutDir)/gstavi.dll" - LinkIncremental="1" - AdditionalLibraryDirectories="../../../gstreamer/win32/Release;../../../glib/glib;../../../glib/gmodule;../../../glib/gthread;../../../glib/gobject;../../../gettext/lib;../../../libiconv/lib" - ModuleDefinitionFile="" - GenerateDebugInformation="TRUE" - SubSystem="2" - OptimizeReferences="2" - EnableCOMDATFolding="2" - ImportLibrary="$(OutDir)/gstavi.lib" - TargetMachine="1"/> - <Tool - Name="VCMIDLTool"/> - <Tool - Name="VCPostBuildEventTool" - CommandLine="copy /Y $(TargetPath) c:\gstreamer\plugins"/> - <Tool - Name="VCPreBuildEventTool"/> - <Tool - Name="VCPreLinkEventTool"/> - <Tool - Name="VCResourceCompilerTool"/> - <Tool - Name="VCWebServiceProxyGeneratorTool"/> - <Tool - Name="VCXMLDataGeneratorTool"/> - <Tool - Name="VCWebDeploymentTool"/> - <Tool - Name="VCManagedWrapperGeneratorTool"/> - <Tool - Name="VCAuxiliaryManagedWrapperGeneratorTool"/> - </Configuration> - </Configurations> - <References> - </References> - <Files> - <Filter - Name="Source Files" - Filter="cpp;c;cxx;def;odl;idl;hpj;bat;asm;asmx" - UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"> - <File - RelativePath=".\gstavi.c"> - </File> - <File - RelativePath=".\gstavidemux.c"> - </File> - <File - RelativePath=".\gstavimux.c"> - </File> - </Filter> - <Filter - Name="Header Files" - Filter="h;hpp;hxx;hm;inl;inc;xsd" - UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"> - <File - RelativePath=".\avi-ids.h"> - </File> - <File - RelativePath=".\gstavidemux.h"> - </File> - <File - RelativePath=".\gstavimux.h"> - </File> - </Filter> - <Filter - Name="Resource Files" - Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx" - UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}"> - </Filter> - </Files> - <Globals> - </Globals> -</VisualStudioProject> |