Browse Source

Deprecate dml_poll in favor of glib main loop

master
Jeroen Vreeken 1 year ago
parent
commit
32c635f27e
23 changed files with 222 additions and 621 deletions
  1. +3
    -4
      Makefile.am
  2. +7
    -0
      configure.ac
  3. +1
    -0
      dml/dml.h
  4. +2
    -1
      dml/dml_connection.h
  5. +0
    -45
      dml/dml_poll.h
  6. +3
    -1
      dml/dml_server.h
  7. +11
    -8
      dml_client.c
  8. +17
    -11
      dml_connection.c
  9. +10
    -18
      dml_fprs_db.c
  10. +5
    -8
      dml_host.c
  11. +24
    -20
      dml_httpd.c
  12. +1
    -2
      dml_list.c
  13. +0
    -339
      dml_poll.c
  14. +14
    -23
      dml_reflector.c
  15. +8
    -7
      dml_server.c
  16. +2
    -2
      dml_stream_client.c
  17. +1
    -2
      dml_stream_client_codec2.c
  18. +11
    -15
      dml_stream_client_simple.c
  19. +12
    -12
      dml_streamer.c
  20. +28
    -35
      dml_trx.c
  21. +28
    -31
      dmld.c
  22. +13
    -14
      fprs_aprsis.c
  23. +21
    -23
      trx_dv.c

+ 3
- 4
Makefile.am View File

@ -1,9 +1,9 @@
ACLOCAL_AMFLAGS=-I m4
CFLAGS+= -Wall -Werror -O3
CFLAGS+= -Wall -Werror -O3 @GLIB_CFLAGS@
nobase_include_HEADERS = dml/dml.h dml/dml_client.h dml/dml_connection.h dml/dml_crypto.h dml/dml_host.h dml/dml_id.h dml/dml_packet.h dml/dml_poll.h dml/dml_route.h dml/dml_stream.h dml/dml_server.h
nobase_include_HEADERS = dml/dml.h dml/dml_client.h dml/dml_connection.h dml/dml_crypto.h dml/dml_host.h dml/dml_id.h dml/dml_packet.h dml/dml_route.h dml/dml_stream.h dml/dml_server.h
lib_LTLIBRARIES=libdml.la
@ -16,13 +16,12 @@ libdml_la_SOURCES = \
dml_host.c \
dml_id.c \
dml_packet.c \
dml_poll.c \
dml_route.c \
dml_server.c \
dml_stream.c
libdml_la_CFLAGS=-fPIC
libdml_la_LDFLAGS= -shared -fPIC -version-info 0:0:0 @LIB_LDFLAGS@
libdml_la_LDFLAGS= -shared -fPIC -version-info 0:0:0 @LIB_LDFLAGS@ @GLIB_LIBS@
bin_PROGRAMS = dmld dml_list dml_streamer dml_stream_client dml_stream_client_codec2 dml_fprs_db


+ 7
- 0
configure.ac View File

