Initial support for XEP-0198 stream management

Request stream management for the session by default. The rate of requests
and acks is controlled by the -I option. With stream management, the
client can check when all stanzas have been flushed to the server. For
example, the following works:

    cat /path/textfile | xmppipe -o transfer -r sender

    # on another system
    xmppipe -o transfer -r receiver > tmpfile

The behaviour is to always attempt enabling stream management. The XEP
is ambiguous about the response to an enable when stream management is
not offered. Presumably the server will respond with "failure".

So the code should probably have 2 checks:

* if stream management is offered as a feature, send an enable
* if the server responds with "enabled", handle requests
pull/1/head
Michael Santos 9 years ago
parent a233caf32c
commit fdab8a8901

@ -32,17 +32,24 @@ int handle_message(xmpp_conn_t * const, xmpp_stanza_t * const, void * const);
int handle_presence(xmpp_conn_t * const, xmpp_stanza_t * const, void * const);
int handle_presence_error(xmpp_conn_t * const, xmpp_stanza_t * const,
void * const);
int handle_sm_request(xmpp_conn_t * const, xmpp_stanza_t * const, void * const);
int handle_sm_enabled(xmpp_conn_t * const, xmpp_stanza_t * const, void * const);
int handle_sm_ack(xmpp_conn_t * const, xmpp_stanza_t * const, void * const);
int handle_null(xmpp_conn_t * const, xmpp_stanza_t * const, void * const);
int xmppipe_connect_init(xmppipe_state_t *);
int xmppipe_stream_init(xmppipe_state_t *);
int xmppipe_muc_init(xmppipe_state_t *);
int xmppipe_presence_init(xmppipe_state_t *);
void event_loop(xmppipe_state_t *);
int handle_stdin(xmppipe_state_t *, int, char *, size_t);
void xmppipe_stream_close(xmppipe_state_t *);
void xmppipe_muc_join(xmppipe_state_t *);
void xmppipe_muc_unlock(xmppipe_state_t *);
void xmppipe_muc_subject(xmppipe_state_t *, char *);
void xmppipe_send_message(xmppipe_state_t *, char *, char *, char *);
void xmppipe_send(xmppipe_state_t *, xmpp_stanza_t *const);
void xmppipe_ping(xmppipe_state_t *);
int
@ -65,11 +72,12 @@ main(int argc, char **argv)
state->poll = 10;
state->keepalive = 60 * 1000;
state->keepalive_limit = 3;
state->sm_request_interval = 5;
jid = xmppipe_getenv("XMPPIPE_USERNAME");
pass = xmppipe_getenv("XMPPIPE_PASSWORD");
while ( (ch = getopt(argc, argv, "a:dDehk:K:m:o:P:p:r:sS:u:v")) != -1) {
while ( (ch = getopt(argc, argv, "a:dDehI:k:K:m:o:P:p:r:sS:u:v")) != -1) {
switch (ch) {
case 'u':
/* username/jid */
@ -103,12 +111,16 @@ main(int argc, char **argv)
state->verbose++;
break;
case 'I':
/* XEP-0198: stream management request interval */
state->sm_request_interval = (u_int32_t)atoi(optarg);
break;
case 'k':
/* keepalives */
/* XEP-0199: XMPP ping keepalives */
state->keepalive = (u_int32_t)atoi(optarg) * 1000;
break;
case 'K':
/* number of keepalive without a reply */
/* XEP-0199: number of keepalive without a reply */
state->keepalive_limit = (u_int32_t)atoi(optarg);
break;
case 'm':
@ -181,6 +193,9 @@ main(int argc, char **argv)
if (xmppipe_connect_init(state) < 0)
errx(EXIT_FAILURE, "connection failed");
if (xmppipe_stream_init(state) < 0)
errx(EXIT_FAILURE, "enabling stream management failed");
if (xmppipe_muc_init(state) < 0)
errx(EXIT_FAILURE, "failed to join MUC");
@ -192,6 +207,7 @@ main(int argc, char **argv)
event_loop(state);
xmppipe_stream_close(state);
xmpp_conn_release(conn);
xmpp_ctx_free(state->ctx);
xmpp_shutdown();
@ -215,6 +231,39 @@ xmppipe_connect_init(xmppipe_state_t *state)
}
}
int
xmppipe_stream_init(xmppipe_state_t *state)
{
xmpp_stanza_t *enable = NULL;
if (state->sm_request_interval == 0)
return 0;
/* <enable xmlns='urn:xmpp:sm:3'/> */
enable = xmpp_stanza_new(state->ctx);
xmpp_stanza_set_name(enable, "enable");
xmpp_stanza_set_ns(enable, "urn:xmpp:sm:3");
xmpp_send(state->conn, enable);
xmpp_stanza_release(enable);
xmpp_handler_add(state->conn, handle_sm_enabled,
"urn:xmpp:sm:3", "enabled", NULL, state);
xmpp_handler_add(state->conn, handle_sm_request,
"urn:xmpp:sm:3", "r", NULL, state);
xmpp_handler_add(state->conn, handle_sm_ack,
"urn:xmpp:sm:3", "a", NULL, state);
/* XXX multiple handlers can be called for each event
* XXX
* XXX * is the order handlers are called determinisitc?
* XXX * the NULL handler needs to installed as soon as stream management is enabled
* XXX * a handler has to exist for unsupported events
*/
xmpp_handler_add(state->conn, handle_null, NULL, NULL, NULL, state);
return 0;
}
int
xmppipe_muc_init(xmppipe_state_t *state)
{
@ -250,7 +299,7 @@ xmppipe_muc_init(xmppipe_state_t *state)
xmpp_stanza_add_child(iq, query);
xmpp_send(state->conn, iq);
xmppipe_send(state, iq);
xmpp_stanza_release(iq);
state->status = XMPPIPE_S_MUC_SERVICE_LOOKUP;
@ -259,7 +308,7 @@ xmppipe_muc_init(xmppipe_state_t *state)
/* Send initial <presence/> so that we appear online to contacts */
presence = xmpp_stanza_new(state->ctx);
xmpp_stanza_set_name(presence, "presence");
xmpp_send(state->conn, presence);
xmppipe_send(state, presence);
xmpp_stanza_release(presence);
if (state->out) {
@ -303,14 +352,25 @@ event_loop(xmppipe_state_t *state)
if (state->status == XMPPIPE_S_DISCONNECTED)
goto XMPPIPE_EXIT;
if (eof)
goto XMPPIPE_POLL;
if (eof) {
if (state->opt & XMPPIPE_OPT_EOF)
goto XMPPIPE_POLL;
if (state->sm_enabled && (state->sm_ack_sent < state->sm_request)) {
if (state->verbose)
(void)fprintf(stderr, "POLLING: request: %d ack: %d\n",
state->sm_request, state->sm_ack_sent);
goto XMPPIPE_POLL;
}
else
goto XMPPIPE_EXIT;
}
switch (handle_stdin(state, fd, buf, state->bufsz-1)) {
case -1:
goto XMPPIPE_EXIT;
case 0:
if (!(state->opt & XMPPIPE_OPT_EOF))
if (!(state->opt & XMPPIPE_OPT_EOF) && !state->sm_enabled)
goto XMPPIPE_EXIT;
eof = 1;
@ -403,7 +463,7 @@ handle_stdin(xmppipe_state_t *state, int fd, char *buf, size_t len)
}
void
handle_connection(xmpp_conn_t * const conn, const xmpp_conn_event_t status,
handle_connection(xmpp_conn_t * const conn, const xmpp_conn_event_t status,
const int error, xmpp_stream_error_t * const stream_error,
void * const userdata)
{
@ -423,6 +483,99 @@ handle_connection(xmpp_conn_t * const conn, const xmpp_conn_event_t status,
}
}
int
handle_null(xmpp_conn_t * const conn, xmpp_stanza_t * const stanza,
void * const userdata)
{
xmppipe_state_t *state = userdata;
char *name = NULL;
name = xmpp_stanza_get_name(stanza);
if (!name)
return 1;
if (XMPPIPE_STREQ(name, "iq")
|| XMPPIPE_STREQ(name, "message")
|| XMPPIPE_STREQ(name, "presence"))
state->sm_ack_recv++;
return 1;
}
int
handle_sm_enabled(xmpp_conn_t * const conn, xmpp_stanza_t * const stanza,
void * const userdata)
{
xmppipe_state_t *state = userdata;
state->sm_enabled = 1;
return 0;
}
int
handle_sm_request(xmpp_conn_t * const conn, xmpp_stanza_t * const stanza,
void * const userdata)
{
xmppipe_state_t *state = userdata;
xmpp_stanza_t *a = NULL;
char h[11] = {0};
if (state->sm_request % state->sm_request_interval != 0)
return 1;
(void)snprintf(h, sizeof(h), "%u", state->sm_ack_recv);
/* <a xmlns='urn:xmpp:sm:3' h='1'/> */
a = xmpp_stanza_new(state->ctx);
xmpp_stanza_set_name(a, "a");
xmpp_stanza_set_ns(a, "urn:xmpp:sm:3");
xmpp_stanza_set_attribute(a, "h", h);
xmpp_send(state->conn, a);
xmpp_stanza_release(a);
return 1;
}
int
handle_sm_ack(xmpp_conn_t * const conn, xmpp_stanza_t * const stanza,
void * const userdata)
{
xmppipe_state_t *state = userdata;
char *h = NULL;
u_int32_t ack = 0;
h = xmpp_stanza_get_attribute(stanza, "h");
ack = (u_int32_t)atoi(h); /* XXX */
if (state->verbose)
(void)fprintf(stderr, "SM: request=%u ack=%u last=%u\n",
state->sm_request, ack, state->sm_ack_sent);
/* Number of stanzas received by server exceeds the number sent by
* the client.
*/
if (ack > state->sm_request)
goto XMPPIPE_STREAMERR;
/* Server count not incremented since last request (stanzas may have
* been dropped).
*
* Could resend dropped stanzas.
*
*/
if (ack == state->sm_ack_sent)
goto XMPPIPE_STREAMERR;
state->sm_ack_sent = ack;
return 1;
XMPPIPE_STREAMERR:
xmppipe_stream_close(state);
errx(EXIT_FAILURE, "ack sequence mismatch: request=%u, ack=%u\n",
state->sm_request, state->sm_ack_sent);
}
int
handle_disco_items(xmpp_conn_t * const conn, xmpp_stanza_t * const stanza,
void * const userdata)
@ -459,7 +612,7 @@ handle_disco_items(xmpp_conn_t * const conn, xmpp_stanza_t * const stanza,
xmpp_stanza_add_child(iq, reply);
xmpp_send(conn, iq);
xmppipe_send(state, iq);
xmpp_stanza_release(iq);
}
@ -547,7 +700,7 @@ handle_version(xmpp_conn_t * const conn, xmpp_stanza_t * const stanza,
xmpp_stanza_add_child(reply, query);
xmpp_send(conn, reply);
xmppipe_send(state, reply);
xmpp_stanza_release(reply);
return 1;
}
@ -748,7 +901,7 @@ xmppipe_muc_join(xmppipe_state_t *state)
xmpp_stanza_add_child(presence, x);
xmpp_send(state->conn, presence);
xmppipe_send(state, presence);
xmpp_stanza_release(presence);
}
@ -777,7 +930,7 @@ xmppipe_muc_unlock(xmppipe_state_t *state)
xmpp_stanza_add_child(q, x);
xmpp_stanza_add_child(iq, q);
xmpp_send(state->conn, iq);
xmppipe_send(state, iq);
xmpp_stanza_release(iq);
}
@ -802,7 +955,7 @@ xmppipe_muc_subject(xmppipe_state_t *state, char *buf)
xmpp_stanza_add_child(subject, text);
xmpp_stanza_add_child(message, subject);
xmpp_send(state->conn, message);
xmppipe_send(state, message);
xmpp_stanza_release(message);
}
@ -831,7 +984,7 @@ xmppipe_send_message(xmppipe_state_t *state, char *to, char *type, char *buf)
xmpp_stanza_add_child(body, text);
xmpp_stanza_add_child(message, body);
xmpp_send(state->conn, message);
xmppipe_send(state, message);
xmpp_stanza_release(message);
free(id);
}
@ -854,13 +1007,43 @@ xmppipe_ping(xmppipe_state_t *state)
xmpp_stanza_add_child(iq, ping);
xmpp_send(state->conn, iq);
xmppipe_send(state, iq);
xmpp_stanza_release(iq);
state->keepalive_fail++;
xmpp_id_handler_add(state->conn, handle_ping_reply, "c2s1", state);
}
void
xmppipe_send(xmppipe_state_t *state, xmpp_stanza_t *const stanza)
{
xmpp_stanza_t *r = NULL;
state->sm_request++;
xmpp_send(state->conn, stanza);
if (!state->sm_enabled)
return;
if (state->sm_request % state->sm_request_interval != 0)
return;
r = xmpp_stanza_new(state->ctx);
xmpp_stanza_set_name(r, "r");
xmpp_stanza_set_ns(r, "urn:xmpp:sm:3");
xmpp_send(state->conn, r);
xmpp_stanza_release(r);
}
void
xmppipe_stream_close(xmppipe_state_t *state)
{
if (state->sm_enabled)
xmpp_send_raw_string(state->conn, "</stream:stream>");
}
static void
usage(xmppipe_state_t *state)
{
@ -880,6 +1063,7 @@ usage(xmppipe_state_t *state)
" -e ignore stdin EOF\n"
" -s exit when MUC is empty\n"
" -I <interval> request stream management status ever interval messages\n"
" -k <ms> periodically send a keepalive\n"
" -K <count> number of keepalive failures before exiting\n"
" -m <size> size of read buffer\n"

@ -21,7 +21,7 @@
#include <strophe.h>
#define XMPPIPE_VERSION "0.4.0"
#define XMPPIPE_VERSION "0.5.0"
#define XMPPIPE_STREQ(a,b) !strcmp((a),(b))
#define XMPPIPE_STRNEQ(a,b) strcmp((a),(b))
@ -52,6 +52,7 @@ enum {
typedef struct {
xmpp_ctx_t *ctx;
xmpp_conn_t *conn;
int handled;
char *room; /* room, room@conference.xmpp.example.com */
char *server; /* xmpp.example.com */
@ -70,6 +71,14 @@ typedef struct {
u_int32_t interval; /* time since last keepalive (milliseconds) */
size_t bufsz; /* size of read buffer */
int sm_enabled; /* stanzas: iq, message, presence */
u_int32_t sm_request; /* count of sent stanzas */
u_int32_t sm_request_interval; /* request ack every interval stanzas */
u_int32_t sm_ack_recv; /* count of stanzas received from server */
u_int32_t sm_ack_sent; /* count of stanzas sent from client from server */
int opt;
int verbose;
} xmppipe_state_t;

Loading…
Cancel
Save