Browse Source

Make sure we don't use a closed connection anymore.

master
Jeroen Vreeken 4 years ago
parent
commit
8d00a355f8
5 changed files with 72 additions and 26 deletions
  1. +2
    -2
      dml_host.c
  2. +7
    -2
      dml_httpd.c
  3. +9
    -4
      dml_reflector.c
  4. +34
    -6
      dml_stream_client_simple.c
  5. +20
    -12
      dml_trx.c

+ 2
- 2
dml_host.c View File

@ -388,9 +388,9 @@ static int client_connection_close(struct dml_connection *dc, void *arg)
dml_poll_add(host, NULL, NULL, client_reconnect);
dml_poll_timeout(host, &(struct timespec){ 1, 0 });
if (dc)
if (dc) {
return dml_connection_destroy(dc);
else
} else
return 0;
}


+ 7
- 2
dml_httpd.c View File

@ -411,8 +411,13 @@ static int callback_http(struct lws *wsi, enum lws_callback_reasons reason,
uint16_t packet_id = (rcv[0] << 8) | rcv[1];
if (data_len > 0) {
printf("Send packet (id %d, len %zd)\n", packet_id, data_len);
dml_connection_send(ws_client->dc, payload_data, packet_id, data_len);
struct dml_connection *dc = ws_client->dc;
if (dc) {
printf("Send packet (id %d, len %zd)\n", packet_id, data_len);
dml_connection_send(ws_client->dc, payload_data, packet_id, data_len);
} else {
r = -1;
}
}
break;


+ 9
- 4
dml_reflector.c View File

@ -127,7 +127,9 @@ void send_data(void *data, size_t size, uint64_t timestamp)
prev_timestamp = timestamp;
printf("+ %016"PRIx64"\n", timestamp);
dml_packet_send_data(dml_host_connection_get(host), packet_id, data, size, timestamp, dk);
struct dml_connection *con = dml_host_connection_get(host);
if (con)
dml_packet_send_data(con, packet_id, data, size, timestamp, dk);
}
struct parrot_data {
@ -146,6 +148,7 @@ int parrot_dequeue(void *data)
{
uint64_t parrot_timestamp;
uint16_t packet_id = dml_stream_data_id_get(stream_dv);
struct dml_connection *con = dml_host_connection_get(host);
if (parrot_queue) {
struct parrot_data *entry = parrot_queue;
@ -175,8 +178,9 @@ int parrot_dequeue(void *data)
parrot_timestamp = dml_ts2timestamp(&parrot_ts);
printf("e %016"PRIx64" %ld %ld %d\n", parrot_timestamp, diff, waitms, entry->duration);
dml_packet_send_data(dml_host_connection_get(host), packet_id,
entry->data, entry->size, parrot_timestamp, dk);
if (con)
dml_packet_send_data(con, packet_id,
entry->data, entry->size, parrot_timestamp, dk);
parrot_ts.tv_nsec += entry->duration * 1000000;
if (parrot_ts.tv_nsec >= 1000000000) {
@ -197,7 +201,8 @@ printf("e %016"PRIx64" %ld %ld %d\n", parrot_timestamp, diff, waitms, entry->dur
parrot_timestamp = dml_ts2timestamp(&parrot_ts);
parrot_timestamp++;
printf("= %016"PRIx64"\n", parrot_timestamp);
dml_packet_send_data(dml_host_connection_get(host), packet_id, data, 8, parrot_timestamp, dk);
if (con)
dml_packet_send_data(con, packet_id, data, 8, parrot_timestamp, dk);
parrot_ts.tv_sec = 0;
}


+ 34
- 6
dml_stream_client_simple.c View File

@ -36,6 +36,7 @@
struct dml_stream_client_simple {
bool header_written;
struct dml_client *client;
struct dml_connection *dc;
uint8_t req_id[DML_ID_SIZE];
@ -49,6 +50,10 @@ static int keepalive_cb(void *arg)
{
struct dml_stream_client_simple *dss = arg;
if (!dss->dc) {
return 0;
}
fprintf(stderr, "No data for %d seconds, send keepalive connect\n", DML_STREAM_CLIENT_SIMPLE_KEEPALIVE);
dml_packet_send_connect(dss->dc, dss->req_id, DML_PACKET_DATA);
@ -178,9 +183,31 @@ static void rx_packet(struct dml_connection *dc, void *arg,
return;
}
static int client_reconnect(void *arg)
{
struct dml_stream_client_simple *dss = arg;
if (dml_client_connect(dss->client)) {
printf("Reconnect to DML server failed\n");
dml_poll_timeout(dss, &(struct timespec){ 2, 0 });
}
dml_poll_add(dss, NULL, NULL, keepalive_cb);
return 0;
}
static int client_connection_close(struct dml_connection *dc, void *arg)
{
return dml_connection_destroy(dc);
struct dml_stream_client_simple *dss = arg;
dml_poll_add(dss, NULL, NULL, client_reconnect);
dml_poll_timeout(dss, &(struct timespec){ 1, 0 });
if (dc)
dml_connection_destroy(dc);
dss->dc = NULL;
return 0;
}
static void client_connect(struct dml_client *client, void *arg)
@ -213,7 +240,7 @@ struct dml_stream_client_simple *dml_stream_client_simple_create(
bool verify)
{
struct dml_stream_client_simple *dss;
struct dml_client *dc;
struct dml_client *client;
dss = calloc(1, sizeof(struct dml_stream_client_simple));
if (!dss)
@ -224,11 +251,12 @@ struct dml_stream_client_simple *dml_stream_client_simple_create(
dss->verify = verify;
dss->arg = arg;
dc = dml_client_create(server, 0, client_connect, dss);
if (!dc)
client = dml_client_create(server, 0, client_connect, dss);
if (!client)
goto err_create;
dss->client = client;
if (dml_client_connect(dc))
if (dml_client_connect(client))
goto err_connect;
dml_poll_add(dss, NULL, NULL, keepalive_cb);
@ -236,7 +264,7 @@ struct dml_stream_client_simple *dml_stream_client_simple_create(
return dss;
err_connect:
dml_client_destroy(dc);
dml_client_destroy(client);
err_create:
free(dss);
err_calloc:


+ 20
- 12
dml_trx.c View File

@ -223,6 +223,7 @@ static int send_data(void *data, size_t size, void *sender_arg)
uint64_t timestamp;
struct timespec ts;
uint16_t packet_id = dml_stream_data_id_get(sender);
struct dml_connection *con = dml_host_connection_get(host);
if (!packet_id)
return -1;
@ -230,7 +231,8 @@ static int send_data(void *data, size_t size, void *sender_arg)
clock_gettime(CLOCK_REALTIME, &ts);
timestamp = dml_ts2timestamp(&ts);
dml_packet_send_data(dml_host_connection_get(host), packet_id, data, size, timestamp, dk);
if (con)
dml_packet_send_data(con, packet_id, data, size, timestamp, dk);
return 0;
}
@ -353,11 +355,12 @@ static int fprs_timer(void *arg)
fprs_frame_destroy(fprs_frame);
}
if (cur_db) {
dml_packet_send_connect(dml_host_connection_get(host),
struct dml_connection *con = dml_host_connection_get(host);
if (cur_db && con) {
dml_packet_send_connect(con,
dml_stream_id_get(cur_db),
dml_stream_data_id_get(cur_db));
dml_packet_send_req_reverse(dml_host_connection_get(host), dml_stream_id_get(cur_db),
dml_packet_send_req_reverse(con, dml_stream_id_get(cur_db),
dml_stream_id_get(stream_fprs),
DML_PACKET_REQ_REVERSE_CONNECT,
DML_STATUS_OK);
@ -695,6 +698,7 @@ static void command_cb_handle(char *command)
bool do_connect = false;
bool nokey = false;
bool notfound = false;
struct dml_connection *con = dml_host_connection_get(host);
/* Skip empty commands */
if (!strlen(command))
@ -738,21 +742,25 @@ static void command_cb_handle(char *command)
cur_con ? dml_stream_name_get(cur_con) : "NONE");
if (do_disconnect && cur_con) {
dml_packet_send_req_disc(dml_host_connection_get(host), dml_stream_id_get(cur_con));
dml_packet_send_req_reverse(dml_host_connection_get(host), dml_stream_id_get(cur_con),
dml_stream_id_get(stream_dv),
DML_PACKET_REQ_REVERSE_DISC,
DML_STATUS_OK);
if (con) {
dml_packet_send_req_disc(con, dml_stream_id_get(cur_con));
dml_packet_send_req_reverse(con, dml_stream_id_get(cur_con),
dml_stream_id_get(stream_dv),
DML_PACKET_REQ_REVERSE_DISC,
DML_STATUS_OK);
}
cur_con = NULL;
fprs_update_status(dml_stream_name_get(stream_dv), "");
}
if (do_connect) {
dml_packet_send_req_header(dml_host_connection_get(host), dml_stream_id_get(ds));
dml_host_connect(host, ds);
if (con) {
dml_packet_send_req_header(con, dml_stream_id_get(ds));
dml_host_connect(host, ds);
}
cur_con = ds;
fprs_update_status(dml_stream_name_get(stream_dv), dml_stream_name_get(cur_con));
dml_packet_send_req_reverse(dml_host_connection_get(host), dml_stream_id_get(ds),
dml_packet_send_req_reverse(con, dml_stream_id_get(ds),
dml_stream_id_get(stream_dv),
DML_PACKET_REQ_REVERSE_CONNECT,
DML_STATUS_OK);


Loading…
Cancel
Save