@ -98,6 +98,13 @@ AS_IF([test "x$libmagic_found_headers" = "xyes" && test "x$libmagic_found_lib" =
[enable_dml_httpd="yes"], [enable_dml_httpd="no"]
)
# glib
PKG_CHECK_MODULES([GLIB], [glib-2.0])
# build selection logic
AS_IF([test "x$EMSCRIPTEN_FOUND" = "xno" ],
[enable_dml_trx="yes"; enable_dml_reflector="yes" ],
[enable_dml_trx="no"; enable_dml_reflector="no" ]


+ 1
- 0
dml/dml.h View File

@ -21,6 +21,7 @@
#include <stdint.h>
#include <stdbool.h>
#include <time.h>
#include <glib.h>
#define DML_VERSION "0.2"


+ 2
- 1
dml/dml_connection.h View File

@ -20,6 +20,7 @@
#include <stdint.h>
#include <stdbool.h>
#include <glib.h>
struct dml_connection;
@ -31,7 +32,7 @@ struct dml_connection *dml_connection_create(int fd,
int dml_connection_destroy(struct dml_connection *dc);
int dml_connection_fd_get(struct dml_connection *dc);
int dml_connection_handle(struct dml_connection *dc);
gboolean dml_connection_handle(GIOChannel *source, GIOCondition condition, gpointer arg);
int dml_connection_send(struct dml_connection *dc, void *datav, uint16_t id, uint16_t len);
bool dml_connection_send_empty(struct dml_connection *dc);
int dml_connection_send_data(struct dml_connection *dc, void *datav, uint16_t id, uint16_t len);


+ 0
- 45
dml/dml_poll.h View File

@ -1,45 +0,0 @@
/*
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2015
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _INCLUDE_DML_POLL_H_
#define _INCLUDE_DML_POLL_H_
#include <time.h>
#include <stdbool.h>
#include <poll.h>
int dml_poll_add(void *arg,
int (*in_cb)(void *arg),
int (*out_cb)(void *arg),
int (*time_cb)(void *arg)
);
int dml_poll_add_multiple(void *arg,
int (*in_cb)(void *arg),
int (*out_cb)(void *arg),
int (*time_cb)(void *arg),
short (*revents_cb)(void *arg, struct pollfd *fds, int count),
int nr_fds,
struct pollfd **fds
);
int dml_poll_remove(void *arg);
int dml_poll_fd_set(void *arg, int fd);
int dml_poll_in_set(void *arg, bool enable);
int dml_poll_out_set(void *arg, bool enable);
int dml_poll_timeout(void *arg, struct timespec *ts);
int dml_poll_loop(void);
#endif /* _INCLUDE_DML_POLL_H_ */

+ 3
- 1
dml/dml_server.h View File

@ -18,11 +18,13 @@
#ifndef _INCLUDE_DML_SERVER_H_
#define _INCLUDE_DML_SERVER_H_
#include <glib.h>
struct dml_server;
struct dml_server *dml_server_create(void (*cb)(void *arg, int fd), void *arg);
int dml_server_fd_get(struct dml_server *ds);
int dml_server_handle(struct dml_server *ds);
gboolean dml_server_handle(GIOChannel *source, GIOCondition condition, gpointer arg);
#define DML_SERVER_PORT 7373


+ 11
- 8
dml_client.c View File

@ -19,11 +19,11 @@
#include <dml/dml_client.h>
#include <dml/dml_server.h>
#include <dml/dml_poll.h>
#include <dml_config.h>
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/socket.h>
@ -38,6 +38,7 @@
struct dml_client {
int fd;
GIOChannel *io;
char *host;
unsigned short port;
@ -65,6 +66,7 @@ struct dml_client *dml_client_create(char *host, unsigned short port, void (*cb)
dc->port = port;
dc->fd = -1;
dc->io = NULL;
dc->connect_cb = cb;
dc->arg = arg;
@ -79,6 +81,7 @@ int dml_client_destroy(struct dml_client *dc)
{
if (dc->fd >= 0)
close(dc->fd);
g_io_channel_unref(dc->io);
free(dc->host);
free(dc);
@ -87,11 +90,11 @@ int dml_client_destroy(struct dml_client *dc)
}
static int dml_client_connect_success(void *arg)
static gboolean dml_client_connect_success(GIOChannel *source, GIOCondition condition, gpointer arg)
{
struct dml_client *dc = arg;
dml_poll_remove(dc);
g_source_remove_by_user_data(dc);
setsockopt (dc->fd, IPPROTO_TCP, TCP_NODELAY, &(int){1}, sizeof (int));
setsockopt (dc->fd, SOL_SOCKET, SO_KEEPALIVE, &(int){1}, sizeof (int));
@ -101,7 +104,8 @@ static int dml_client_connect_success(void *arg)
dc->connect_cb(dc, dc->arg);
return 0;
//TODO
return FALSE;
}
int dml_client_connect(struct dml_client *dc)
@ -155,11 +159,10 @@ int dml_client_connect(struct dml_client *dc)
free(port);
dc->fd = sock;
dc->io = g_io_channel_unix_new (sock);
g_io_channel_set_encoding(dc->io, NULL, NULL);
dml_poll_add(dc, NULL, dml_client_connect_success, NULL);
dml_poll_fd_set(dc, sock);
dml_poll_in_set(dc, false);
dml_poll_out_set(dc, true);
g_io_add_watch(dc->io, G_IO_OUT, dml_client_connect_success, dc);
return 0;


+ 17
- 11
dml_connection.c View File

@ -17,7 +17,6 @@
*/
#include <dml/dml_connection.h>
#include <dml/dml_packet.h>
#include <dml/dml_poll.h>
#include <stdlib.h>
#include <unistd.h>
@ -36,6 +35,7 @@ enum connection_rx_state {
struct dml_connection {
int fd;
GIOChannel *io;
enum connection_rx_state rx_state;
uint8_t rx_data[DML_PACKET_SIZE_MAX];
@ -71,16 +71,15 @@ struct dml_connection *dml_connection_create(int fd,
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
dc->fd = fd;
dc->io = g_io_channel_unix_new (fd);
g_io_channel_set_encoding(dc->io, NULL, NULL);
dc->rx_state = CONNECTION_HEADER;
dc->rx_cb = rx_cb;
dc->close_cb = close_cb;
dc->arg = arg;
dml_poll_add(dc, (int (*)(void *))dml_connection_handle, (int (*)(void *))dml_connection_handle, NULL);
// printf("new connection fd: %d\n", fd);
dml_poll_fd_set(dc, fd);
dml_poll_in_set(dc, true);
dml_poll_out_set(dc, false);
g_io_add_watch(dc->io, G_IO_IN, dml_connection_handle, dc);
return dc;
err_fcntl:
@ -92,8 +91,10 @@ err_calloc:
int dml_connection_destroy(struct dml_connection *dc)
{
// printf("close %p fd: %d\n", dc, dc->fd);
dml_poll_remove(dc);
g_source_remove_by_user_data(dc);
g_source_remove_by_user_data(dc);
close(dc->fd);
g_io_channel_unref(dc->io);
dc->rx_cb = NULL;
free(dc);
@ -116,7 +117,9 @@ static int dml_connection_output(struct dml_connection *dc)
dc->tx_pos += r;
}
if (dc->tx_pos >= dc->tx_len) {
dml_poll_out_set(dc, false);
g_source_remove_by_user_data(dc);
g_source_remove_by_user_data(dc);
g_io_add_watch(dc->io, G_IO_IN, dml_connection_handle, dc);
dc->tx_len = 0;
dc->tx_pos = 0;
}
@ -125,8 +128,9 @@ static int dml_connection_output(struct dml_connection *dc)
return 0;
}
int dml_connection_handle(struct dml_connection *dc)
gboolean dml_connection_handle(GIOChannel *source, GIOCondition condition, gpointer arg)
{
struct dml_connection *dc = arg;
// printf("handle %p\n", dc);
ssize_t r = 0;
@ -163,8 +167,10 @@ int dml_connection_handle(struct dml_connection *dc)
}
}
//TODO
if (r == 0 || (r < 0 && errno != EAGAIN)) {
dml_poll_remove(dc);
g_source_remove_by_user_data(dc);
g_source_remove_by_user_data(dc);
if (dc->close_cb)
return dc->close_cb(dc, dc->arg);
@ -174,7 +180,7 @@ int dml_connection_handle(struct dml_connection *dc)
dml_connection_output(dc);
return 0;
return TRUE;
}
int dml_connection_send(struct dml_connection *dc, void *datav, uint16_t id, uint16_t len)
@ -203,7 +209,7 @@ int dml_connection_send(struct dml_connection *dc, void *datav, uint16_t id, uin
dml_connection_output(dc);
if (dc->tx_len)
dml_poll_out_set(dc, true);
g_io_add_watch(dc->io, G_IO_OUT, dml_connection_handle, dc);
return 0;
}


+ 10
- 18
dml_fprs_db.c View File

@ -18,7 +18,6 @@
#include <dml/dml_client.h>
#include <dml/dml_connection.h>
#include <dml/dml_host.h>
#include <dml/dml_poll.h>
#include <dml/dml_packet.h>
#include <dml/dml.h>
#include <dml/dml_id.h>
@ -192,7 +191,7 @@ void message_cb(struct fprs_frame *frame)
aprs_msg_nr++;
}
static int fprs_timer(void *arg)
static gboolean fprs_timer(void *arg)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
@ -217,23 +216,21 @@ static int fprs_timer(void *arg)
}
}
dml_poll_timeout(&fprs_timer,
&(struct timespec){ DML_FPRS_DB_TIMER, 0});
return 0;
g_timeout_add_seconds(DML_FPRS_DB_TIMER, fprs_timer, &fprs_timer);
return G_SOURCE_REMOVE;
}
static int fprs_req_timer(void *arg)
static gboolean fprs_req_timer(void *arg)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
fprs_parse_request_flush(send_data, NULL);
dml_poll_timeout(&fprs_timer,
&(struct timespec){ DML_FPRS_REQ_TIMER, 0});
g_timeout_add_seconds(DML_FPRS_REQ_TIMER, fprs_req_timer, &fprs_req_timer);
return 0;
return G_SOURCE_REMOVE;
}
int main(int argc, char **argv)
@ -334,15 +331,10 @@ int main(int argc, char **argv)
dml_host_stream_req_reverse_connect_cb_set(host, stream_req_reverse_connect_cb, NULL);
dml_host_stream_req_reverse_disconnect_cb_set(host, stream_req_reverse_disconnect_cb, NULL);
dml_poll_add(&fprs_timer, NULL, NULL, fprs_timer);
dml_poll_add(&fprs_req_timer, NULL, NULL, fprs_req_timer);
dml_poll_timeout(&fprs_timer,
&(struct timespec){ DML_FPRS_DB_TIMER, 0});
dml_poll_timeout(&fprs_req_timer,
&(struct timespec){ DML_FPRS_REQ_TIMER, 0});
g_timeout_add_seconds(DML_FPRS_DB_TIMER, fprs_timer, &fprs_timer);
g_timeout_add_seconds(DML_FPRS_REQ_TIMER, fprs_req_timer, &fprs_req_timer);
dml_poll_loop();
g_main_loop_run(g_main_loop_new(NULL, false));
return 0;
}

