Discussion:
[pulseaudio-discuss] [PATCH 0/8] *** Overview ***
Yclept Nemo
2018-07-17 00:25:36 UTC
Permalink
Module zeroconf-discover would add the same sink twice: once over IPv4, once
over IPv6. Fixes:

* Options to disable IPv4 and IPv6. If both options are provided, the module
fails to load.
* Option to only load one tunnel per avahi service name and type.

All the tunnel modules would immediately add a sink or source, even before
authentication. Fix:

* Add the sink/source only after the context is successfully connected.

And some additional fixes:

* zeroconf-discover: fix memory leak and prevent double-free.
* pa_object_new: fix missing close paren.


Yclept Nemo (8):
zeroconf-discover: add arguments to disable ipv4/6
zeroconf-discover: fix memory issues
zeroconf-discover: add argument 'one_per_name_type'
tunnel*: put sink/source after authentication
Fixes
tunnel*: redo 'put sink/source after auth..'
Fix pa_object_new macro (missing close paren)
zeroconf-discover: fix 'one_per_name_type'

src/modules/module-tunnel-sink-new.c | 273 ++++++++++++++---------
src/modules/module-tunnel-source-new.c | 345 +++++++++++++++++------------
src/modules/module-tunnel.c | 198 ++++++++++-------
src/modules/module-zeroconf-discover.c | 382 ++++++++++++++++++++++++++++-----
src/pulsecore/object.h | 2 +-
5 files changed, 836 insertions(+), 364 deletions(-)
--
2.14.4
Yclept Nemo
2018-07-17 00:25:37 UTC
Permalink
---
src/modules/module-zeroconf-discover.c | 57 +++++++++++++++++++++++++++++++---
1 file changed, 53 insertions(+), 4 deletions(-)

diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index 612e2ed4..bc61b403 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -51,6 +51,8 @@ PA_MODULE_LOAD_ONCE(true);
#define SERVICE_TYPE_SOURCE "_non-monitor._sub._pulse-source._tcp"

static const char* const valid_modargs[] = {
+ "disable_ipv4",
+ "disable_ipv6",
NULL
};