+ 5
- 8
dml_host.c View File

@ -22,7 +22,6 @@
#include <dml/dml_connection.h>
#include <dml/dml_crypto.h>
#include <dml/dml_packet.h>
#include <dml/dml_poll.h>
#include <string.h>
#include <stdio.h>
@ -361,16 +360,16 @@ int dml_host_connect(struct dml_host *host, struct dml_stream *ds)
return 0;
}
static int client_reconnect(void *arg)
static gboolean client_reconnect(void *arg)
{
struct dml_host *host = arg;
if (dml_client_connect(host->client)) {
printf("Reconnect to DML server failed\n");
dml_poll_timeout(host, &(struct timespec){ 2, 0 });
g_timeout_add_seconds(2, client_reconnect, host);
}
return 0;
return G_SOURCE_REMOVE;
}
@ -389,8 +388,7 @@ static int client_connection_close(struct dml_connection *dc, void *arg)
if (host->connection_closed_cb)
host->connection_closed_cb(host, host->connection_closed_cb_arg);
dml_poll_add(host, NULL, NULL, client_reconnect);
dml_poll_timeout(host, &(struct timespec){ 1, 0 });
g_timeout_add_seconds(1, client_reconnect, host);
if (dc) {
return dml_connection_destroy(dc);
@ -510,8 +508,7 @@ struct dml_host *dml_host_create(char *server)
if (dml_client_connect(host->client)) {
printf("Could not connect to server\n");
dml_poll_add(host, NULL, NULL, client_reconnect);
dml_poll_timeout(host, &(struct timespec){ 2, 0 });
g_timeout_add_seconds(2, client_reconnect, host);
}
err_alloc:


+ 24
- 20
dml_httpd.c View File

@ -36,7 +36,6 @@
#include "dml_config.h"
#include <dml/dml_connection.h>
#include <dml/dml_packet.h>
#include <dml/dml_poll.h>
magic_t magic;
@ -57,6 +56,7 @@ struct writebuf {
struct ws_client {
struct lws *wsi;
GIOChannel *io;
struct writebuf *writeq;
@ -368,19 +368,19 @@ err_ws:
return;
}
int wsi_in_cb(void *arg)
gboolean wsi_in_cb(GIOChannel *source, GIOCondition condition, gpointer arg)
{
// struct lws *wsi = arg;
lws_service(lws_context, 0);
return 0;
return TRUE;
}
int wsi_out_cb(void *arg)
gboolean wsi_out_cb(GIOChannel *source, GIOCondition condition, gpointer arg)
{
// struct lws *wsi = arg;
lws_service(lws_context, 0);
return 0;
return TRUE;
}
@ -498,25 +498,29 @@ static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
r = -1;
break;
case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
g_source_remove_by_user_data(wsi);
g_source_remove_by_user_data(wsi);
//fallthrough
}
case LWS_CALLBACK_ADD_POLL_FD: {
struct ws_client *ws_client = ws_client_get_by_wsi(wsi);
if (!ws_client) {
ws_client = ws_client_add(wsi);
}
struct lws_pollargs *args = in;
dml_poll_add(wsi, wsi_in_cb, wsi_out_cb, NULL);
dml_poll_fd_set(wsi, args->fd);
dml_poll_in_set(wsi, args->events & POLLIN);
dml_poll_out_set(wsi, args->events & POLLOUT);
ws_client->io = g_io_channel_unix_new(args->fd);
g_io_channel_set_encoding(ws_client->io, NULL, NULL);
if (args->events & POLLIN)
g_io_add_watch(ws_client->io, G_IO_IN, wsi_in_cb, wsi);
if (args->events & POLLOUT)
g_io_add_watch(ws_client->io, G_IO_OUT, wsi_out_cb, wsi);
break;
}
case LWS_CALLBACK_DEL_POLL_FD: {
dml_poll_remove(wsi);
break;
}
case LWS_CALLBACK_CHANGE_MODE_POLL_FD: {
struct lws_pollargs *args = in;
dml_poll_fd_set(wsi, args->fd);
dml_poll_in_set(wsi, args->events & POLLIN);
dml_poll_out_set(wsi, args->events & POLLOUT);
g_source_remove_by_user_data(wsi);
g_source_remove_by_user_data(wsi);
break;
}
@ -593,7 +597,7 @@ int main(int argc, char **argv)
printf("starting server...\n");
dml_poll_loop();
g_main_loop_run(g_main_loop_new(NULL, false));
lws_context_destroy(lws_context);
magic_close(magic);


+ 1
- 2
dml_list.c View File

@ -17,7 +17,6 @@
*/
#include <dml/dml_client.h>
#include <dml/dml_connection.h>
#include <dml/dml_poll.h>
#include <dml/dml_packet.h>
#include <dml/dml.h>
#include <dml/dml_id.h>
@ -149,7 +148,7 @@ int main(int argc, char **argv)
return -1;
}
dml_poll_loop();
g_main_loop_run(g_main_loop_new(NULL, false));
return 0;
}

+ 0
- 339
dml_poll.c View File

@ -1,339 +0,0 @@
/*
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2015
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <dml/dml_poll.h>
#include <poll.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
//#define DEBUG_T
struct dml_poll {
struct dml_poll *next;
int pfd_nr;
int pfd_size;
bool use_revents_cb;
void *arg;
int (*in_cb)(void *arg);
int (*out_cb)(void *arg);
int (*time_cb)(void *arg);
struct timespec timeout;
short (*revents_cb)(void *arg, struct pollfd *fds, int count);
};
static struct dml_poll *dml_poll_list;
static struct pollfd *pfds = NULL;
static nfds_t nfds = 0;
int dml_poll_add(void *arg,
int (*in_cb)(void *arg),
int (*out_cb)(void *arg),
int (*time_cb)(void *arg)
)
{
struct dml_poll *dp;
int pfd_nr;
// printf("+add: %p\n", arg);
for (dp = dml_poll_list; dp; dp = dp->next) {
if (dp->arg == arg)
break;
}
if (!dp) {
pfds = realloc(pfds, sizeof(struct pollfd) * (nfds + 1));
pfd_nr = nfds;
memset(pfds + pfd_nr, 0, sizeof(struct pollfd));
pfds[pfd_nr].fd = -1;
nfds++;
dp = calloc(1, sizeof(struct dml_poll));
dp->pfd_nr = pfd_nr;
dp->pfd_size = 1;
dp->arg = arg;
dp->next = dml_poll_list;
dml_poll_list = dp;
}
dp->in_cb = in_cb;
dp->out_cb = out_cb;
dp->time_cb = time_cb;
// printf("=add: %p\n", dp);
return 0;
}
int dml_poll_add_multiple(void *arg,
int (*in_cb)(void *arg),
int (*out_cb)(void *arg),
int (*time_cb)(void *arg),
short (*revents_cb)(void *arg, struct pollfd *fds, int count),
int nr_fds,
struct pollfd **fds
)
{
struct dml_poll *dp;
int pfd_nr;
for (dp = dml_poll_list; dp; dp = dp->next) {
if (dp->arg == arg)
break;
}
if (!dp) {
int i;
pfds = realloc(pfds, sizeof(struct pollfd) * (nfds + nr_fds));
pfd_nr = nfds;
memset(pfds + pfd_nr, 0, sizeof(struct pollfd) * nr_fds);
for (i = 0; i < nr_fds; i++)
pfds[pfd_nr + i].fd = -1;
*fds = pfds + pfd_nr;
nfds += nr_fds;
dp = calloc(1, sizeof(struct dml_poll));
dp->pfd_nr = pfd_nr;
dp->pfd_size = nr_fds;
dp->arg = arg;
dp->use_revents_cb = true;
dp->next = dml_poll_list;
dml_poll_list = dp;
}
dp->in_cb = in_cb;
dp->out_cb = out_cb;
dp->time_cb = time_cb;
dp->revents_cb = revents_cb;
// printf("=add: %p\n", dp);
return 0;
}
int dml_poll_remove(void *arg)
{
struct dml_poll **dp;
int pfd_nr = -1;
int pfd_size = 0;
for (dp = &dml_poll_list; *dp; dp = &(*dp)->next) {
if ((*dp)->arg == arg) {
struct dml_poll *old = *dp;
pfd_nr = old->pfd_nr;
pfd_size = old->pfd_size;
*dp = old->next;
free(old);
break;
}
}
// printf("remove prd_nr %d %d\n", (int)pfd_nr, (int) nfds);
if (pfd_nr < 0)
return 0;
for (dp = &dml_poll_list; *dp; dp = &(*dp)->next) {
// printf("- %p %d\n", *dp, (*dp)->pfd_nr);
if ((*dp)->pfd_nr > pfd_nr)
(*dp)->pfd_nr -= pfd_size;
}
memmove(pfds + pfd_nr, pfds + pfd_nr + pfd_size, sizeof(*pfds) * (nfds - pfd_nr - pfd_size));
nfds -= pfd_size;
pfds = realloc(pfds, sizeof(*pfds) * nfds);
return 0;
}
int dml_poll_fd_set(void *arg, int fd)
{
struct dml_poll *dp;
for (dp = dml_poll_list; dp; dp = dp->next) {
if (dp->arg == arg)
pfds[dp->pfd_nr].fd = fd;
}
return 0;
}
int dml_poll_in_set(void *arg, bool enable)
{
struct dml_poll *dp;
for (dp = dml_poll_list; dp; dp = dp->next) {
if (dp->arg == arg) {
pfds[dp->pfd_nr].events &= ~POLLIN;
pfds[dp->pfd_nr].events |= enable ? POLLIN : 0;
}
}
return 0;
}
int dml_poll_out_set(void *arg, bool enable)
{
struct dml_poll *dp;
for (dp = dml_poll_list; dp; dp = dp->next) {
if (dp->arg == arg) {
pfds[dp->pfd_nr].events &= ~POLLOUT;
pfds[dp->pfd_nr].events |= enable ? POLLOUT : 0;
}
}
return 0;
}
int dml_poll_timeout(void *arg, struct timespec *ts)
{
struct dml_poll *dp;
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
for (dp = dml_poll_list; dp; dp = dp->next) {
if (dp->arg == arg) {
if (!ts->tv_sec && !ts->tv_nsec) {
dp->timeout.tv_sec = 0;
return 0;
}
dp->timeout.tv_sec = ts->tv_sec + now.tv_sec;
dp->timeout.tv_nsec = ts->tv_nsec + now.tv_nsec;
if (dp->timeout.tv_nsec >= 1000000000) {
dp->timeout.tv_nsec -= 1000000000;
dp->timeout.tv_sec++;
}
// printf("set timeout %d %d\n", (int)dp->timeout.tv_sec, (int)dp->timeout.tv_nsec);
return 0;
}
}
return 0;
}
#ifdef DEBUG_T
static struct timespec dml_poll_t;
static double t_max = 0;
static void dml_poll_tstart(void)
{
clock_gettime(CLOCK_MONOTONIC, &dml_poll_t);
}
static void dml_poll_tend(char *name)
{
struct timespec t;
clock_gettime(CLOCK_MONOTONIC, &t);
double dur = t.tv_sec - dml_poll_t.tv_sec;
dur += (double)(t.tv_nsec - dml_poll_t.tv_nsec) / 1000000000.0;
if (dur > t_max || dur > 0.002) {
t_max = dur;
printf("dml_poll callback '%s' duration: %f\n", name, dur);
}
}
#else
static void dml_poll_tstart(void)
{
}
static void dml_poll_tend(char *name)
{
}
#endif
int dml_poll_loop(void)
{
do {
int64_t t = 0;
struct dml_poll *dp;
for (dp = dml_poll_list; dp; dp = dp->next) {
if (dp->timeout.tv_sec) {
int64_t dptimeout = (int64_t)dp->timeout.tv_sec * 1000;
dptimeout += ((int64_t)dp->timeout.tv_nsec + 999999) / 1000000;
if (t)
t = t > dptimeout ? dptimeout : t;
else
t = dptimeout;
}
// printf("%d %d\n", (int)dp->timeout, (int)t);
}
int timeout;
if (t) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
t -= (int64_t)now.tv_sec * 1000;
t -= (int64_t)now.tv_nsec / 1000000;
timeout = t;
if (timeout < 0)
timeout = 0;
} else {
timeout = -1;
}
// printf("Poll with %d fds and timeout: %d\n", (int)nfds, timeout);
poll(pfds, nfds, timeout);
for (dp = dml_poll_list; dp; dp = dp->next) {
struct pollfd *p = &pfds[dp->pfd_nr];
// printf("%p %p nr: %d fd: %d\n", dp, dp->arg, (int)dp->pfd_nr, p->fd);
if (p->fd >= 0) {
short revents;
if (!dp->use_revents_cb) {
revents = p->revents;
} else {
dml_poll_tstart();
revents = dp->revents_cb(dp->arg, p, dp->pfd_size);
dml_poll_tend("revents_cb");
}
// printf("%p %d: %x %x\n", dp, dp->pfd_nr, p->revents, p->events);
if (revents & POLLIN) {
dml_poll_tstart();
dp->in_cb(dp->arg);
dml_poll_tend("in_cb");
break;
}
if (revents & POLLOUT) {
dml_poll_tstart();
dp->out_cb(dp->arg);
dml_poll_tend("out_cb");
break;
}
}
if (dp->timeout.tv_sec) {
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if (dp->timeout.tv_sec < now.tv_sec ||
(dp->timeout.tv_sec == now.tv_sec &&
dp->timeout.tv_nsec <= now.tv_nsec)) {
dp->timeout.tv_sec = 0;
dml_poll_tstart();
dp->time_cb(dp->arg);
dml_poll_tend("time_cb");
break;
}
}
}
} while (1);
return 0;
}

+ 14
- 23
dml_reflector.c View File

@ -17,7 +17,6 @@
*/
#include <dml/dml_client.h>
#include <dml/dml_connection.h>
#include <dml/dml_poll.h>
#include <dml/dml_packet.h>
#include <dml/dml.h>
#include <dml/dml_id.h>
@ -38,7 +37,7 @@
#include <time.h>
#define DML_REFLECTOR_PARROT_WAIT (500*1000*1000)
#define DML_REFLECTOR_PARROT_WAIT (500)
#define DML_REFLECTOR_PARROT_MAX (60*60*50)
#define DML_REFLECTOR_DATA_KEEPALIVE 10
@ -56,7 +55,7 @@ struct dml_host *host;
struct dml_crypto_key *dk;
void send_beep(void);
static int watchdog(void *arg);
static gboolean watchdog(void *arg);
enum sound_msg {
SOUND_MSG_HEADER,
@ -109,8 +108,8 @@ void send_data(void *data, size_t size, uint64_t timestamp)
uint64_t tmax;
uint16_t packet_id = dml_stream_data_id_get(stream_dv);
dml_poll_timeout(&watchdog,
&(struct timespec){ DML_REFLECTOR_DATA_KEEPALIVE, 0});
g_source_remove_by_user_data(&watchdog);
g_timeout_add_seconds(DML_REFLECTOR_DATA_KEEPALIVE, watchdog, &watchdog);
if (!packet_id)
return;
@ -147,7 +146,7 @@ struct parrot_data *parrot_queue = NULL;
struct timespec parrot_ts;
int parrot_dequeue(void *data)
static gboolean parrot_dequeue(void *data)
{
uint64_t parrot_timestamp;
uint16_t packet_id = dml_stream_data_id_get(stream_dv);
@ -156,8 +155,7 @@ int parrot_dequeue(void *data)
if (parrot_queue) {
struct parrot_data *entry = parrot_queue;
dml_poll_timeout(&watchdog,
&(struct timespec){ DML_REFLECTOR_DATA_KEEPALIVE, 0});
g_timeout_add_seconds(DML_REFLECTOR_DATA_KEEPALIVE, parrot_dequeue, &watchdog);
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
@ -176,8 +174,7 @@ int parrot_dequeue(void *data)
waitms = 1;
}
dml_poll_timeout(&parrot_queue,
&(struct timespec){ 0, waitms * 1000000});
g_timeout_add(waitms, parrot_dequeue, &parrot_queue);
parrot_timestamp = dml_ts2timestamp(&parrot_ts);
printf("e %016"PRIx64" %ld %ld %d\n", parrot_timestamp, diff, waitms, entry->duration);
@ -209,7 +206,7 @@ printf("= %016"PRIx64"\n", parrot_timestamp);
parrot_ts.tv_sec = 0;
}
return 0;
return G_SOURCE_REMOVE;
}
void parrot_queue_add(void *data, size_t size, int duration)
@ -234,8 +231,8 @@ void parrot_queue_add(void *data, size_t size, int duration)
*listp = entry;
dml_poll_timeout(&parrot_queue,
&(struct timespec){ 0, DML_REFLECTOR_PARROT_WAIT });
g_source_remove_by_user_data(&parrot_queue);
g_timeout_add(DML_REFLECTOR_PARROT_WAIT, parrot_dequeue, &parrot_queue);
}
@ -298,7 +295,7 @@ printf("%ld ", ts.tv_sec);
send_data(data, beepsize + 8, timestamp);
}
static int watchdog(void *arg)
static gboolean watchdog(void *arg)
{
struct timespec ts;
uint64_t timestamp;
@ -318,7 +315,7 @@ printf("%ld ", ts.tv_sec);
send_data(data, 8, timestamp);
return 0;
return G_SOURCE_REMOVE;
}
@ -410,15 +407,9 @@ int main(int argc, char **argv)
}
}
if (parrot)
dml_poll_add(&parrot_queue, NULL, NULL, parrot_dequeue);
dml_poll_add(&watchdog, NULL, NULL, watchdog);
dml_poll_timeout(&watchdog,
&(struct timespec){ DML_REFLECTOR_DATA_KEEPALIVE, 0});
g_timeout_add_seconds(DML_REFLECTOR_DATA_KEEPALIVE, watchdog, &watchdog);
dml_poll_loop();
g_main_loop_run(g_main_loop_new(NULL, false));
return 0;
}