@@ -67,6 +69,7 @@ struct userdata {
AvahiPoll *avahi_poll;
AvahiClient *client;
AvahiServiceBrowser *source_browser, *sink_browser;
+ AvahiProtocol protocol;

pa_hashmap *tunnels;
};
@@ -134,10 +137,15 @@ static void resolver_cb(
void *userdata) {

struct userdata *u = userdata;
- struct tunnel *tnl;
+ struct tunnel *tnl = NULL;

pa_assert(u);

+ if (u->protocol != AVAHI_PROTO_UNSPEC && u->protocol != protocol) {
+ pa_log_warn("Expected address protocol '%i' but received '%i'", u->protocol, protocol);
+ goto finish;
+ }
+
tnl = tunnel_new(interface, protocol, name, type, domain);

if (event != AVAHI_RESOLVER_FOUND)
@@ -278,12 +286,17 @@ static void browser_cb(
if (flags & AVAHI_LOOKUP_RESULT_LOCAL)
return;

+ if (u->protocol != AVAHI_PROTO_UNSPEC && u->protocol != protocol) {
+ pa_log_warn("Expected query protocol '%i' but received '%i'", u->protocol, protocol);
+ return;
+ }
+
t = tunnel_new(interface, protocol, name, type, domain);

if (event == AVAHI_BROWSER_NEW) {

if (!pa_hashmap_get(u->tunnels, t))
- if (!(avahi_service_resolver_new(u->client, interface, protocol, name, type, domain, AVAHI_PROTO_UNSPEC, 0, resolver_cb, u)))
+ if (!(avahi_service_resolver_new(u->client, interface, protocol, name, type, domain, u->protocol, 0, resolver_cb, u)))
pa_log("avahi_service_resolver_new() failed: %s", avahi_strerror(avahi_client_errno(u->client)));

/* We ignore the returned resolver object here, since the we don't
@@ -303,6 +316,16 @@ static void browser_cb(
tunnel_free(t);
}

+/* Avahi browser and resolver callbacks only receive a concrete protocol;
+ * always AVAHI_PROTO_INET or AVAHI_PROTO_INET6 and never AVAHI_PROTO_UNSPEC. A
+ * new browser given UNSPEC will receive both (separate) INET and INET6 events.
+ * A new resolver given a query protocol of UNSPEC will default to querying
+ * with INET6. A new resolver given an address protocol of UNSPEC will always
+ * resolve a service to an address matching the query protocol. So a resolver
+ * with UNSPEC/UNSPEC is equivalent to INET6/INET6. Strangely, INET addresses
+ * via INET6 queries fail to resolve; all other combinations succeed (avahi
+ * 0.7). */
+
static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata) {
struct userdata *u = userdata;

@@ -320,7 +343,8 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda

if (!(u->sink_browser = avahi_service_browser_new(
c,
- AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
+ AVAHI_IF_UNSPEC,
+ u->protocol,
SERVICE_TYPE_SINK,
NULL,
0,
@@ -335,7 +359,8 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda

if (!(u->source_browser = avahi_service_browser_new(
c,
- AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
+ AVAHI_IF_UNSPEC,
+ u->protocol,
SERVICE_TYPE_SOURCE,
NULL,
0,
@@ -384,6 +409,8 @@ int pa__init(pa_module*m) {

struct userdata *u;
pa_modargs *ma = NULL;
+ bool disable_ipv4, disable_ipv6;
+ AvahiProtocol protocol;
int error;

if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
@@ -391,10 +418,32 @@ int pa__init(pa_module*m) {
goto fail;
}

+ if (pa_modargs_get_value_boolean(ma, "disable_ipv4", &disable_ipv4) < 0) {
+ pa_log("Failed to parse argument 'disable_ipv4'.");
+ goto fail;
+ }
+
+ if (pa_modargs_get_value_boolean(ma, "disable_ipv6", &disable_ipv6) < 0) {
+ pa_log("Failed to parse argument 'disable_ipv6'.");
+ goto fail;
+ }
+
+ if (disable_ipv4 && disable_ipv6) {
+ pa_log("Given both 'disable_ipv4' and 'disable_ipv6', unloading.");
+ goto fail;
+ } else if (disable_ipv4)
+ protocol = AVAHI_PROTO_INET6;
+ else if (disable_ipv6)
+ protocol = AVAHI_PROTO_INET;
+ else
+ protocol = AVAHI_PROTO_UNSPEC;
+
+
m->userdata = u = pa_xnew(struct userdata, 1);
u->core = m->core;
u->module = m;
u->sink_browser = u->source_browser = NULL;
+ u->protocol = protocol;

u->tunnels = pa_hashmap_new(tunnel_hash, tunnel_compare);
--
2.14.4
Tanu Kaskinen
2018-08-11 13:09:46 UTC
Permalink
Post by Yclept Nemo
---
src/modules/module-zeroconf-discover.c | 57 +++++++++++++++++++++++++++++++---
1 file changed, 53 insertions(+), 4 deletions(-)
Thanks for the patches!

Please always explain in the commit message why the change is done. I
don't think it's obvious to everyone why you want to disable ipv4 or
ipv6.
Post by Yclept Nemo
diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index 612e2ed4..bc61b403 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -51,6 +51,8 @@ PA_MODULE_LOAD_ONCE(true);
#define SERVICE_TYPE_SOURCE "_non-monitor._sub._pulse-source._tcp"
static const char* const valid_modargs[] = {
+ "disable_ipv4",
+ "disable_ipv6",
The modargs should be documented with PA_MODULE_USAGE. See other
modules for examples.
Post by Yclept Nemo
NULL
};
@@ -67,6 +69,7 @@ struct userdata {
AvahiPoll *avahi_poll;
AvahiClient *client;
AvahiServiceBrowser *source_browser, *sink_browser;
+ AvahiProtocol protocol;
pa_hashmap *tunnels;
};
@@ -134,10 +137,15 @@ static void resolver_cb(
void *userdata) {
struct userdata *u = userdata;
- struct tunnel *tnl;
+ struct tunnel *tnl = NULL;
pa_assert(u);
+ if (u->protocol != AVAHI_PROTO_UNSPEC && u->protocol != protocol) {
+ pa_log_warn("Expected address protocol '%i' but received '%i'", u->protocol, protocol);
+ goto finish;
+ }
+
tnl = tunnel_new(interface, protocol, name, type, domain);
if (event != AVAHI_RESOLVER_FOUND)
@@ -278,12 +286,17 @@ static void browser_cb(
if (flags & AVAHI_LOOKUP_RESULT_LOCAL)
return;
+ if (u->protocol != AVAHI_PROTO_UNSPEC && u->protocol != protocol) {
+ pa_log_warn("Expected query protocol '%i' but received '%i'", u->protocol, protocol);
+ return;
+ }
+
t = tunnel_new(interface, protocol, name, type, domain);
if (event == AVAHI_BROWSER_NEW) {
if (!pa_hashmap_get(u->tunnels, t))
- if (!(avahi_service_resolver_new(u->client, interface, protocol, name, type, domain, AVAHI_PROTO_UNSPEC, 0, resolver_cb, u)))
+ if (!(avahi_service_resolver_new(u->client, interface, protocol, name, type, domain, u->protocol, 0, resolver_cb, u)))
pa_log("avahi_service_resolver_new() failed: %s", avahi_strerror(avahi_client_errno(u->client)));
/* We ignore the returned resolver object here, since the we don't
@@ -303,6 +316,16 @@ static void browser_cb(
tunnel_free(t);
}
+/* Avahi browser and resolver callbacks only receive a concrete protocol;
+ * always AVAHI_PROTO_INET or AVAHI_PROTO_INET6 and never AVAHI_PROTO_UNSPEC. A
+ * new browser given UNSPEC will receive both (separate) INET and INET6 events.
+ * A new resolver given a query protocol of UNSPEC will default to querying
+ * with INET6. A new resolver given an address protocol of UNSPEC will always
+ * resolve a service to an address matching the query protocol. So a resolver
+ * with UNSPEC/UNSPEC is equivalent to INET6/INET6. Strangely, INET addresses
+ * via INET6 queries fail to resolve; all other combinations succeed (avahi
+ * 0.7). */
+
static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata) {
struct userdata *u = userdata;
@@ -320,7 +343,8 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
if (!(u->sink_browser = avahi_service_browser_new(
c,
- AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
+ AVAHI_IF_UNSPEC,
+ u->protocol,
SERVICE_TYPE_SINK,
NULL,
0,
@@ -335,7 +359,8 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
if (!(u->source_browser = avahi_service_browser_new(
c,
- AVAHI_IF_UNSPEC, AVAHI_PROTO_UNSPEC,
+ AVAHI_IF_UNSPEC,
+ u->protocol,
SERVICE_TYPE_SOURCE,
NULL,
0,
@@ -384,6 +409,8 @@ int pa__init(pa_module*m) {
struct userdata *u;
pa_modargs *ma = NULL;
+ bool disable_ipv4, disable_ipv6;
These variables have to be initialized before calling
pa_modargs_get_value_boolean(), because pa_modargs_get_value_boolean()
doesn't set any value to the variables if the argument isn't given.
Post by Yclept Nemo
+ AvahiProtocol protocol;
int error;
if (!(ma = pa_modargs_new(m->argument, valid_modargs))) {
@@ -391,10 +418,32 @@ int pa__init(pa_module*m) {
goto fail;
}
+ if (pa_modargs_get_value_boolean(ma, "disable_ipv4", &disable_ipv4) < 0) {
+ pa_log("Failed to parse argument 'disable_ipv4'.");
+ goto fail;
+ }
+
+ if (pa_modargs_get_value_boolean(ma, "disable_ipv6", &disable_ipv6) < 0) {
+ pa_log("Failed to parse argument 'disable_ipv6'.");
+ goto fail;
+ }
+
+ if (disable_ipv4 && disable_ipv6) {
+ pa_log("Given both 'disable_ipv4' and 'disable_ipv6', unloading.");
+ goto fail;
+ } else if (disable_ipv4)
+ protocol = AVAHI_PROTO_INET6;
+ else if (disable_ipv6)
+ protocol = AVAHI_PROTO_INET;
+ else
+ protocol = AVAHI_PROTO_UNSPEC;
+
+
m->userdata = u = pa_xnew(struct userdata, 1);
u->core = m->core;
u->module = m;
u->sink_browser = u->source_browser = NULL;
+ u->protocol = protocol;
u->tunnels = pa_hashmap_new(tunnel_hash, tunnel_compare);
--
Tanu

https://www.patreon.com/tanuk
https://liberapay.com/tanuk
Yclept Nemo
2018-07-17 00:25:38 UTC
Permalink
Fix memory leak and prevent possible double-free.
---
src/modules/module-zeroconf-discover.c | 20 +++++++++++++++++---
1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index bc61b403..2c49d13a 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -379,10 +379,17 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda

pa_log_debug("Avahi daemon disconnected.");

- if (!(u->client = avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error))) {
+ /* Frees all associated resources, i.e. browsers, resolvers,
+ * and groups. */
+ avahi_client_free(c);
+ u->client = u->sink_browser = u->source_browser = NULL;
+
+ if (!avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error)) {
pa_log("avahi_client_new() failed: %s", avahi_strerror(error));
pa_module_unload_request(u->module, true);
}
+
+ break;
}

/* Fall through */
@@ -442,14 +449,21 @@ int pa__init(pa_module*m) {
m->userdata = u = pa_xnew(struct userdata, 1);
u->core = m->core;
u->module = m;
- u->sink_browser = u->source_browser = NULL;
+ u->client = u->sink_browser = u->source_browser = NULL;
u->protocol = protocol;

u->tunnels = pa_hashmap_new(tunnel_hash, tunnel_compare);

u->avahi_poll = pa_avahi_poll_new(m->core->mainloop);

- if (!(u->client = avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error))) {
+ /* The client callback is run for the first time within 'avahi_client_new',
+ * and on AVAHI_CLIENT_FAILURE may free the old client and create a new
+ * client assigned to 'userdata.client'. If so 'avahi_client_new' will
+ * return a pointer to already-freed data. When 'avahi_client_new' fails it
+ * returns NULL and does not run the callback; 'userdata.client' remains
+ * NULL (see above). Otherwise the callback is run, ensuring that
+ * 'userdata.client' is appropriately set. */
+ if (!avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error)) {
pa_log("pa_avahi_client_new() failed: %s", avahi_strerror(error));
goto fail;
}
--
2.14.4
Tanu Kaskinen
2018-08-11 13:23:13 UTC
Permalink
Post by Yclept Nemo
Fix memory leak and prevent possible double-free.
---
src/modules/module-zeroconf-discover.c | 20 +++++++++++++++++---
1 file changed, 17 insertions(+), 3 deletions(-)
diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index bc61b403..2c49d13a 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -379,10 +379,17 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
pa_log_debug("Avahi daemon disconnected.");
- if (!(u->client = avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error))) {
+ /* Frees all associated resources, i.e. browsers, resolvers,
+ * and groups. */
+ avahi_client_free(c);
+ u->client = u->sink_browser = u->source_browser = NULL;
GCC doesn't like this:

modules/module-zeroconf-discover.c: In function ‘client_callback’:
modules/module-zeroconf-discover.c:385:27: warning: assignment from incompatible pointer type [-Wincompatible-pointer-types]
u->client = u->sink_browser = u->source_browser = NULL;
^
Post by Yclept Nemo
+
+ if (!avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error)) {
It would be good to have a comment, something like this:

/* We don't set u->client here, the reason is explained in pa__init(). */
Post by Yclept Nemo
pa_log("avahi_client_new() failed: %s", avahi_strerror(error));
pa_module_unload_request(u->module, true);
}
+
+ break;
}
/* Fall through */
@@ -442,14 +449,21 @@ int pa__init(pa_module*m) {
m->userdata = u = pa_xnew(struct userdata, 1);
u->core = m->core;
u->module = m;
- u->sink_browser = u->source_browser = NULL;
+ u->client = u->sink_browser = u->source_browser = NULL;
modules/module-zeroconf-discover.c: In function ‘module_zeroconf_discover_LTX_pa__init’:
modules/module-zeroconf-discover.c:452:15: warning: assignment from incompatible pointer type [-Wincompatible-pointer-types]
u->client = u->sink_browser = u->source_browser = NULL;
^
Post by Yclept Nemo
u->protocol = protocol;
u->tunnels = pa_hashmap_new(tunnel_hash, tunnel_compare);
u->avahi_poll = pa_avahi_poll_new(m->core->mainloop);
- if (!(u->client = avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error))) {
+ /* The client callback is run for the first time within 'avahi_client_new',
+ * and on AVAHI_CLIENT_FAILURE may free the old client and create a new
+ * client assigned to 'userdata.client'. If so 'avahi_client_new' will
+ * return a pointer to already-freed data. When 'avahi_client_new' fails it
+ * returns NULL and does not run the callback; 'userdata.client' remains
+ * NULL (see above). Otherwise the callback is run, ensuring that
+ * 'userdata.client' is appropriately set. */
+ if (!avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error)) {
pa_log("pa_avahi_client_new() failed: %s", avahi_strerror(error));
goto fail;
}
--
Tanu

https://www.patreon.com/tanuk
https://liberapay.com/tanuk
Yclept Nemo
2018-07-17 00:25:40 UTC
Permalink
Call 'pa_sink_put' or 'pa_source_put' after the connection is
authorized. For the new tunnel modules this involves sending a
module-specific message from the IO thread to the main thread. On
receipt the sink/source message handler calls 'pa_*_put' if connected
(this doesn't need to be guarded by a conditional; the message should
only be sent once). For the old tunnel module this involves moving
'pa_*_put' down the call chain of the main thread: 'pa__init' ->
'on_connection' -> 'setup_complete_callback'.
---
src/modules/module-tunnel-sink-new.c | 14 +++++++++++++-
src/modules/module-tunnel-source-new.c | 14 +++++++++++++-
src/modules/module-tunnel.c | 12 ++++++------
3 files changed, 32 insertions(+), 8 deletions(-)

diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c
index 802e6a59..b301f999 100644
--- a/src/modules/module-tunnel-sink-new.c
+++ b/src/modules/module-tunnel-sink-new.c
@@ -60,6 +60,10 @@ PA_MODULE_USAGE(
#define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)
#define TUNNEL_THREAD_FAILED_MAINLOOP 1

+enum {
+ SINK_MESSAGE_PUT = PA_SINK_MESSAGE_MAX,
+};
+
static void stream_state_cb(pa_stream *stream, void *userdata);
static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
@@ -345,6 +349,7 @@ static void context_state_cb(pa_context *c, void *userdata) {
u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
}
u->connected = true;
+ pa_asyncmsgq_send(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL);
break;
}
case PA_CONTEXT_FAILED:
@@ -402,6 +407,7 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
struct userdata *u = PA_SINK(o)->userdata;

switch (code) {
+ /* Delivered from the main thread, handled in the IO thread. */
case PA_SINK_MESSAGE_GET_LATENCY: {
int negative;
pa_usec_t remote_latency;
@@ -429,6 +435,13 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
*((int64_t*) data) = remote_latency;
return 0;
}
+
+ /* Delivered from the IO thread, handled in the main thread. */
+ case SINK_MESSAGE_PUT: {
+ if (u->connected)
+ pa_sink_put(u->sink);
+ return 0;
+ }
}
return pa_sink_process_msg(o, code, data, offset, chunk);
}
@@ -572,7 +585,6 @@ int pa__init(pa_module *m) {
goto fail;
}

- pa_sink_put(u->sink);
pa_modargs_free(ma);
pa_xfree(default_sink_name);

diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c
index b41f53e2..4b2488d9 100644
--- a/src/modules/module-tunnel-source-new.c
+++ b/src/modules/module-tunnel-source-new.c
@@ -59,6 +59,10 @@ PA_MODULE_USAGE(

#define TUNNEL_THREAD_FAILED_MAINLOOP 1

+enum {
+ SOURCE_MESSAGE_PUT = PA_SOURCE_MESSAGE_MAX,
+};
+
static void stream_state_cb(pa_stream *stream, void *userdata);
static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
static void context_state_cb(pa_context *c, void *userdata);
@@ -341,6 +345,7 @@ static void context_state_cb(pa_context *c, void *userdata) {
u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
}
u->connected = true;
+ pa_asyncmsgq_send(u->thread_mq->outq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_PUT, NULL, 0, NULL);
break;
}
case PA_CONTEXT_FAILED:
@@ -397,6 +402,7 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t
struct userdata *u = PA_SOURCE(o)->userdata;

switch (code) {
+ /* Delivered from the main thread, handled in the IO thread. */
case PA_SOURCE_MESSAGE_GET_LATENCY: {
int negative;
pa_usec_t remote_latency;
@@ -428,6 +434,13 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t

return 0;
}
+
+ /* Delivered from the IO thread, handled in the main thread. */
+ case SOURCE_MESSAGE_PUT: {
+ if (u->connected)
+ pa_source_put(u->source);
+ return 0;
+ }
}
return pa_source_process_msg(o, code, data, offset, chunk);
}
@@ -566,7 +579,6 @@ int pa__init(pa_module *m) {
goto fail;
}

- pa_source_put(u->source);
pa_modargs_free(ma);
pa_xfree(default_source_name);

diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c
index 054d7d8f..960f8533 100644
--- a/src/modules/module-tunnel.c
+++ b/src/modules/module-tunnel.c
@@ -1598,6 +1598,12 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
goto fail;
}

+#ifdef TUNNEL_SINK
+ pa_sink_put(u->sink);
+#else
+ pa_source_put(u->source);
+#endif
+
/* Starting with protocol version 13 the MSB of the version tag
reflects if shm is enabled for this connection or not. We don't
support SHM here at all, so we just ignore this. */
@@ -2224,12 +2230,6 @@ int pa__init(pa_module*m) {
goto fail;
}

-#ifdef TUNNEL_SINK
- pa_sink_put(u->sink);
-#else
- pa_source_put(u->source);
-#endif
-
pa_xfree(dn);

if (server)
--
2.14.4
Tanu Kaskinen
2018-08-12 12:05:30 UTC
Permalink
Post by Yclept Nemo
Call 'pa_sink_put' or 'pa_source_put' after the connection is
authorized. For the new tunnel modules this involves sending a
module-specific message from the IO thread to the main thread. On
receipt the sink/source message handler calls 'pa_*_put' if connected
(this doesn't need to be guarded by a conditional; the message should
only be sent once). For the old tunnel module this involves moving
'pa_*_put' down the call chain of the main thread: 'pa__init' ->
'on_connection' -> 'setup_complete_callback'.
---
src/modules/module-tunnel-sink-new.c | 14 +++++++++++++-
src/modules/module-tunnel-source-new.c | 14 +++++++++++++-
src/modules/module-tunnel.c | 12 ++++++------
3 files changed, 32 insertions(+), 8 deletions(-)
diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c
index 802e6a59..b301f999 100644
--- a/src/modules/module-tunnel-sink-new.c
+++ b/src/modules/module-tunnel-sink-new.c
@@ -60,6 +60,10 @@ PA_MODULE_USAGE(
#define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)
#define TUNNEL_THREAD_FAILED_MAINLOOP 1
+enum {
+ SINK_MESSAGE_PUT = PA_SINK_MESSAGE_MAX,
+};
+
static void stream_state_cb(pa_stream *stream, void *userdata);
static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
@@ -345,6 +349,7 @@ static void context_state_cb(pa_context *c, void *userdata) {
u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
}
u->connected = true;
+ pa_asyncmsgq_send(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL);
pa_asyncmsgq_send() is synchronous, and it shouldn't be used from the
IO thread, because that can cause deadlocks. I think an asynchronous
message would work fine in this case (i.e. pa_asyncmsgq_post()).
Post by Yclept Nemo
break;
}
@@ -402,6 +407,7 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
struct userdata *u = PA_SINK(o)->userdata;
switch (code) {
+ /* Delivered from the main thread, handled in the IO thread. */
case PA_SINK_MESSAGE_GET_LATENCY: {
int negative;
pa_usec_t remote_latency;
@@ -429,6 +435,13 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
*((int64_t*) data) = remote_latency;
return 0;
}
+
+ /* Delivered from the IO thread, handled in the main thread. */
+ case SINK_MESSAGE_PUT: {
+ if (u->connected)
+ pa_sink_put(u->sink);
u->connected is supposed to be used only in the IO thread. In what
situation would you want to not put the sink? I suppose at least during
module unloading it could happen that the SINK_MESSAGE_PUT message is
processed, but we don't want to call pa_sink_put().

A separate flag could be set in the beginning of pa__done() to indicate
that the module is being unloaded, but I think it would make sense to
have that flag in the core pa_module struct instead, and set it in
pa_module_free(). There's already pa_module.unload_requested, which is
kind of what we want, but it's currently only set if
pa_module_unload_request() is called. I think
pa_module.unload_requested could be set also from pa_module_free(), and
then it would be a reliable flag for checking whether the module is
being unloaded.
--
Tanu

https://www.patreon.com/tanuk
https://liberapay.com/tanuk
Yclept Nemo
2018-07-17 00:25:39 UTC
Permalink
If 'one_per_name_type' is true, only add one tunnel per avahi service
name and type. In effect, do not add more than one sink or source per
remote pulseaudio server.
---
src/modules/module-zeroconf-discover.c | 46 +++++++++++++++++++++++++++++++---
1 file changed, 43 insertions(+), 3 deletions(-)

diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index 2c49d13a..1c960983 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -53,6 +53,7 @@ PA_MODULE_LOAD_ONCE(true);
static const char* const valid_modargs[] = {
"disable_ipv4",
"disable_ipv6",
+ "one_per_name_type",
NULL
};

@@ -74,6 +75,14 @@ struct userdata {
pa_hashmap *tunnels;
};

+static unsigned tunnel_hash_simple(const void *p) {
+ const struct tunnel *t = p;
+
+ return
+ pa_idxset_string_hash_func(t->name) +
+ pa_idxset_string_hash_func(t->type) +
+}
+
static unsigned tunnel_hash(const void *p) {
const struct tunnel *t = p;

@@ -85,6 +94,18 @@ static unsigned tunnel_hash(const void *p) {
pa_idxset_string_hash_func(t->domain);
}

+static int tunnel_compare_simple(const void *a, const void *b) {
+ const struct tunnel *ta = a, *tb = b;
+ int r;
+
+ if ((r = strcmp(ta->name, tb->name)))
+ return r;
+ if ((r = strcmp(ta->type, tb->type)))
+ return r;
+
+ return 0;
+}
+
static int tunnel_compare(const void *a, const void *b) {
const struct tunnel *ta = a, *tb = b;
int r;
@@ -148,6 +169,12 @@ static void resolver_cb(

tnl = tunnel_new(interface, protocol, name, type, domain);

+ if (pa_hashmap_get(u->tunnels, tnl)) {
+ pa_log_debug("Tunnel [%i,%i,%s,%s,%s] already mapped, skipping.",
+ interface, protocol, name, type, domain);
+ goto finish;
+ }
+
if (event != AVAHI_RESOLVER_FOUND)
pa_log("Resolving of '%s' failed: %s", name, avahi_strerror(avahi_client_errno(u->client)));
else {
@@ -295,6 +322,8 @@ static void browser_cb(

if (event == AVAHI_BROWSER_NEW) {

+ /* Since the resolver is asynchronous and the hashmap may not yet be
+ * updated, this check must be duplicated in the resolver callback. */
if (!pa_hashmap_get(u->tunnels, t))
if (!(avahi_service_resolver_new(u->client, interface, protocol, name, type, domain, u->protocol, 0, resolver_cb, u)))
pa_log("avahi_service_resolver_new() failed: %s", avahi_strerror(avahi_client_errno(u->client)));
@@ -304,9 +333,11 @@ static void browser_cb(
* it from the callback */

} else if (event == AVAHI_BROWSER_REMOVE) {
- struct tunnel *t2;
+ struct tunnel *t2 = pa_hashmap_get(u->tunnels, t);

- if ((t2 = pa_hashmap_get(u->tunnels, t))) {
+ /* A full comparison is required even if 'one_per_name_type' is true.
+ * Yes, this is redundant if it's false. */
+ if (t2 && !tunnel_compare(t2, t)) {
pa_module_unload_request_by_index(u->core, t2->module_index, true);
pa_hashmap_remove(u->tunnels, t2);
tunnel_free(t2);
@@ -417,6 +448,7 @@ int pa__init(pa_module*m) {
struct userdata *u;
pa_modargs *ma = NULL;
bool disable_ipv4, disable_ipv6;
+ bool one_per_name_type;
AvahiProtocol protocol;
int error;

@@ -435,6 +467,11 @@ int pa__init(pa_module*m) {
goto fail;
}

+ if (pa_modargs_get_value_boolean(ma, "one_per_name_type", &one_per_name_type) < 0) {
+ pa_log("Failed to parse argument 'one_per_name_type'.");
+ goto fail;
+ }
+
if (disable_ipv4 && disable_ipv6) {
pa_log("Given both 'disable_ipv4' and 'disable_ipv6', unloading.");
goto fail;
@@ -452,7 +489,10 @@ int pa__init(pa_module*m) {
u->client = u->sink_browser = u->source_browser = NULL;
u->protocol = protocol;

- u->tunnels = pa_hashmap_new(tunnel_hash, tunnel_compare);
+ if (one_per_name_type)
+ u->tunnels = pa_hashmap_new(tunnel_hash_simple, tunnel_compare_simple);
+ else
+ u->tunnels = pa_hashmap_new(tunnel_hash, tunnel_compare);

u->avahi_poll = pa_avahi_poll_new(m->core->mainloop);
--
2.14.4
Tanu Kaskinen
2018-08-11 14:05:51 UTC
Permalink
Post by Yclept Nemo
If 'one_per_name_type' is true, only add one tunnel per avahi service
name and type. In effect, do not add more than one sink or source per
remote pulseaudio server.
Is avahi service name the address? And is service type sink or source?

What's the reason you want this feature?
Post by Yclept Nemo
---
src/modules/module-zeroconf-discover.c | 46 +++++++++++++++++++++++++++++++---
1 file changed, 43 insertions(+), 3 deletions(-)
diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index 2c49d13a..1c960983 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -53,6 +53,7 @@ PA_MODULE_LOAD_ONCE(true);
static const char* const valid_modargs[] = {
"disable_ipv4",
"disable_ipv6",
+ "one_per_name_type",
NULL
};
@@ -74,6 +75,14 @@ struct userdata {
pa_hashmap *tunnels;
};
+static unsigned tunnel_hash_simple(const void *p) {
+ const struct tunnel *t = p;
+
+ return
+ pa_idxset_string_hash_func(t->name) +
+ pa_idxset_string_hash_func(t->type) +
+}
This doesn't build.
Post by Yclept Nemo
+
static unsigned tunnel_hash(const void *p) {
const struct tunnel *t = p;
@@ -85,6 +94,18 @@ static unsigned tunnel_hash(const void *p) {
pa_idxset_string_hash_func(t->domain);
}
+static int tunnel_compare_simple(const void *a, const void *b) {
+ const struct tunnel *ta = a, *tb = b;
+ int r;
+
+ if ((r = strcmp(ta->name, tb->name)))
+ return r;
+ if ((r = strcmp(ta->type, tb->type)))
+ return r;
+
+ return 0;
+}
+
static int tunnel_compare(const void *a, const void *b) {
const struct tunnel *ta = a, *tb = b;
int r;
@@ -148,6 +169,12 @@ static void resolver_cb(
tnl = tunnel_new(interface, protocol, name, type, domain);
+ if (pa_hashmap_get(u->tunnels, tnl)) {
+ pa_log_debug("Tunnel [%i,%i,%s,%s,%s] already mapped, skipping.",
+ interface, protocol, name, type, domain);
+ goto finish;
+ }
+
if (event != AVAHI_RESOLVER_FOUND)
pa_log("Resolving of '%s' failed: %s", name, avahi_strerror(avahi_client_errno(u->client)));
else {
@@ -295,6 +322,8 @@ static void browser_cb(
if (event == AVAHI_BROWSER_NEW) {
+ /* Since the resolver is asynchronous and the hashmap may not yet be
+ * updated, this check must be duplicated in the resolver callback. */
if (!pa_hashmap_get(u->tunnels, t))
if (!(avahi_service_resolver_new(u->client, interface, protocol, name, type, domain, u->protocol, 0, resolver_cb, u)))
pa_log("avahi_service_resolver_new() failed: %s", avahi_strerror(avahi_client_errno(u->client)));
@@ -304,9 +333,11 @@ static void browser_cb(
* it from the callback */
} else if (event == AVAHI_BROWSER_REMOVE) {
- struct tunnel *t2;
+ struct tunnel *t2 = pa_hashmap_get(u->tunnels, t);
- if ((t2 = pa_hashmap_get(u->tunnels, t))) {
+ /* A full comparison is required even if 'one_per_name_type' is true.
+ * Yes, this is redundant if it's false. */
This comment is a bit weird. It would make more sense to me if you
dropped the words "even" and "yes". You could also add an explanation
why it's redundant when one_per_name_type is false.
Post by Yclept Nemo
+ if (t2 && !tunnel_compare(t2, t)) {
pa_module_unload_request_by_index(u->core, t2->module_index, true);
pa_hashmap_remove(u->tunnels, t2);
tunnel_free(t2);
--
Tanu

https://www.patreon.com/tanuk
https://liberapay.com/tanuk
Yclept Nemo
2018-07-17 00:25:41 UTC
Permalink
---
src/modules/module-tunnel-sink-new.c | 2 +-
src/modules/module-tunnel-source-new.c | 2 +-
src/modules/module-zeroconf-discover.c | 21 +++++++++++++--------
3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c
index b301f999..fd24e690 100644
--- a/src/modules/module-tunnel-sink-new.c
+++ b/src/modules/module-tunnel-sink-new.c
@@ -349,7 +349,7 @@ static void context_state_cb(pa_context *c, void *userdata) {
u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
}
u->connected = true;
- pa_asyncmsgq_send(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL);
+ pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL, NULL);
break;
}
case PA_CONTEXT_FAILED:
diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c
index 4b2488d9..bff2f6a1 100644
--- a/src/modules/module-tunnel-source-new.c
+++ b/src/modules/module-tunnel-source-new.c
@@ -345,7 +345,7 @@ static void context_state_cb(pa_context *c, void *userdata) {
u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
}
u->connected = true;
- pa_asyncmsgq_send(u->thread_mq->outq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_PUT, NULL, 0, NULL);
+ pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_PUT, NULL, 0, NULL, NULL);
break;
}
case PA_CONTEXT_FAILED:
diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index 1c960983..a64d6c76 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -80,7 +80,7 @@ static unsigned tunnel_hash_simple(const void *p) {

return
pa_idxset_string_hash_func(t->name) +
- pa_idxset_string_hash_func(t->type) +
+ pa_idxset_string_hash_func(t->type);
}

static unsigned tunnel_hash(const void *p) {
@@ -353,9 +353,11 @@ static void browser_cb(
* A new resolver given a query protocol of UNSPEC will default to querying
* with INET6. A new resolver given an address protocol of UNSPEC will always
* resolve a service to an address matching the query protocol. So a resolver
- * with UNSPEC/UNSPEC is equivalent to INET6/INET6. Strangely, INET addresses
- * via INET6 queries fail to resolve; all other combinations succeed (avahi
- * 0.7). */
+ * with UNSPEC/UNSPEC is equivalent to INET6/INET6. By default the avahi daemon
+ * publishes AAAA (IPv6) records over IPv4, but not A (IPv4) records over IPv6
+ * (see 'publish-aaaa-on-ipv4' and 'publish-a-on-ipv6' in 'avahi-daemon.conf').
+ * That's why, given most daemons, all four combinations of concrete query and
+ * address protocols resolve except INET addresses via INET6 queries. */

static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata) {
struct userdata *u = userdata;
@@ -413,7 +415,8 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
/* Frees all associated resources, i.e. browsers, resolvers,
* and groups. */
avahi_client_free(c);
- u->client = u->sink_browser = u->source_browser = NULL;
+ u->client = NULL;
+ u->sink_browser = u->source_browser = NULL;

if (!avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error)) {
pa_log("avahi_client_new() failed: %s", avahi_strerror(error));
@@ -447,8 +450,9 @@ int pa__init(pa_module*m) {

struct userdata *u;
pa_modargs *ma = NULL;
- bool disable_ipv4, disable_ipv6;
- bool one_per_name_type;
+ bool disable_ipv4 = false;
+ bool disable_ipv6 = false;
+ bool one_per_name_type = false;
AvahiProtocol protocol;
int error;

@@ -486,7 +490,8 @@ int pa__init(pa_module*m) {
m->userdata = u = pa_xnew(struct userdata, 1);
u->core = m->core;
u->module = m;
- u->client = u->sink_browser = u->source_browser = NULL;
+ u->client = NULL;
+ u->sink_browser = u->source_browser = NULL;
u->protocol = protocol;

if (one_per_name_type)
--
2.14.4
Tanu Kaskinen
2018-08-12 12:09:52 UTC
Permalink
Please include these fixes in the original patches.
Post by Yclept Nemo
---
src/modules/module-tunnel-sink-new.c | 2 +-
src/modules/module-tunnel-source-new.c | 2 +-
src/modules/module-zeroconf-discover.c | 21 +++++++++++++--------
3 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c
index b301f999..fd24e690 100644
--- a/src/modules/module-tunnel-sink-new.c
+++ b/src/modules/module-tunnel-sink-new.c
@@ -349,7 +349,7 @@ static void context_state_cb(pa_context *c, void *userdata) {
u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
}
u->connected = true;
- pa_asyncmsgq_send(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL);
+ pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL, NULL);
break;
}
diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c
index 4b2488d9..bff2f6a1 100644
--- a/src/modules/module-tunnel-source-new.c
+++ b/src/modules/module-tunnel-source-new.c
@@ -345,7 +345,7 @@ static void context_state_cb(pa_context *c, void *userdata) {
u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
}
u->connected = true;
- pa_asyncmsgq_send(u->thread_mq->outq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_PUT, NULL, 0, NULL);
+ pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_PUT, NULL, 0, NULL, NULL);
break;
}
diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index 1c960983..a64d6c76 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -80,7 +80,7 @@ static unsigned tunnel_hash_simple(const void *p) {
return
pa_idxset_string_hash_func(t->name) +
- pa_idxset_string_hash_func(t->type) +
+ pa_idxset_string_hash_func(t->type);
}
static unsigned tunnel_hash(const void *p) {
@@ -353,9 +353,11 @@ static void browser_cb(
* A new resolver given a query protocol of UNSPEC will default to querying
* with INET6. A new resolver given an address protocol of UNSPEC will always
* resolve a service to an address matching the query protocol. So a resolver
- * with UNSPEC/UNSPEC is equivalent to INET6/INET6. Strangely, INET addresses
- * via INET6 queries fail to resolve; all other combinations succeed (avahi
- * 0.7). */
+ * with UNSPEC/UNSPEC is equivalent to INET6/INET6. By default the avahi daemon
+ * publishes AAAA (IPv6) records over IPv4, but not A (IPv4) records over IPv6
+ * (see 'publish-aaaa-on-ipv4' and 'publish-a-on-ipv6' in 'avahi-daemon.conf').
+ * That's why, given most daemons, all four combinations of concrete query and
+ * address protocols resolve except INET addresses via INET6 queries. */
static void client_callback(AvahiClient *c, AvahiClientState state, void *userdata) {
struct userdata *u = userdata;
@@ -413,7 +415,8 @@ static void client_callback(AvahiClient *c, AvahiClientState state, void *userda
/* Frees all associated resources, i.e. browsers, resolvers,
* and groups. */
avahi_client_free(c);
- u->client = u->sink_browser = u->source_browser = NULL;
+ u->client = NULL;
+ u->sink_browser = u->source_browser = NULL;
if (!avahi_client_new(u->avahi_poll, AVAHI_CLIENT_NO_FAIL, client_callback, u, &error)) {
pa_log("avahi_client_new() failed: %s", avahi_strerror(error));
@@ -447,8 +450,9 @@ int pa__init(pa_module*m) {
struct userdata *u;
pa_modargs *ma = NULL;
- bool disable_ipv4, disable_ipv6;
- bool one_per_name_type;
+ bool disable_ipv4 = false;
+ bool disable_ipv6 = false;
+ bool one_per_name_type = false;
AvahiProtocol protocol;
int error;
@@ -486,7 +490,8 @@ int pa__init(pa_module*m) {
m->userdata = u = pa_xnew(struct userdata, 1);
u->core = m->core;
u->module = m;
- u->client = u->sink_browser = u->source_browser = NULL;
+ u->client = NULL;
+ u->sink_browser = u->source_browser = NULL;
u->protocol = protocol;
if (one_per_name_type)
--
Tanu

https://www.patreon.com/tanuk
https://liberapay.com/tanuk
Yclept Nemo
2018-07-17 00:25:44 UTC
Permalink
Queue additional tunnels per name/type. When a tunnel module loaded by
zeroconf-discover is unlinked, remove the corresponding tunnel and load
a queued tunnel matching name/type (if available).
---
src/modules/module-zeroconf-discover.c | 298 ++++++++++++++++++++++++++-------
1 file changed, 234 insertions(+), 64 deletions(-)

diff --git a/src/modules/module-zeroconf-discover.c b/src/modules/module-zeroconf-discover.c
index a64d6c76..b5949b0b 100644
--- a/src/modules/module-zeroconf-discover.c
+++ b/src/modules/module-zeroconf-discover.c
@@ -57,26 +57,49 @@ static const char* const valid_modargs[] = {
NULL
};

-struct tunnel {
- AvahiIfIndex interface;
- AvahiProtocol protocol;
- char *name, *type, *domain;
- uint32_t module_index;
-};
-
struct userdata {
pa_core *core;
pa_module *module;
+
+ pa_hook_slot *module_unlink_slot;
+
AvahiPoll *avahi_poll;
AvahiClient *client;
AvahiServiceBrowser *source_browser, *sink_browser;
AvahiProtocol protocol;

- pa_hashmap *tunnels;
+ pa_hashmap *tunnels_loaded;
+ pa_hashmap *tunnels_loaded_by_index;
+ pa_hashmap *tunnels_queued;
};

+typedef struct {
+ pa_object parent;
+
+ struct userdata *userdata;
+
+ AvahiIfIndex interface;
+ AvahiProtocol protocol;
+ char *name, *type, *domain;
+
+ uint32_t module_index;
+} tunnel;
+
+PA_DEFINE_PRIVATE_CLASS(tunnel, pa_object);
+
+static int module_index_compare(const void *a, const void *b) {
+ uint32_t idx_a = *(uint32_t *)a;
+ uint32_t idx_b = *(uint32_t *)b;
+ return idx_a < idx_b ? -1 : (idx_a > idx_b ? 1 : 0);
+}
+
+static unsigned module_index_hash(const void *p) {
+ uint32_t idx_p = *(uint32_t *)p;
+ return (unsigned) idx_p;
+}
+
static unsigned tunnel_hash_simple(const void *p) {
- const struct tunnel *t = p;
+ const tunnel *t = p;

return
pa_idxset_string_hash_func(t->name) +
@@ -84,7 +107,7 @@ static unsigned tunnel_hash_simple(const void *p) {
}

static unsigned tunnel_hash(const void *p) {
- const struct tunnel *t = p;
+ const tunnel *t = p;

return
(unsigned) t->interface +
@@ -95,7 +118,7 @@ static unsigned tunnel_hash(const void *p) {
}

static int tunnel_compare_simple(const void *a, const void *b) {
- const struct tunnel *ta = a, *tb = b;
+ const tunnel *ta = a, *tb = b;
int r;

if ((r = strcmp(ta->name, tb->name)))
@@ -107,7 +130,7 @@ static int tunnel_compare_simple(const void *a, const void *b) {
}

static int tunnel_compare(const void *a, const void *b) {
- const struct tunnel *ta = a, *tb = b;
+ const tunnel *ta = a, *tb = b;
int r;

if (ta->interface != tb->interface)
@@ -124,12 +147,31 @@ static int tunnel_compare(const void *a, const void *b) {
return 0;
}

-static struct tunnel *tunnel_new(
+static void tunnel_free(tunnel *t) {
+ pa_assert(t);
+ pa_xfree(t->name);
+ pa_xfree(t->type);
+ pa_xfree(t->domain);
+ pa_xfree(t);
+}
+
+static void tunnel_ref_free(pa_object *o) {
+ tunnel *t = tunnel_cast(o);
+
+ pa_assert(t);
+ pa_assert(!tunnel_refcnt(t));
+
+ tunnel_free(t);
+}
+
+static tunnel *tunnel_new(
+ struct userdata *u,
AvahiIfIndex interface, AvahiProtocol protocol,
const char *name, const char *type, const char *domain) {

- struct tunnel *t;
- t = pa_xnew(struct tunnel, 1);
+ tunnel *t = pa_object_new(tunnel);
+ t->parent.free = tunnel_ref_free;
+ t->userdata = u;
t->interface = interface;
t->protocol = protocol;
t->name = pa_xstrdup(name);
@@ -139,12 +181,8 @@ static struct tunnel *tunnel_new(
return t;
}

-static void tunnel_free(struct tunnel *t) {
- pa_assert(t);
- pa_xfree(t->name);
- pa_xfree(t->type);
- pa_xfree(t->domain);
- pa_xfree(t);
+static bool tunnel_loaded(tunnel *t) {
+ return t->module_index != PA_IDXSET_INVALID;
}

static void resolver_cb(
@@ -157,26 +195,34 @@ static void resolver_cb(
AvahiLookupResultFlags flags,
void *userdata) {

- struct userdata *u = userdata;
- struct tunnel *tnl = NULL;
+ bool remove = false;
+ tunnel *tnl = userdata;
+ struct userdata *u;
+
+ pa_assert(tnl);
+
+ u = tnl->userdata;

pa_assert(u);

+ pa_assert(tnl->interface == interface && tnl->protocol == protocol &&
+ !strcmp(tnl->name, name) && !strcmp(tnl->type, type) && !strcmp(tnl->domain, domain));
+
+ /* Doesn't exist; exists but different; exists but already loaded */
+ if (pa_hashmap_get(u->tunnels_loaded, tnl) != tnl || tunnel_loaded(tnl))
+ goto finish;
+
if (u->protocol != AVAHI_PROTO_UNSPEC && u->protocol != protocol) {
pa_log_warn("Expected address protocol '%i' but received '%i'", u->protocol, protocol);
+ remove = true;
goto finish;
}

- tnl = tunnel_new(interface, protocol, name, type, domain);
-
- if (pa_hashmap_get(u->tunnels, tnl)) {
- pa_log_debug("Tunnel [%i,%i,%s,%s,%s] already mapped, skipping.",
- interface, protocol, name, type, domain);
+ if (event != AVAHI_RESOLVER_FOUND) {
+ pa_log("Resolving of '%s' failed: %s", name, avahi_strerror(avahi_client_errno(u->client)));
+ remove = true;
goto finish;
}
-
- if (event != AVAHI_RESOLVER_FOUND)
- pa_log("Resolving of '%s' failed: %s", name, avahi_strerror(avahi_client_errno(u->client)));
else {
char *device = NULL, *dname, *module_name, *args;
const char *t;
@@ -225,6 +271,7 @@ static void resolver_cb(
pa_log("Service '%s' contains an invalid sample specification.", name);
avahi_free(device);
pa_xfree(properties);
+ remove = true;
goto finish;
}

@@ -232,6 +279,7 @@ static void resolver_cb(
pa_log("Service '%s' contains an invalid channel map.", name);
avahi_free(device);
pa_xfree(properties);
+ remove = true;
goto finish;
}

@@ -245,6 +293,7 @@ static void resolver_cb(
avahi_free(device);
pa_xfree(dname);
pa_xfree(properties);
+ remove = true;
goto finish;
}

@@ -277,9 +326,10 @@ static void resolver_cb(

if (pa_module_load(&m, u->core, module_name, args) >= 0) {
tnl->module_index = m->index;
- pa_hashmap_put(u->tunnels, tnl, tnl);
- tnl = NULL;
+ pa_hashmap_put(u->tunnels_loaded_by_index, &tnl->module_index, tnl);
}
+ else
+ remove = true;

pa_xfree(module_name);
pa_xfree(dname);
@@ -291,10 +341,72 @@ static void resolver_cb(

finish:

+ if (remove) {
+ pa_hashmap_remove(u->tunnels_loaded, tnl);
+ tunnel_unref(tnl);
+ }
+
avahi_service_resolver_free(r);
+ tunnel_unref(tnl);
+ return;
+}
+
+static void tunnel_add_from_queue_cb(pa_mainloop_api *a, pa_defer_event *e, void *userdata) {
+ tunnel *t_search = userdata;
+ tunnel *t_queued;
+ struct userdata *u;
+ void *state;
+
+ pa_assert(t_search);
+
+ u = t_search->userdata;
+
+ pa_assert(u);
+
+ if (pa_hashmap_get(u->tunnels_loaded, t_search))
+ goto finish;
+
+ PA_HASHMAP_FOREACH(t_queued, u->tunnels_queued, state) {
+ if (!tunnel_compare_simple(t_queued, t_search)) {
+ if (!avahi_service_resolver_new(u->client, t_queued->interface, t_queued->protocol,
+ t_queued->name, t_queued->type, t_queued->domain, u->protocol,
+ 0, resolver_cb, t_queued)) {
+ pa_log("avahi_service_resolver_new() failed: %s", avahi_strerror(avahi_client_errno(u->client)));
+ continue;
+ }
+ tunnel_ref(t_queued);
+ pa_hashmap_remove(u->tunnels_queued, t_queued);
+ pa_hashmap_put(u->tunnels_loaded, t_queued, t_queued);
+ break;
+ }
+ }
+
+finish:
+
+ tunnel_unref(t_search);
+ a->defer_free(e);
+}
+
+static pa_hook_result_t tunnel_remove_cb(void *hook_data, void *call_data, void *slot_data) {
+ struct userdata *u = slot_data;
+ pa_module *m = call_data;
+
+ tunnel *t;
+
+ pa_assert(u);
+ pa_assert(m);
+
+ if (!(t = pa_hashmap_remove(u->tunnels_loaded_by_index, &m->index)))
+ return PA_HOOK_OK;

- if (tnl)
- tunnel_free(tnl);
+ pa_assert(pa_hashmap_remove(u->tunnels_loaded, t) == t);
+
+ if (u->tunnels_queued)
+ u->core->mainloop->defer_new(u->core->mainloop, tunnel_add_from_queue_cb, t);
+ else
+ tunnel_unref(t);
+
+ return PA_HOOK_OK;
}

static void browser_cb(
@@ -306,45 +418,78 @@ static void browser_cb(
void *userdata) {

struct userdata *u = userdata;
- struct tunnel *t;
+ tunnel *t_new;
+ tunnel *t_old_loaded;
+ tunnel *t_old_queued;

pa_assert(u);

if (flags & AVAHI_LOOKUP_RESULT_LOCAL)
return;

+ if (event != AVAHI_BROWSER_NEW && event != AVAHI_BROWSER_REMOVE)
+ return;
+
if (u->protocol != AVAHI_PROTO_UNSPEC && u->protocol != protocol) {
pa_log_warn("Expected query protocol '%i' but received '%i'", u->protocol, protocol);
return;
}

- t = tunnel_new(interface, protocol, name, type, domain);
+ t_new = tunnel_new(u, interface, protocol, name, type, domain);

if (event == AVAHI_BROWSER_NEW) {

- /* Since the resolver is asynchronous and the hashmap may not yet be
- * updated, this check must be duplicated in the resolver callback. */
- if (!pa_hashmap_get(u->tunnels, t))
- if (!(avahi_service_resolver_new(u->client, interface, protocol, name, type, domain, u->protocol, 0, resolver_cb, u)))
+ if (!(t_old_loaded = pa_hashmap_get(u->tunnels_loaded, t_new))) {
+ /* We ignore the returned resolver object here, since we don't
+ * need to attach any special data to it, and we can still destroy
+ * it from the callback */
+ if (!avahi_service_resolver_new(u->client, interface, protocol, name, type, domain, u->protocol, 0, resolver_cb, t_new)) {
pa_log("avahi_service_resolver_new() failed: %s", avahi_strerror(avahi_client_errno(u->client)));
-
- /* We ignore the returned resolver object here, since the we don't
- * need to attach any special data to it, and we can still destroy
- * it from the callback */
+ tunnel_unref(t_new);
+ return;
+ }
+ if (u->tunnels_queued && (t_old_queued = pa_hashmap_remove(u->tunnels_queued, t_new))) {
+ tunnel_unref(t_old_queued);
+ }
+ pa_hashmap_put(u->tunnels_loaded, t_new, t_new);
+ tunnel_ref(t_new);
+ return;
+ }
+ else if (u->tunnels_queued && tunnel_compare(t_new, t_old_loaded) && !pa_hashmap_get(u->tunnels_queued, t_new)) {
+ pa_hashmap_put(u->tunnels_queued, t_new, t_new);
+ return;
+ }
+ tunnel_unref(t_new);
+ return;

} else if (event == AVAHI_BROWSER_REMOVE) {
- struct tunnel *t2 = pa_hashmap_get(u->tunnels, t);
-
- /* A full comparison is required even if 'one_per_name_type' is true.
- * Yes, this is redundant if it's false. */
- if (t2 && !tunnel_compare(t2, t)) {
- pa_module_unload_request_by_index(u->core, t2->module_index, true);
- pa_hashmap_remove(u->tunnels, t2);
- tunnel_free(t2);
+
+ if (u->tunnels_queued) {
+ if ((t_old_queued = pa_hashmap_remove(u->tunnels_queued, t_new))) {
+ tunnel_unref(t_old_queued);
+ }
+ if ((t_old_loaded = pa_hashmap_get(u->tunnels_loaded, t_new)) && !tunnel_compare(t_new, t_old_loaded)) {
+ pa_hashmap_remove(u->tunnels_loaded, t_old_loaded);
+ pa_hashmap_remove(u->tunnels_loaded_by_index, &t_old_loaded->module_index);
+ pa_module_unload_request_by_index(u->core, t_old_loaded->module_index, true);
+ /* Allow queued AVAHI_BROWSER_REMOVE events to be processed
+ * first. The event object is ignored as it can be destroyed
+ * from the callback. */
+ u->core->mainloop->defer_new(u->core->mainloop, tunnel_add_from_queue_cb, t_old_loaded);
+ }
}
- }
+ else if ((t_old_loaded = pa_hashmap_remove(u->tunnels_loaded, t_new))) {
+ pa_hashmap_remove(u->tunnels_loaded, t_old_loaded);
+ pa_hashmap_remove(u->tunnels_loaded_by_index, &t_old_loaded->module_index);
+ pa_module_unload_request_by_index(u->core, t_old_loaded->module_index, true);
+ tunnel_unref(t_old_loaded);
+ }
+ tunnel_unref(t_new);
+ return;

- tunnel_free(t);
+ } else
+
+ tunnel_unref(t_new);
}

/* Avahi browser and resolver callbacks only receive a concrete protocol;
@@ -494,10 +639,16 @@ int pa__init(pa_module*m) {
u->sink_browser = u->source_browser = NULL;
u->protocol = protocol;

- if (one_per_name_type)
- u->tunnels = pa_hashmap_new(tunnel_hash_simple, tunnel_compare_simple);
- else
- u->tunnels = pa_hashmap_new(tunnel_hash, tunnel_compare);
+ if (one_per_name_type) {
+ u->tunnels_loaded = pa_hashmap_new(tunnel_hash_simple, tunnel_compare_simple);
+ u->tunnels_queued = pa_hashmap_new(tunnel_hash, tunnel_compare);
+ }
+ else {
+ u->tunnels_loaded = pa_hashmap_new(tunnel_hash, tunnel_compare);
+ u->tunnels_queued = NULL;
+ }
+ u->tunnels_loaded_by_index = pa_hashmap_new(module_index_hash, module_index_compare);
+

u->avahi_poll = pa_avahi_poll_new(m->core->mainloop);

@@ -513,6 +664,8 @@ int pa__init(pa_module*m) {
goto fail;
}

+ u->module_unlink_slot = pa_hook_connect(&u->core->hooks[PA_CORE_HOOK_MODULE_UNLINK], PA_HOOK_NORMAL, tunnel_remove_cb, u);
+
pa_modargs_free(ma);

return 0;
@@ -527,7 +680,8 @@ fail:
}

void pa__done(pa_module*m) {
- struct userdata*u;
+ struct userdata *u;
+ tunnel *t;
pa_assert(m);

if (!(u = m->userdata))
@@ -539,16 +693,32 @@ void pa__done(pa_module*m) {
if (u->avahi_poll)
pa_avahi_poll_free(u->avahi_poll);

- if (u->tunnels) {
- struct tunnel *t;
+ if (u->tunnels_queued) {
+ while ((t = pa_hashmap_steal_first(u->tunnels_queued)))
+ tunnel_free(t);
+
+ pa_hashmap_free(u->tunnels_queued);
+ }

- while ((t = pa_hashmap_steal_first(u->tunnels))) {
+ if (u->tunnels_loaded) {
+ while ((t = pa_hashmap_steal_first(u->tunnels_loaded))) {
+ if (u->tunnels_loaded_by_index)
+ pa_assert(pa_hashmap_remove(u->tunnels_loaded_by_index, &t->module_index) == t);
pa_module_unload_request_by_index(u->core, t->module_index, true);
tunnel_free(t);
}

- pa_hashmap_free(u->tunnels);
+ if (u->tunnels_loaded_by_index)
+ pa_assert(pa_hashmap_isempty(u->tunnels_loaded_by_index));
+
+ pa_hashmap_free(u->tunnels_loaded);
}

+ if (u->tunnels_loaded_by_index)
+ pa_hashmap_free(u->tunnels_loaded_by_index);
+
+ if (u->module_unlink_slot)
+ pa_hook_slot_free(u->module_unlink_slot);
+
pa_xfree(u);
}
--
2.14.4
Yclept Nemo
2018-07-17 00:25:42 UTC
Permalink
Turns out sinks and sources must be 'put' right after they've been
created; otherwise they'll receive messages whose handlers assert 'put'.
---
src/modules/module-tunnel-sink-new.c | 283 ++++++++++++++++----------
src/modules/module-tunnel-source-new.c | 355 +++++++++++++++++++--------------
src/modules/module-tunnel.c | 190 +++++++++++-------
3 files changed, 499 insertions(+), 329 deletions(-)

diff --git a/src/modules/module-tunnel-sink-new.c b/src/modules/module-tunnel-sink-new.c
index fd24e690..e25e3931 100644
--- a/src/modules/module-tunnel-sink-new.c
+++ b/src/modules/module-tunnel-sink-new.c
@@ -60,16 +60,23 @@ PA_MODULE_USAGE(
#define MAX_LATENCY_USEC (200 * PA_USEC_PER_MSEC)
#define TUNNEL_THREAD_FAILED_MAINLOOP 1

-enum {
- SINK_MESSAGE_PUT = PA_SINK_MESSAGE_MAX,
-};
-
static void stream_state_cb(pa_stream *stream, void *userdata);
static void stream_changed_buffer_attr_cb(pa_stream *stream, void *userdata);
static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *userdata);
static void context_state_cb(pa_context *c, void *userdata);
static void sink_update_requested_latency_cb(pa_sink *s);

+enum {
+ TUNNEL_MESSAGE_CREATE_SINK,
+ TUNNEL_MESSAGE_CREATE_STREAM,
+};
+
+typedef struct {
+ pa_msgobject parent;
+} tunnel_msg;
+
+PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
+
struct userdata {
pa_module *module;
pa_sink *sink;
@@ -85,12 +92,20 @@ struct userdata {
bool update_stream_bufferattr_after_connect;

bool connected;
+ bool created;

char *cookie_file;
char *remote_server;
char *remote_sink_name;
+
+ tunnel_msg *msg;
+
+ pa_sink_new_data *sink_data;
};

+static int stream_create(struct userdata *u);
+static int sink_create(struct userdata *u);
+
static const char* const valid_modargs[] = {
"sink_name",
"sink_properties",
@@ -144,6 +159,42 @@ static pa_proplist* tunnel_new_proplist(struct userdata *u) {
return proplist;
}

+static int tunnel_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ struct userdata *u = data;
+
+ pa_assert(u);
+
+ switch (code) {
+
+ /* Delivered from the IO thread, handled in the main thread. */
+ case TUNNEL_MESSAGE_CREATE_SINK:
+ pa_log_debug("Creating sink.");
+ if (sink_create(u) < 0) {
+ pa_module_unload_request(u->module, true);
+ return -1;
+ }
+ pa_sink_new_data_done(u->sink_data);
+ pa_xfree(u->sink_data);
+ u->sink_data = NULL;
+ pa_asyncmsgq_post(u->thread_mq->inq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_STREAM, u, 0, NULL, NULL);
+ break;
+
+ /* Delivered from the main thread, handled in the IO thread. */
+ case TUNNEL_MESSAGE_CREATE_STREAM:
+ u->created = true;
+ pa_log_debug("Creating stream.");
+ if (stream_create(u) < 0) {
+ u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+ return -1;
+ }
+ u->connected = true;
+ break;
+
+ }
+
+ return 0;
+}
+
static void thread_func(void *userdata) {
struct userdata *u = userdata;
pa_proplist *proplist;
@@ -173,7 +224,7 @@ static void thread_func(void *userdata) {
u->remote_server,
PA_CONTEXT_NOAUTOSPAWN,
NULL) < 0) {
- pa_log("Failed to connect libpulse context");
+ pa_log("Failed to connect libpulse context: %s", pa_strerror(pa_context_errno(u->context)));
goto fail;
}

@@ -187,7 +238,7 @@ static void thread_func(void *userdata) {
goto fail;
}

- if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+ if (PA_UNLIKELY(u->created && u->sink->thread_info.rewind_requested))
pa_sink_process_rewind(u->sink, 0);

if (u->connected &&
@@ -244,6 +295,34 @@ finish:
pa_log_debug("Thread shutting down");
}

+static void context_state_cb(pa_context *c, void *userdata) {
+ struct userdata *u = userdata;
+ pa_assert(u);
+
+ switch (pa_context_get_state(c)) {
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ case PA_CONTEXT_READY: {
+ pa_log_debug("Context successfully connected.");
+ pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SINK, u, 0, NULL, NULL);
+ break;
+ }
+ case PA_CONTEXT_FAILED:
+ pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
+ u->connected = false;
+ u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+ break;
+ case PA_CONTEXT_TERMINATED:
+ pa_log_debug("Context terminated.");
+ u->connected = false;
+ u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+ break;
+ }
+}
+
static void stream_state_cb(pa_stream *stream, void *userdata) {
struct userdata *u = userdata;

@@ -251,7 +330,7 @@ static void stream_state_cb(pa_stream *stream, void *userdata) {

switch (pa_stream_get_state(stream)) {
case PA_STREAM_FAILED:
- pa_log_error("Stream failed.");
+ pa_log_error("Stream failed: %s", pa_strerror(pa_context_errno(u->context)));
u->connected = false;
u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
break;
@@ -291,78 +370,53 @@ static void stream_set_buffer_attr_cb(pa_stream *stream, int success, void *user
stream_changed_buffer_attr_cb(stream, userdata);
}

-static void context_state_cb(pa_context *c, void *userdata) {
- struct userdata *u = userdata;
- pa_assert(u);
+/* Handled in the IO thread. */
+static int stream_create(struct userdata *u) {
+ pa_proplist *proplist;
+ pa_buffer_attr bufferattr;
+ pa_usec_t requested_latency;
+ char *username = pa_get_user_name_malloc();
+ char *hostname = pa_get_host_name_malloc();
+ /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for ***@lazus' */
+ char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
+ pa_xfree(hostname);
+ pa_xfree(username);

- switch (pa_context_get_state(c)) {
- case PA_CONTEXT_UNCONNECTED:
- case PA_CONTEXT_CONNECTING:
- case PA_CONTEXT_AUTHORIZING:
- case PA_CONTEXT_SETTING_NAME:
- break;
- case PA_CONTEXT_READY: {
- pa_proplist *proplist;
- pa_buffer_attr bufferattr;
- pa_usec_t requested_latency;
- char *username = pa_get_user_name_malloc();
- char *hostname = pa_get_host_name_malloc();
- /* TODO: old tunnel put here the remote sink_name into stream name e.g. 'Null Output for ***@lazus' */
- char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
- pa_xfree(hostname);
- pa_xfree(username);
-
- pa_log_debug("Connection successful. Creating stream.");
- pa_assert(!u->stream);
-
- proplist = tunnel_new_proplist(u);
- u->stream = pa_stream_new_with_proplist(u->context,
- stream_name,
- &u->sink->sample_spec,
- &u->sink->channel_map,
- proplist);
- pa_proplist_free(proplist);
- pa_xfree(stream_name);
+ pa_assert(!u->stream);

- if (!u->stream) {
- pa_log_error("Could not create a stream.");
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
- return;
- }
+ proplist = tunnel_new_proplist(u);
+ u->stream = pa_stream_new_with_proplist(u->context,
+ stream_name,
+ &u->sink->sample_spec,
+ &u->sink->channel_map,
+ proplist);
+ pa_proplist_free(proplist);
+ pa_xfree(stream_name);

- requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
- if (requested_latency == (pa_usec_t) -1)
- requested_latency = u->sink->thread_info.max_latency;
-
- reset_bufferattr(&bufferattr);
- bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
-
- pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
- pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, userdata);
- if (pa_stream_connect_playback(u->stream,
- u->remote_sink_name,
- &bufferattr,
- PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE,
- NULL,
- NULL) < 0) {
- pa_log_error("Could not connect stream.");
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
- }
- u->connected = true;
- pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_PUT, NULL, 0, NULL, NULL);
- break;
- }
- case PA_CONTEXT_FAILED:
- pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
- u->connected = false;
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
- break;
- case PA_CONTEXT_TERMINATED:
- pa_log_debug("Context terminated.");
- u->connected = false;
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
- break;
+ if (!u->stream) {
+ pa_log_error("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
+ return -1;
+ }
+
+ requested_latency = pa_sink_get_requested_latency_within_thread(u->sink);
+ if (requested_latency == (pa_usec_t) -1)
+ requested_latency = u->sink->thread_info.max_latency;
+
+ reset_bufferattr(&bufferattr);
+ bufferattr.tlength = pa_usec_to_bytes(requested_latency, &u->sink->sample_spec);
+
+ pa_stream_set_state_callback(u->stream, stream_state_cb, u);
+ pa_stream_set_buffer_attr_callback(u->stream, stream_changed_buffer_attr_cb, u);
+ if (pa_stream_connect_playback(u->stream,
+ u->remote_sink_name,
+ &bufferattr,
+ PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_START_CORKED | PA_STREAM_AUTO_TIMING_UPDATE,
+ NULL,
+ NULL) < 0) {
+ pa_log_error("Could not connect stream: %s", pa_strerror(pa_context_errno(u->context)));
+ return -1;
}
+ return 0;
}

static void sink_update_requested_latency_cb(pa_sink *s) {
@@ -435,13 +489,6 @@ static int sink_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t of
*((int64_t*) data) = remote_latency;
return 0;
}
-
- /* Delivered from the IO thread, handled in the main thread. */
- case SINK_MESSAGE_PUT: {
- if (u->connected)
- pa_sink_put(u->sink);
- return 0;
- }
}
return pa_sink_process_msg(o, code, data, offset, chunk);
}
@@ -480,10 +527,31 @@ static int sink_set_state_in_io_thread_cb(pa_sink *s, pa_sink_state_t new_state,
return 0;
}

+/* Handled in the main thread. */
+static int sink_create(struct userdata *u) {
+ if (!(u->sink = pa_sink_new(u->module->core, u->sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
+ pa_log("Failed to create sink.");
+ return -1;
+ }
+
+ u->sink->userdata = u;
+ u->sink->parent.process_msg = sink_process_msg_cb;
+ u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
+ u->sink->update_requested_latency = sink_update_requested_latency_cb;
+ pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
+
+ /* set thread message queue */
+ pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
+ pa_sink_set_rtpoll(u->sink, u->rtpoll);
+
+ pa_sink_put(u->sink);
+
+ return 0;
+}
+
int pa__init(pa_module *m) {
struct userdata *u = NULL;
pa_modargs *ma = NULL;
- pa_sink_new_data sink_data;
pa_sample_spec ss;
pa_channel_map map;
const char *remote_server = NULL;
@@ -539,47 +607,36 @@ int pa__init(pa_module *m) {
* with module-tunnel-sink-new. */
u->rtpoll = pa_rtpoll_new();

- /* Create sink */
- pa_sink_new_data_init(&sink_data);
- sink_data.driver = __FILE__;
- sink_data.module = m;
+ /* Create sink data */
+ u->sink_data = pa_xnew(pa_sink_new_data, 1);
+ pa_sink_new_data_init(u->sink_data);
+ u->sink_data->driver = __FILE__;
+ u->sink_data->module = m;

default_sink_name = pa_sprintf_malloc("tunnel-sink-new.%s", remote_server);
sink_name = pa_modargs_get_value(ma, "sink_name", default_sink_name);

- pa_sink_new_data_set_name(&sink_data, sink_name);
- pa_sink_new_data_set_sample_spec(&sink_data, &ss);
- pa_sink_new_data_set_channel_map(&sink_data, &map);
+ pa_sink_new_data_set_name(u->sink_data, sink_name);
+ pa_sink_new_data_set_sample_spec(u->sink_data, &ss);
+ pa_sink_new_data_set_channel_map(u->sink_data, &map);

- pa_proplist_sets(sink_data.proplist, PA_PROP_DEVICE_CLASS, "sound");
- pa_proplist_setf(sink_data.proplist,
+ pa_proplist_sets(u->sink_data->proplist, PA_PROP_DEVICE_CLASS, "sound");
+ pa_proplist_setf(u->sink_data->proplist,
PA_PROP_DEVICE_DESCRIPTION,
_("Tunnel to %s/%s"),
remote_server,
pa_strempty(u->remote_sink_name));

- if (pa_modargs_get_proplist(ma, "sink_properties", sink_data.proplist, PA_UPDATE_REPLACE) < 0) {
+ if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_data->proplist, PA_UPDATE_REPLACE) < 0) {
pa_log("Invalid properties");
- pa_sink_new_data_done(&sink_data);
- goto fail;
- }
- if (!(u->sink = pa_sink_new(m->core, &sink_data, PA_SINK_LATENCY | PA_SINK_DYNAMIC_LATENCY | PA_SINK_NETWORK))) {
- pa_log("Failed to create sink.");
- pa_sink_new_data_done(&sink_data);
goto fail;
}

- pa_sink_new_data_done(&sink_data);
- u->sink->userdata = u;
- u->sink->parent.process_msg = sink_process_msg_cb;
- u->sink->set_state_in_io_thread = sink_set_state_in_io_thread_cb;
- u->sink->update_requested_latency = sink_update_requested_latency_cb;
- pa_sink_set_latency_range(u->sink, 0, MAX_LATENCY_USEC);
-
- /* set thread message queue */
- pa_sink_set_asyncmsgq(u->sink, u->thread_mq->inq);
- pa_sink_set_rtpoll(u->sink, u->rtpoll);
+ /* Setup initial message handler */
+ u->msg = pa_msgobject_new(tunnel_msg);
+ u->msg->parent.process_msg = tunnel_process_msg_cb;

+ /* start IO thread */
if (!(u->thread = pa_thread_new("tunnel-sink", thread_func, u))) {
pa_log("Failed to create thread.");
goto fail;
@@ -635,6 +692,14 @@ void pa__done(pa_module *m) {
if (u->remote_server)
pa_xfree(u->remote_server);

+ if (u->msg)
+ pa_xfree(u->msg);
+
+ if (u->sink_data) {
+ pa_sink_new_data_done(u->sink_data);
+ pa_xfree(u->sink_data);
+ }
+
if (u->sink)
pa_sink_unref(u->sink);

diff --git a/src/modules/module-tunnel-source-new.c b/src/modules/module-tunnel-source-new.c
index bff2f6a1..df9c1757 100644
--- a/src/modules/module-tunnel-source-new.c
+++ b/src/modules/module-tunnel-source-new.c
@@ -59,15 +59,22 @@ PA_MODULE_USAGE(

#define TUNNEL_THREAD_FAILED_MAINLOOP 1

-enum {
- SOURCE_MESSAGE_PUT = PA_SOURCE_MESSAGE_MAX,
-};
-
static void stream_state_cb(pa_stream *stream, void *userdata);
static void stream_read_cb(pa_stream *s, size_t length, void *userdata);
static void context_state_cb(pa_context *c, void *userdata);
static void source_update_requested_latency_cb(pa_source *s);

+enum {
+ TUNNEL_MESSAGE_CREATE_SOURCE,
+ TUNNEL_MESSAGE_CREATE_STREAM,
+};
+
+typedef struct {
+ pa_msgobject parent;
+} tunnel_msg;
+
+PA_DEFINE_PRIVATE_CLASS(tunnel_msg, pa_msgobject);
+
struct userdata {
pa_module *module;
pa_source *source;
@@ -87,8 +94,16 @@ struct userdata {
char *cookie_file;
char *remote_server;
char *remote_source_name;
+
+ tunnel_msg *msg;
+
+ pa_source_new_data *source_data;
};

+static int stream_create(struct userdata *u);
+static int source_create(struct userdata *u);
+static void read_new_samples(struct userdata *u);
+
static const char* const valid_modargs[] = {
"source_name",
"source_properties",
@@ -133,63 +148,39 @@ static pa_proplist* tunnel_new_proplist(struct userdata *u) {
return proplist;
}

-static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
- struct userdata *u = userdata;
- u->new_data = true;
-}
-
-/* called from io context to read samples from the stream into our source */
-static void read_new_samples(struct userdata *u) {
- const void *p;
- size_t readable = 0;
- pa_memchunk memchunk;
+static int tunnel_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t offset, pa_memchunk *chunk) {
+ struct userdata *u = data;

pa_assert(u);
- u->new_data = false;
-
- pa_memchunk_reset(&memchunk);
-
- if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
- return;

- readable = pa_stream_readable_size(u->stream);
- while (readable > 0) {
- size_t nbytes = 0;
- if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
- pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
- return;
- }
-
- if (PA_LIKELY(p)) {
- /* we have valid data */
- memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
- memchunk.length = nbytes;
- memchunk.index = 0;
-
- pa_source_post(u->source, &memchunk);
- pa_memblock_unref_fixed(memchunk.memblock);
- } else {
- size_t bytes_to_generate = nbytes;
-
- /* we have a hole. generate silence */
- memchunk = u->source->silence;
- pa_memblock_ref(memchunk.memblock);
-
- while (bytes_to_generate > 0) {
- if (bytes_to_generate < memchunk.length)
- memchunk.length = bytes_to_generate;
+ switch (code) {

- pa_source_post(u->source, &memchunk);
- bytes_to_generate -= memchunk.length;
+ /* Delivered from the IO thread, handled in the main thread. */
+ case TUNNEL_MESSAGE_CREATE_SOURCE:
+ pa_log_debug("Creating source.");
+ if (source_create(u) < 0) {
+ pa_module_unload_request(u->module, true);
+ return -1;
}
+ pa_source_new_data_done(u->source_data);
+ pa_xfree(u->source_data);
+ u->source_data = NULL;
+ pa_asyncmsgq_post(u->thread_mq->inq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_STREAM, u, 0, NULL, NULL);
+ break;

- pa_memblock_unref(memchunk.memblock);
- }
+ /* Delivered from the main thread, handled in the IO thread. */
+ case TUNNEL_MESSAGE_CREATE_STREAM:
+ pa_log_debug("Creating stream.");
+ if (stream_create(u) < 0) {
+ u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+ return -1;
+ }
+ u->connected = true;
+ break;

- pa_stream_drop(u->stream);
- readable -= nbytes;
}
+
+ return 0;
}

static void thread_func(void *userdata) {
@@ -259,6 +250,34 @@ finish:
pa_log_debug("Thread shutting down");
}

+static void context_state_cb(pa_context *c, void *userdata) {
+ struct userdata *u = userdata;
+ pa_assert(u);
+
+ switch (pa_context_get_state(c)) {
+ case PA_CONTEXT_UNCONNECTED:
+ case PA_CONTEXT_CONNECTING:
+ case PA_CONTEXT_AUTHORIZING:
+ case PA_CONTEXT_SETTING_NAME:
+ break;
+ case PA_CONTEXT_READY: {
+ pa_log_debug("Context successfully connected.");
+ pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->msg), TUNNEL_MESSAGE_CREATE_SOURCE, u, 0, NULL, NULL);
+ break;
+ }
+ case PA_CONTEXT_FAILED:
+ pa_log_debug("Context failed: %s.", pa_strerror(pa_context_errno(u->context)));
+ u->connected = false;
+ u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+ break;
+ case PA_CONTEXT_TERMINATED:
+ pa_log_debug("Context terminated.");
+ u->connected = false;
+ u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+ break;
+ }
+}
+
static void stream_state_cb(pa_stream *stream, void *userdata) {
struct userdata *u = userdata;

@@ -289,76 +308,110 @@ static void stream_state_cb(pa_stream *stream, void *userdata) {
}
}

-static void context_state_cb(pa_context *c, void *userdata) {
+static void stream_read_cb(pa_stream *s, size_t length, void *userdata) {
struct userdata *u = userdata;
+ u->new_data = true;
+}
+
+/* called from io context to read samples from the stream into our source */
+static void read_new_samples(struct userdata *u) {
+ const void *p;
+ size_t readable = 0;
+ pa_memchunk memchunk;
+
pa_assert(u);
+ u->new_data = false;

- switch (pa_context_get_state(c)) {
- case PA_CONTEXT_UNCONNECTED:
- case PA_CONTEXT_CONNECTING:
- case PA_CONTEXT_AUTHORIZING:
- case PA_CONTEXT_SETTING_NAME:
- break;
- case PA_CONTEXT_READY: {
- pa_proplist *proplist;
- pa_buffer_attr bufferattr;
- pa_usec_t requested_latency;
- char *username = pa_get_user_name_malloc();
- char *hostname = pa_get_host_name_malloc();
- /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for ***@lazus' */
- char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
- pa_xfree(username);
- pa_xfree(hostname);
-
- pa_log_debug("Connection successful. Creating stream.");
- pa_assert(!u->stream);
-
- proplist = tunnel_new_proplist(u);
- u->stream = pa_stream_new_with_proplist(u->context,
- stream_name,
- &u->source->sample_spec,
- &u->source->channel_map,
- proplist);
- pa_proplist_free(proplist);
- pa_xfree(stream_name);
+ pa_memchunk_reset(&memchunk);

- if (!u->stream) {
- pa_log_error("Could not create a stream: %s", pa_strerror(pa_context_errno(u->context)));
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
- return;
- }
+ if (PA_UNLIKELY(!u->connected || pa_stream_get_state(u->stream) != PA_STREAM_READY))
+ return;

- requested_latency = pa_source_get_requested_latency_within_thread(u->source);
- if (requested_latency == (uint32_t) -1)
- requested_latency = u->source->thread_info.max_latency;
+ readable = pa_stream_readable_size(u->stream);
+ while (readable > 0) {
+ size_t nbytes = 0;
+ if (PA_UNLIKELY(pa_stream_peek(u->stream, &p, &nbytes) != 0)) {
+ pa_log("pa_stream_peek() failed: %s", pa_strerror(pa_context_errno(u->context)));
+ u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+ return;
+ }

- reset_bufferattr(&bufferattr);
- bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
+ if (PA_LIKELY(p)) {
+ /* we have valid data */
+ memchunk.memblock = pa_memblock_new_fixed(u->module->core->mempool, (void *) p, nbytes, true);
+ memchunk.length = nbytes;
+ memchunk.index = 0;

- pa_stream_set_state_callback(u->stream, stream_state_cb, userdata);
- pa_stream_set_read_callback(u->stream, stream_read_cb, userdata);
- if (pa_stream_connect_record(u->stream,
- u->remote_source_name,
- &bufferattr,
- PA_STREAM_INTERPOLATE_TIMING|PA_STREAM_DONT_MOVE|PA_STREAM_AUTO_TIMING_UPDATE|PA_STREAM_START_CORKED) < 0) {
- pa_log_debug("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
+ pa_source_post(u->source, &memchunk);
+ pa_memblock_unref_fixed(memchunk.memblock);
+ } else {
+ size_t bytes_to_generate = nbytes;
+
+ /* we have a hole. generate silence */
+ memchunk = u->source->silence;
+ pa_memblock_ref(memchunk.memblock);
+
+ while (bytes_to_generate > 0) {
+ if (bytes_to_generate < memchunk.length)
+ memchunk.length = bytes_to_generate;
+
+ pa_source_post(u->source, &memchunk);
+ bytes_to_generate -= memchunk.length;
}
- u->connected = true;
- pa_asyncmsgq_post(u->thread_mq->outq, PA_MSGOBJECT(u->source), SOURCE_MESSAGE_PUT, NULL, 0, NULL, NULL);
- break;
+
+ pa_memblock_unref(memchunk.memblock);
}
- case PA_CONTEXT_FAILED:
- pa_log_debug("Context failed with err %s.", pa_strerror(pa_context_errno(u->context)));
- u->connected = false;
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
- break;
- case PA_CONTEXT_TERMINATED:
- pa_log_debug("Context terminated.");
- u->connected = false;
- u->thread_mainloop_api->quit(u->thread_mainloop_api, TUNNEL_THREAD_FAILED_MAINLOOP);
- break;
+
+ pa_stream_drop(u->stream);
+ readable -= nbytes;
+ }
+}
+
+/* Handled in the IO thread. */
+static int stream_create(struct userdata *u) {
+ pa_proplist *proplist;
+ pa_buffer_attr bufferattr;
+ pa_usec_t requested_latency;
+ char *username = pa_get_user_name_malloc();
+ char *hostname = pa_get_host_name_malloc();
+ /* TODO: old tunnel put here the remote source_name into stream name e.g. 'Null Output for ***@lazus' */
+ char *stream_name = pa_sprintf_malloc(_("Tunnel for %s@%s"), username, hostname);
+ pa_xfree(username);
+ pa_xfree(hostname);
+
+ pa_assert(!u->stream);
+
+ proplist = tunnel_new_proplist(u);
+ u->stream = pa_stream_new_with_proplist(u->context,
+ stream_name,
+ &u->source->sample_spec,
+ &u->source->channel_map,
+ proplist);
+ pa_proplist_free(proplist);
+ pa_xfree(stream_name);
+
+ if (!u->stream) {
+ pa_log_error("Could not create stream: %s", pa_strerror(pa_context_errno(u->context)));
+ return -1;
}
+
+ requested_latency = pa_source_get_requested_latency_within_thread(u->source);
+ if (requested_latency == (pa_usec_t) -1)
+ requested_latency = u->source->thread_info.max_latency;
+
+ reset_bufferattr(&bufferattr);
+ bufferattr.fragsize = pa_usec_to_bytes(requested_latency, &u->source->sample_spec);
+
+ pa_stream_set_state_callback(u->stream, stream_state_cb, u);
+ pa_stream_set_read_callback(u->stream, stream_read_cb, u);
+ if (pa_stream_connect_record(u->stream,
+ u->remote_source_name,
+ &bufferattr,
+ PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_DONT_MOVE | PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_START_CORKED) < 0) {
+ pa_log_error("Could not connect stream: %s", pa_strerror(pa_context_errno(u->context)));
+ return -1;
+ }
+ return 0;
}

static void source_update_requested_latency_cb(pa_source *s) {
@@ -434,13 +487,6 @@ static int source_process_msg_cb(pa_msgobject *o, int code, void *data, int64_t

return 0;
}
-
- /* Delivered from the IO thread, handled in the main thread. */
- case SOURCE_MESSAGE_PUT: {
- if (u->connected)
- pa_source_put(u->source);
- return 0;
- }
}
return pa_source_process_msg(o, code, data, offset, chunk);
}
@@ -479,10 +525,30 @@ static int source_set_state_in_io_thread_cb(pa_source *s, pa_source_state_t new_
return 0;
}

+/* Handled in the main thread. */
+static int source_create(struct userdata *u) {
+ if (!(u->source = pa_source_new(u->module->core, u->source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
+ pa_log("Failed to create source.");
+ return -1;
+ }
+
+ u->source->userdata = u;
+ u->source->parent.process_msg = source_process_msg_cb;
+ u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
+ u->source->update_requested_latency = source_update_requested_latency_cb;
+
+ /* set thread message queue */
+ pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
+ pa_source_set_rtpoll(u->source, u->rtpoll);
+
+ pa_source_put(u->source);
+
+ return 0;
+}
+
int pa__init(pa_module *m) {
struct userdata *u = NULL;
pa_modargs *ma = NULL;
- pa_source_new_data source_data;
pa_sample_spec ss;
pa_channel_map map;
const char *remote_server = NULL;
@@ -535,45 +601,36 @@ int pa__init(pa_module *m) {
* only works because it calls pa_asyncmsq_process_one(). */
u->rtpoll = pa_rtpoll_new();

- /* Create source */
- pa_source_new_data_init(&source_data);
- source_data.driver = __FILE__;
- source_data.module = m;
+ /* Create source data */
+ u->source_data = pa_xnew(pa_source_new_data, 1);
+ pa_source_new_data_init(u->source_data);
+ u->source_data->driver = __FILE__;
+ u->source_data->module = m;

default_source_name = pa_sprintf_malloc("tunnel-source-new.%s", remote_server);
source_name = pa_modargs_get_value(ma, "source_name", default_source_name);

- pa_source_new_data_set_name(&source_data, source_name);
- pa_source_new_data_set_sample_spec(&source_data, &ss);
- pa_source_new_data_set_channel_map(&source_data, &map);
+ pa_source_new_data_set_name(u->source_data, source_name);
+ pa_source_new_data_set_sample_spec(u->source_data, &ss);
+ pa_source_new_data_set_channel_map(u->source_data, &map);

- pa_proplist_sets(source_data.proplist, PA_PROP_DEVICE_CLASS, "sound");
- pa_proplist_setf(source_data.proplist,
+ pa_proplist_sets(u->source_data->proplist, PA_PROP_DEVICE_CLASS, "sound");
+ pa_proplist_setf(u->source_data->proplist,
PA_PROP_DEVICE_DESCRIPTION,
_("Tunnel to %s/%s"),
remote_server,
pa_strempty(u->remote_source_name));

- if (pa_modargs_get_proplist(ma, "source_properties", source_data.proplist, PA_UPDATE_REPLACE) < 0) {
+ if (pa_modargs_get_proplist(ma, "source_properties", u->source_data->proplist, PA_UPDATE_REPLACE) < 0) {
pa_log("Invalid properties");
- pa_source_new_data_done(&source_data);
- goto fail;
- }
- if (!(u->source = pa_source_new(m->core, &source_data, PA_SOURCE_LATENCY | PA_SOURCE_DYNAMIC_LATENCY | PA_SOURCE_NETWORK))) {
- pa_log("Failed to create source.");
- pa_source_new_data_done(&source_data);
goto fail;
}

- pa_source_new_data_done(&source_data);
- u->source->userdata = u;
- u->source->parent.process_msg = source_process_msg_cb;
- u->source->set_state_in_io_thread = source_set_state_in_io_thread_cb;
- u->source->update_requested_latency = source_update_requested_latency_cb;
-
- pa_source_set_asyncmsgq(u->source, u->thread_mq->inq);
- pa_source_set_rtpoll(u->source, u->rtpoll);
+ /* setup initial message handler */
+ u->msg = pa_msgobject_new(tunnel_msg);
+ u->msg->parent.process_msg = tunnel_process_msg_cb;

+ /* start IO thread */
if (!(u->thread = pa_thread_new("tunnel-source", thread_func, u))) {
pa_log("Failed to create thread.");
goto fail;
@@ -629,6 +686,14 @@ void pa__done(pa_module *m) {
if (u->remote_server)
pa_xfree(u->remote_server);

+ if (u->msg)
+ pa_xfree(u->msg);
+
+ if (u->source_data) {
+ pa_source_new_data_done(u->source_data);
+ pa_xfree(u->source_data);
+ }
+
if (u->source)
pa_source_unref(u->source);

diff --git a/src/modules/module-tunnel.c b/src/modules/module-tunnel.c
index 960f8533..9ea1ab33 100644
--- a/src/modules/module-tunnel.c
+++ b/src/modules/module-tunnel.c
@@ -131,7 +131,8 @@ enum {
SINK_MESSAGE_REQUEST = PA_SINK_MESSAGE_MAX,
SINK_MESSAGE_REMOTE_SUSPEND,
SINK_MESSAGE_UPDATE_LATENCY,
- SINK_MESSAGE_POST
+ SINK_MESSAGE_POST,
+ SINK_MESSAGE_CREATED,
};

#define DEFAULT_TLENGTH_MSEC 150
@@ -142,7 +143,7 @@ enum {
enum {
SOURCE_MESSAGE_POST = PA_SOURCE_MESSAGE_MAX,
SOURCE_MESSAGE_REMOTE_SUSPEND,
- SOURCE_MESSAGE_UPDATE_LATENCY
+ SOURCE_MESSAGE_UPDATE_LATENCY,
};

#define DEFAULT_FRAGSIZE_MSEC 25
@@ -196,10 +197,13 @@ struct userdata {

char *server_name;
#ifdef TUNNEL_SINK
+ pa_sink_new_data *sink_data;
char *sink_name;
pa_sink *sink;
size_t requested_bytes;
+ bool sink_created;
#else
+ pa_source_new_data *source_data;
char *source_name;
pa_source *source;
pa_mcalign *mcalign;
@@ -241,6 +245,11 @@ struct userdata {
};

static void request_latency(struct userdata *u);
+#ifdef TUNNEL_SINK
+static int sink_create(struct userdata *u);
+#else
+static int source_create(struct userdata *u);
+#endif

/* Called from main context */
static void command_stream_or_client_event(pa_pdispatch *pd, uint32_t command, uint32_t tag, pa_tagstruct *t, void *userdata) {
@@ -550,6 +559,11 @@ static int sink_process_msg(pa_msgobject *o, int code, void *data, int64_t offse
return 0;
}

+ case SINK_MESSAGE_CREATED:
+
+ u->sink_created = true;
+ return 0;
+
case SINK_MESSAGE_POST:

/* OK, This might be a bit confusing. This message is
@@ -717,7 +731,7 @@ static void thread_func(void *userdata) {
int ret;

#ifdef TUNNEL_SINK
- if (PA_UNLIKELY(u->sink->thread_info.rewind_requested))
+ if (PA_UNLIKELY(u->sink_created && u->sink->thread_info.rewind_requested))
pa_sink_process_rewind(u->sink, 0);
#endif

@@ -1599,9 +1613,17 @@ static void setup_complete_callback(pa_pdispatch *pd, uint32_t command, uint32_t
}

#ifdef TUNNEL_SINK
- pa_sink_put(u->sink);
+ if (sink_create(u) < 0)
+ goto fail;
+ pa_sink_new_data_done(u->sink_data);
+ pa_xfree(u->sink_data);
+ u->sink_data = NULL;
#else
- pa_source_put(u->source);
+ if (source_create(u) < 0)
+ goto fail;
+ pa_source_new_data_done(u->source_data);
+ pa_xfree(u->source_data);
+ u->source_data = NULL;
#endif

/* Starting with protocol version 13 the MSB of the version tag
@@ -1922,6 +1944,58 @@ static void sink_set_mute(pa_sink *sink) {

#endif

+#ifdef TUNNEL_SINK
+/* Called from main context */
+static int sink_create(struct userdata *u) {
+ if (!(u->sink = pa_sink_new(u->module->core, u->sink_data, PA_SINK_NETWORK|PA_SINK_LATENCY))) {
+ pa_log("Failed to create sink.");
+ return -1;
+ }
+
+ u->sink->userdata = u;
+ u->sink->parent.process_msg = sink_process_msg;
+ u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
+ pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
+ pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
+
+ u->sink->refresh_volume = u->sink->refresh_muted = false;
+
+/* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
+
+ pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
+ pa_sink_set_rtpoll(u->sink, u->rtpoll);
+
+ pa_sink_put(u->sink);
+
+ pa_asyncmsgq_post(u->sink->asyncmsgq, PA_MSGOBJECT(u->sink), SINK_MESSAGE_CREATED, NULL, 0, NULL, NULL);
+
+ return 0;
+}
+#else
+/* Called from main context */
+static int source_create(struct userdata *u) {
+ if (!(u->source = pa_source_new(u->module->core, u->source_data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY))) {
+ pa_log("Failed to create source.");
+ return -1;
+ }
+
+ u->source->userdata = u;
+ u->source->parent.process_msg = source_process_msg;
+ u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
+
+/* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
+
+ pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
+ pa_source_set_rtpoll(u->source, u->rtpoll);
+
+ u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
+
+ pa_source_put(u->source);
+
+ return 0;
+}
+#endif
+
int pa__init(pa_module*m) {
pa_modargs *ma = NULL;
struct userdata *u = NULL;
@@ -1930,11 +2004,6 @@ int pa__init(pa_module*m) {
pa_sample_spec ss;
pa_channel_map map;
char *dn = NULL;
-#ifdef TUNNEL_SINK
- pa_sink_new_data data;
-#else
- pa_source_new_data data;
-#endif
bool automatic;
#ifdef HAVE_X11
xcb_connection_t *xcb = NULL;
@@ -2130,90 +2199,49 @@ int pa__init(pa_module*m) {
pa_socket_client_set_callback(u->client, on_connection, u);

#ifdef TUNNEL_SINK
+ u->sink_data = pa_xnew(pa_sink_new_data, 1);
+ pa_sink_new_data_init(u->sink_data);

if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "sink_name", NULL))))
dn = pa_sprintf_malloc("tunnel-sink.%s", u->server_name);

- pa_sink_new_data_init(&data);
- data.driver = __FILE__;
- data.module = m;
- data.namereg_fail = false;
- pa_sink_new_data_set_name(&data, dn);
- pa_sink_new_data_set_sample_spec(&data, &ss);
- pa_sink_new_data_set_channel_map(&data, &map);
- pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
- pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
+ u->sink_data->driver = __FILE__;
+ u->sink_data->module = m;
+ u->sink_data->namereg_fail = false;
+ pa_sink_new_data_set_name(u->sink_data, dn);
+ pa_sink_new_data_set_sample_spec(u->sink_data, &ss);
+ pa_sink_new_data_set_channel_map(u->sink_data, &map);
+ pa_proplist_setf(u->sink_data->proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->sink_name), u->sink_name ? " on " : "", u->server_name);
+ pa_proplist_sets(u->sink_data->proplist, "tunnel.remote.server", u->server_name);
if (u->sink_name)
- pa_proplist_sets(data.proplist, "tunnel.remote.sink", u->sink_name);
+ pa_proplist_sets(u->sink_data->proplist, "tunnel.remote.sink", u->sink_name);

- if (pa_modargs_get_proplist(ma, "sink_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
+ if (pa_modargs_get_proplist(ma, "sink_properties", u->sink_data->proplist, PA_UPDATE_REPLACE) < 0) {
pa_log("Invalid properties");
- pa_sink_new_data_done(&data);
- goto fail;
- }
-
- u->sink = pa_sink_new(m->core, &data, PA_SINK_NETWORK|PA_SINK_LATENCY);
- pa_sink_new_data_done(&data);
-
- if (!u->sink) {
- pa_log("Failed to create sink.");
goto fail;
}
-
- u->sink->parent.process_msg = sink_process_msg;
- u->sink->userdata = u;
- u->sink->set_state_in_main_thread = sink_set_state_in_main_thread_cb;
- pa_sink_set_set_volume_callback(u->sink, sink_set_volume);
- pa_sink_set_set_mute_callback(u->sink, sink_set_mute);
-
- u->sink->refresh_volume = u->sink->refresh_muted = false;
-
-/* pa_sink_set_latency_range(u->sink, MIN_NETWORK_LATENCY_USEC, 0); */
-
- pa_sink_set_asyncmsgq(u->sink, u->thread_mq.inq);
- pa_sink_set_rtpoll(u->sink, u->rtpoll);
-
#else
+ u->source_data = pa_xnew(pa_source_new_data, 1);
+ pa_source_new_data_init(u->source_data);

if (!(dn = pa_xstrdup(pa_modargs_get_value(ma, "source_name", NULL))))
dn = pa_sprintf_malloc("tunnel-source.%s", u->server_name);

- pa_source_new_data_init(&data);
- data.driver = __FILE__;
- data.module = m;
- data.namereg_fail = false;
- pa_source_new_data_set_name(&data, dn);
- pa_source_new_data_set_sample_spec(&data, &ss);
- pa_source_new_data_set_channel_map(&data, &map);
- pa_proplist_setf(data.proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
- pa_proplist_sets(data.proplist, "tunnel.remote.server", u->server_name);
+ u->source_data->driver = __FILE__;
+ u->source_data->module = m;
+ u->source_data->namereg_fail = false;
+ pa_source_new_data_set_name(u->source_data, dn);
+ pa_source_new_data_set_sample_spec(u->source_data, &ss);
+ pa_source_new_data_set_channel_map(u->source_data, &map);
+ pa_proplist_setf(u->source_data->proplist, PA_PROP_DEVICE_DESCRIPTION, "%s%s%s", pa_strempty(u->source_name), u->source_name ? " on " : "", u->server_name);
+ pa_proplist_sets(u->source_data->proplist, "tunnel.remote.server", u->server_name);
if (u->source_name)
- pa_proplist_sets(data.proplist, "tunnel.remote.source", u->source_name);
+ pa_proplist_sets(u->source_data->proplist, "tunnel.remote.source", u->source_name);

- if (pa_modargs_get_proplist(ma, "source_properties", data.proplist, PA_UPDATE_REPLACE) < 0) {
+ if (pa_modargs_get_proplist(ma, "source_properties", u->source_data->proplist, PA_UPDATE_REPLACE) < 0) {
pa_log("Invalid properties");
- pa_source_new_data_done(&data);
- goto fail;
- }
-
- u->source = pa_source_new(m->core, &data, PA_SOURCE_NETWORK|PA_SOURCE_LATENCY);
- pa_source_new_data_done(&data);
-
- if (!u->source) {
- pa_log("Failed to create source.");
goto fail;
}
-
- u->source->parent.process_msg = source_process_msg;
- u->source->set_state_in_main_thread = source_set_state_in_main_thread_cb;
- u->source->userdata = u;
-
-/* pa_source_set_latency_range(u->source, MIN_NETWORK_LATENCY_USEC, 0); */
-
- pa_source_set_asyncmsgq(u->source, u->thread_mq.inq);
- pa_source_set_rtpoll(u->source, u->rtpoll);
-
- u->mcalign = pa_mcalign_new(pa_frame_size(&u->source->sample_spec));
#endif

u->time_event = NULL;
@@ -2300,6 +2328,18 @@ void pa__done(pa_module*m) {
pa_source_unref(u->source);
#endif

+#ifdef TUNNEL_SINK
+ if (u->sink_data) {
+ pa_sink_new_data_done(u->sink_data);
+ pa_xfree(u->sink_data);
+ }
+#else
+ if (u->source_data) {
+ pa_source_new_data_done(u->source_data);
+ pa_xfree(u->source_data);
+ }
+#endif
+
if (u->rtpoll)
pa_rtpoll_free(u->rtpoll);
--
2.14.4
Yclept Nemo
2018-07-17 00:25:43 UTC
Permalink
---
src/pulsecore/object.h | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/pulsecore/object.h b/src/pulsecore/object.h
index b3d500e2..15e8365b 100644
--- a/src/pulsecore/object.h
+++ b/src/pulsecore/object.h
@@ -37,7 +37,7 @@ struct pa_object {
};

pa_object *pa_object_new_internal(size_t size, const char *type_id, bool (*check_type)(const char *type_id));
-#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), type##_type_id, type##_check_type)
+#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), type##_type_id, type##_check_type))

#define pa_object_free ((void (*) (pa_object* _obj)) pa_xfree)
--
2.14.4
Tanu Kaskinen
2018-08-12 12:18:58 UTC
Permalink
Post by Yclept Nemo
---
src/pulsecore/object.h | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/pulsecore/object.h b/src/pulsecore/object.h
index b3d500e2..15e8365b 100644
--- a/src/pulsecore/object.h
+++ b/src/pulsecore/object.h
@@ -37,7 +37,7 @@ struct pa_object {
};
pa_object *pa_object_new_internal(size_t size, const char *type_id, bool (*check_type)(const char *type_id));
-#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), type##_type_id, type##_check_type)
+#define pa_object_new(type) ((type*) pa_object_new_internal(sizeof(type), type##_type_id, type##_check_type))
#define pa_object_free ((void (*) (pa_object* _obj)) pa_xfree)
Thanks! Applied.
--
Tanu

https://www.patreon.com/tanuk
https://liberapay.com/tanuk
Tanu Kaskinen
2018-08-12 12:24:42 UTC
Permalink
Post by Yclept Nemo
Module zeroconf-discover would add the same sink twice: once over IPv4, once
over IPv6. Fixes:
* Options to disable IPv4 and IPv6. If both options are provided, the module
fails to load.
* Option to only load one tunnel per avahi service name and type.
All the tunnel modules would immediately add a sink or source, even before
authentication. Fix:
* Add the sink/source only after the context is successfully connected.
And some additional fixes:
* zeroconf-discover: fix memory leak and prevent double-free.
* pa_object_new: fix missing close paren.
zeroconf-discover: add arguments to disable ipv4/6
zeroconf-discover: fix memory issues
zeroconf-discover: add argument 'one_per_name_type'
tunnel*: put sink/source after authentication
Fixes
tunnel*: redo 'put sink/source after auth..'
Fix pa_object_new macro (missing close paren)
zeroconf-discover: fix 'one_per_name_type'
Once I started to review patches 5 and 6, I found out that the later
patches undo much of what the earlier patches did. Please clean up
the commits before submitting your work, so that I get to review
finished patches instead of the trial-and-error development process.

Nevertheless, thanks for working on this!
--
Tanu

https://www.patreon.com/tanuk
https://liberapay.com/tanuk
Continue reading on narkive:
Loading...