+ 8
- 7
dml_server.c View File

@ -16,7 +16,6 @@
*/
#include <dml/dml_server.h>
#include <dml/dml_poll.h>
#include <string.h>
#include <malloc.h>
@ -30,6 +29,7 @@
struct dml_server {
int fd;
GIOChannel *io;
void (*connection_cb)(void *arg, int fd);
void *connection_cb_arg;
};
@ -64,12 +64,12 @@ struct dml_server *dml_server_create(void (*cb)(void *arg, int fd), void *arg)
goto err_calloc;
ds->fd = listensock6;
ds->io = g_io_channel_unix_new (listensock6);
g_io_channel_set_encoding(ds->io, NULL, NULL);
ds->connection_cb = cb;
ds->connection_cb_arg = arg;
dml_poll_add(ds, (int (*)(void *))dml_server_handle, NULL, NULL);
dml_poll_fd_set(ds, listensock6);
dml_poll_in_set(ds, true);
g_io_add_watch(ds->io, G_IO_IN, dml_server_handle, ds);
return ds;
err_calloc:
@ -84,8 +84,9 @@ int dml_server_fd_get(struct dml_server *ds)
return ds->fd;
}
int dml_server_handle(struct dml_server *ds)
gboolean dml_server_handle(GIOChannel *source, GIOCondition condition, gpointer arg)
{
struct dml_server *ds = arg;
int acceptsock;
struct sockaddr_in6 from6;
socklen_t len6 = sizeof(from6);
@ -93,7 +94,7 @@ int dml_server_handle(struct dml_server *ds)
acceptsock = accept(ds->fd, (struct sockaddr *)&from6, &len6);
if (acceptsock < 0)
return -1;
return TRUE;
setsockopt(acceptsock, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof (int));
setsockopt(acceptsock, IPPROTO_TCP, TCP_NODELAY, &(int){1}, sizeof (int));
@ -104,6 +105,6 @@ int dml_server_handle(struct dml_server *ds)
ds->connection_cb(ds->connection_cb_arg, acceptsock);
return 0;
return TRUE;
}

+ 2
- 2
dml_stream_client.c View File

@ -15,7 +15,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <dml/dml_poll.h>
#include <dml/dml.h>
#include <dml/dml_id.h>
#include <dml/dml_crypto.h>
@ -91,7 +91,7 @@ int main(int argc, char **argv)
return -1;
}
dml_poll_loop();
g_main_loop_run(g_main_loop_new(NULL, false));
return 0;
}

+ 1
- 2
dml_stream_client_codec2.c View File

@ -17,7 +17,6 @@
*/
#define _GNU_SOURCE
#include <dml/dml_poll.h>
#include <dml/dml.h>
#include <dml/dml_id.h>
#include <dml/dml_crypto.h>
@ -282,7 +281,7 @@ int main(int argc, char **argv)
return -1;
}
dml_poll_loop();
g_main_loop_run(g_main_loop_new(NULL, false));
return 0;
}

+ 11
- 15
dml_stream_client_simple.c View File

@ -17,7 +17,6 @@
*/
#include <dml/dml_client.h>
#include <dml/dml_connection.h>
#include <dml/dml_poll.h>
#include <dml/dml_packet.h>
#include <dml/dml.h>
#include <dml/dml_id.h>
@ -51,7 +50,7 @@ struct dml_stream_client_simple {
char *mime;
};
static int keepalive_cb(void *arg)
static gboolean keepalive_cb(void *arg)
{
struct dml_stream_client_simple *dss = arg;
@ -66,10 +65,9 @@ static int keepalive_cb(void *arg)
//TODO What is the best way to trigger discovery?
}
dml_poll_timeout(dss,
&(struct timespec){ DML_STREAM_CLIENT_SIMPLE_KEEPALIVE, 0});
g_timeout_add_seconds(DML_STREAM_CLIENT_SIMPLE_KEEPALIVE, keepalive_cb, dss);
return 0;
return G_SOURCE_REMOVE;
}
static void rx_packet(struct dml_connection *dc, void *arg,
@ -228,8 +226,8 @@ static void rx_packet(struct dml_connection *dc, void *arg,
// fprintf(stderr, "Received %zd ok\n", payload_len);
dss->data_cb(dss->arg, payload_data, payload_len);
dml_poll_timeout(dss,
&(struct timespec){ DML_STREAM_CLIENT_SIMPLE_KEEPALIVE, 0});
g_source_remove_by_user_data(dss);
g_timeout_add_seconds(DML_STREAM_CLIENT_SIMPLE_KEEPALIVE, keepalive_cb, dss);
}
}
break;
@ -239,28 +237,26 @@ static void rx_packet(struct dml_connection *dc, void *arg,
return;
}
static int client_reconnect(void *arg)
static gboolean client_reconnect(void *arg)
{
struct dml_stream_client_simple *dss = arg;
if (dml_client_connect(dss->client)) {
printf("Reconnect to DML server failed\n");
dml_poll_timeout(dss, &(struct timespec){ DML_STREAM_CLIENT_SIMPLE_RECONNECT, 0 });
g_timeout_add_seconds(DML_STREAM_CLIENT_SIMPLE_RECONNECT, client_reconnect, dss);
} else {
printf("Reconnect to DML server successfull\n");
dml_poll_add(dss, NULL, NULL, keepalive_cb);
g_timeout_add_seconds(DML_STREAM_CLIENT_SIMPLE_KEEPALIVE, keepalive_cb, dss);
}
return 0;
return G_SOURCE_REMOVE;
}
static int client_connection_close(struct dml_connection *dc, void *arg)
{
struct dml_stream_client_simple *dss = arg;
dml_poll_add(dss, NULL, NULL, client_reconnect);
dml_poll_timeout(dss, &(struct timespec){ 1, 0 });
g_timeout_add_seconds(1, client_reconnect, dss);
if (dc)
dml_connection_destroy(dc);
@ -333,7 +329,7 @@ struct dml_stream_client_simple *dml_stream_client_simple_search_create(
if (dml_client_connect(client))
goto err_connect;
dml_poll_add(dss, NULL, NULL, keepalive_cb);
g_timeout_add_seconds(DML_STREAM_CLIENT_SIMPLE_KEEPALIVE, keepalive_cb, dss);
return dss;


+ 12
- 12
dml_streamer.c View File

@ -17,7 +17,6 @@
*/
#include <dml/dml_client.h>
#include <dml/dml_connection.h>
#include <dml/dml_poll.h>
#include <dml/dml_packet.h>
#include <dml/dml.h>
#include <dml/dml_id.h>
@ -105,16 +104,16 @@ void rx_packet(struct dml_connection *dc, void *arg,
return;
}
int client_reconnect(void *clientv)
gboolean client_reconnect(void *clientv)
{
struct dml_client *client = clientv;
if (dml_client_connect(client)) {
printf("Reconnect to DML server failed\n");
dml_poll_timeout(client, &(struct timespec){ 2, 0 });
g_timeout_add_seconds(2, client_reconnect, client);
}
return 0;
return G_SOURCE_REMOVE;
}
int client_connection_close(struct dml_connection *dc, void *arg)
@ -122,8 +121,7 @@ int client_connection_close(struct dml_connection *dc, void *arg)
dml_con = NULL;
packet_id = 0;
dml_poll_add(arg, NULL, NULL, client_reconnect);
dml_poll_timeout(arg, &(struct timespec){ 1, 0 });
g_timeout_add_seconds(1, client_reconnect, arg);
if (dc)
return dml_connection_destroy(dc);
@ -228,7 +226,7 @@ int trigger_cb(enum fileparse_trigger trig)
struct fileparse *fileparse;
int (*parse)(struct fileparse *ogg, void *buffer, size_t size);
int fd_in(void *arg)
gboolean fd_in(GIOChannel *source, GIOCondition condition, gpointer arg)
{
char buffer[4096];
@ -239,7 +237,7 @@ int fd_in(void *arg)
return parse(fileparse, buffer, r);
}
return 0;
return TRUE;
}
@ -252,6 +250,7 @@ int main(int argc, char **argv)
char *server;
bool use_ogg = true;
bool use_isom = false;
GIOChannel *io = NULL;
if (argc > 1)
file = argv[1];
@ -306,11 +305,12 @@ int main(int argc, char **argv)
else
fileparse = matroska_create(data_cb, trigger_cb, &parse);
dml_poll_add(&fd_ogg, fd_in, NULL, NULL);
dml_poll_fd_set(&fd_ogg, 0);
dml_poll_in_set(&fd_ogg, true);
io = g_io_channel_unix_new(fd_ogg);
g_io_channel_set_encoding(io, NULL, NULL);
g_io_add_watch(io, G_IO_IN, fd_in, &fd_ogg);
dml_poll_loop();
g_main_loop_run(g_main_loop_new(NULL, false));
g_io_channel_unref(io);
return 0;
}

+ 28
- 35
dml_trx.c View File

@ -19,7 +19,6 @@
#include <dml/dml_client.h>
#include <dml/dml_connection.h>
#include <dml/dml_poll.h>
#include <dml/dml_packet.h>
#include <dml/dml.h>
#include <dml/dml_host.h>
@ -311,7 +310,7 @@ static int fprs_update_mac(uint8_t mac[6])
return 0;
}
static int fprs_timer(void *arg)
static gboolean fprs_timer(void *arg)
{
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
@ -368,13 +367,12 @@ static int fprs_timer(void *arg)
DML_STATUS_OK);
}
dml_poll_timeout(&fprs_timer,
&(struct timespec){ DML_TRX_FPRS_TIMER, 0});
g_timeout_add_seconds(DML_TRX_FPRS_TIMER, fprs_timer, &fprs_timer);
return 0;
return G_SOURCE_REMOVE;
}
static int fprs_db_check(void *arg)
static gboolean fprs_db_check(void *arg)
{
if (!cur_db) {
struct dml_stream *ds = NULL;
@ -407,10 +405,9 @@ static int fprs_db_check(void *arg)
fprs_parse_request_flush(send_data_fprs, NULL);
}
dml_poll_timeout(&cur_db,
&(struct timespec){ DML_TRX_FPRS_DB_TIMER, 0 });
g_timeout_add_seconds(DML_TRX_FPRS_DB_TIMER, fprs_db_check, &cur_db);
return 0;
return G_SOURCE_REMOVE;
}
static void stream_removed_cb(struct dml_host *host, struct dml_stream *ds, void *arg)
@ -605,7 +602,7 @@ static void recv_data(void *data, size_t size)
}
}
static int rx_watchdog(void *arg)
static gboolean rx_watchdog(void *arg)
{
printf("No activity, sending state off packet\n");
@ -654,10 +651,9 @@ static int rx_watchdog(void *arg)
free(e);
}
dml_poll_timeout(&rx_state,
&(struct timespec){ DML_TRX_DATA_KEEPALIVE, 0});
g_timeout_add_seconds(DML_TRX_DATA_KEEPALIVE, rx_watchdog, &rx_state);
return 0;
return G_SOURCE_REMOVE;
}
static int dv_in_cb(void *arg, uint8_t from[6], uint8_t to[6], uint8_t *dv, size_t size, int mode, uint8_t level)
@ -679,9 +675,11 @@ static int dv_in_cb(void *arg, uint8_t from[6], uint8_t to[6], uint8_t *dv, size
fprs_update_mac(from);
dml_poll_timeout(&rx_state, rx_state ?
&(struct timespec){0, RXSTATE_CHECK_TIMER_NS} :
&(struct timespec){DML_TRX_DATA_KEEPALIVE, 0} );
g_source_remove_by_user_data(&rx_state);
if (rx_state)
g_timeout_add(RXSTATE_CHECK_TIMER_NS/1000000, rx_watchdog, &rx_state);
else
g_timeout_add_seconds(DML_TRX_DATA_KEEPALIVE, rx_watchdog, &rx_state);
return 0;
}
@ -829,7 +827,7 @@ static int command_cb(void *arg, uint8_t from[6], uint8_t to[6], char *ctrl, siz
return 0;
}
static int command_pipe_cb(void *arg)
static gboolean command_pipe_cb(GIOChannel *source, GIOCondition condition, gpointer arg)
{
int fd = *(int*)arg;
static char c;
@ -838,14 +836,14 @@ static int command_pipe_cb(void *arg)
if (r == 1) {
if (c == '\r')
return 0;
return TRUE;
if (c == '\n') {
if (command_pipe_len) {
command_pipe[command_pipe_len] = 0;
if (allow_commands)
command_cb_handle(command_pipe);
command_pipe_len = 0;
return 0;
return TRUE;
}
}
@ -856,7 +854,7 @@ static int command_pipe_cb(void *arg)
command_pipe_len = 0;
}
return 0;
return TRUE;
}
@ -976,6 +974,7 @@ int main(int argc, char **argv)
uint32_t bps = 6400;
struct dml_crypto_key *dk;
int fd_command;
GIOChannel *io_command = NULL;
if (argc > 1)
file = argv[1];
@ -1068,10 +1067,6 @@ int main(int argc, char **argv)
dml_host_stream_req_reverse_connect_cb_set(host, stream_req_reverse_connect_cb, NULL);
dml_host_stream_req_reverse_disconnect_cb_set(host, stream_req_reverse_disconnect_cb, NULL);
dml_poll_add(&rx_state, NULL, NULL, rx_watchdog);
dml_poll_add(&fprs_timer, NULL, NULL, fprs_timer);
dml_poll_add(&cur_db, NULL, NULL, fprs_db_check);
fprs_parse_hook_message(message_cb, NULL);
@ -1147,19 +1142,17 @@ int main(int argc, char **argv)
}
}
dml_poll_add(&fd_command, command_pipe_cb, NULL, NULL);
dml_poll_fd_set(&fd_command, fd_command);
dml_poll_in_set(&fd_command, true);
dml_poll_timeout(&rx_state,
&(struct timespec){ DML_TRX_DATA_KEEPALIVE, 0});
dml_poll_timeout(&fprs_timer,
&(struct timespec){ DML_TRX_FPRS_TIMER_INIT, 0});
io_command = g_io_channel_unix_new(fd_command);
g_io_channel_set_encoding(io_command, NULL, NULL);
g_io_add_watch(io_command, G_IO_IN, command_pipe_cb, &fd_command);
g_timeout_add_seconds(DML_TRX_DATA_KEEPALIVE, rx_watchdog, &rx_state);