Browse Source

Streaming support in javascript code.

Added webm to dml_streamer
Various small stuff
master
Jeroen Vreeken 5 years ago
parent
commit
a336e9979e
19 changed files with 1079 additions and 317 deletions
  1. +1
    -1
      .gitignore
  2. +6
    -5
      Makefile
  3. +11
    -0
      dml_client.c
  4. +1
    -0
      dml_client.h
  5. +1
    -0
      dml_connection.c
  6. +6
    -32
      dml_httpd.c
  7. +27
    -0
      dml_packet.c
  8. +2
    -0
      dml_packet.h
  9. +10
    -136
      dml_stream_client.c
  10. +232
    -0
      dml_stream_client_simple.c
  11. +30
    -0
      dml_stream_client_simple.h
  12. +73
    -130
      dml_streamer.c
  13. +2
    -2
      dml_streamer.conf
  14. +11
    -0
      examples.txt
  15. +155
    -11
      htdocs/index.html
  16. +246
    -0
      matroska.c
  17. +35
    -0
      matroska.h
  18. +191
    -0
      ogg.c
  19. +39
    -0
      ogg.h

+ 1
- 1
.gitignore View File

@ -5,6 +5,6 @@ dmld
dml_httpd
dml_list
dml_reflector
dml_streamer_ogg
dml_streamer
dml_stream_client
dml_trx

+ 6
- 5
Makefile View File

@ -13,7 +13,8 @@ DML_SRCS = \
dml_poll.c \
dml_route.c \
dml_server.c \
dml_stream.c
dml_stream.c \
dml_stream_client_simple.c
TRX_SRCS = \
trx_codec2.c \
@ -28,7 +29,7 @@ DML_OBJS = $(DML_SRCS:.c=.o)
TRX_OBJS = $(TRX_SRCS:.c=.o)
ETH_AR_OBJS = $(ETH_AR_SRCS:.c=.o)
all: dmld dml_list dml_reflector dml_streamer_ogg dml_stream_client dml_trx dml_httpd
all: dmld dml_list dml_reflector dml_streamer dml_stream_client dml_trx dml_httpd
SRCS += $(DML_SRCS) $(TRX_SRCS) $(ETH_AR_SRCS)
@ -45,8 +46,8 @@ SRCS += dml_trx.c trx_sound.c
dml_trx_LDFLAGS += -lasound -lcodec2
dml_trx: $(DML_OBJS) $(TRX_OBJS) $(ETH_AR_OBJS) dml_trx.o
SRCS += dml_streamer_ogg.c
dml_streamer_ogg: $(DML_OBJS) dml_streamer_ogg.o
SRCS += dml_streamer.c matroska.c ogg.c
dml_streamer: $(DML_OBJS) dml_streamer.o matroska.o ogg.o
SRCS += dml_stream_client.c
dml_stream_client: $(DML_OBJS) dml_stream_client.o
@ -66,7 +67,7 @@ clean:
rm -rf $(OBJS) \
dml_list \
dml_reflector \
dml_streamer_ogg \
dml_streamer \
dml_stream_client \
dml_trx

+ 11
- 0
dml_client.c View File

@ -70,6 +70,17 @@ err_calloc:
return NULL;
}
int dml_client_destroy(struct dml_client *dc)
{
if (dc->fd >= 0)
close(dc->fd);
free(dc->host);
free(dc);
return 0;
}
int dml_client_connect(struct dml_client *dc)
{
struct addrinfo *result;


+ 1
- 0
dml_client.h View File

@ -23,5 +23,6 @@ struct dml_client;
struct dml_client *dml_client_create(char *host, unsigned short port, void (*cb)(struct dml_client *, void *arg), void *arg);
int dml_client_fd_get(struct dml_client *dc);
int dml_client_connect(struct dml_client *dc);
int dml_client_destroy(struct dml_client *dc);
#endif /* _INCLUDE_DML_CLIENT_H_ */

+ 1
- 0
dml_connection.c View File

@ -90,6 +90,7 @@ err_calloc:
int dml_connection_destroy(struct dml_connection *dc)
{
// printf("close %p fd: %d\n", dc, dc->fd);
dml_poll_remove(dc);
close(dc->fd);
free(dc);


+ 6
- 32
dml_httpd.c View File

@ -197,7 +197,7 @@ void ws_client_flush(struct ws_client *client)
wb = writebuf_next(client);
libwebsocket_write(client->wsi, (unsigned char *)wb->msg, wb->msg_len, LWS_WRITE_BINARY);
writebuf_free(wb);
}
@ -250,6 +250,7 @@ void rx_packet(struct dml_connection *dc, void *arg,
struct ws_client *ws_client;
struct writebuf *wb;
printf("Received packet, id %d, len %d\n", id, len);
ws_client = ws_client_get_by_dc(dc);
wb = writebuf_alloc(len + 4);
msg = (uint8_t *)wb->msg;
@ -519,9 +520,10 @@ static int callback_http(struct libwebsocket_context *context,
static struct libwebsocket_protocols protocols[] = {
// first protocol must always be HTTP handler
{
"http-only", // name
callback_http, // callback
0 // per_session_data_size
name: "http-only", // name
callback: callback_http, // callback
per_session_data_size: 0, // per_session_data_size
rx_buffer_size: 65536,
},
{
NULL, NULL, 0 // end of list
@ -567,34 +569,6 @@ int main(int argc, char **argv)
dml_poll_loop();
/* while (1) {
int n;
n = poll(pollfds, count_pollfds, 1000);
if (n < 0)
break;
for (n = 0; n < count_pollfds; n++)
if (pollfds[n].revents) {
// struct ws_client *entry;
*/
/*
for (entry = ws_client_list; entry; entry = entry->next) {
int fdi = srcp_fd_get(entry->srcp_info);
int fdc = srcp_fd_get(entry->srcp_cmd);
if (fdi == pollfds[n].fd) {
srcp_info_handle(entry->srcp_info);
}
if (fdc == pollfds[n].fd) {
srcp_info_handle(entry->srcp_cmd);
}
}
*/
// libwebsocket_service_fd(context, &pollfds[n]);
/* }
}
*/
libwebsocket_context_destroy(lws_context);
magic_close(magic);


+ 27
- 0
dml_packet.c View File

@ -460,3 +460,30 @@ int dml_packet_parse_data(uint8_t *data, uint16_t len,
return 0;
}
int dml_packet_parse_data_unverified(uint8_t *data, uint16_t len,
void **payload_data, size_t *payload_len, uint64_t *timestamp)
{
if (len < DML_SIG_SIZE + sizeof(uint64_t))
return -1;
size_t plen = len - DML_SIG_SIZE - sizeof(uint64_t);
*payload_len = plen;
*payload_data = malloc(plen);
if (!*payload_data)
return -1;
memcpy(*payload_data, data, plen);
*timestamp =
((uint64_t)data[plen + 0] << 56) |
((uint64_t)data[plen + 1] << 48) |
((uint64_t)data[plen + 2] << 40) |
((uint64_t)data[plen + 3] << 32) |
((uint64_t)data[plen + 4] << 24) |
((uint64_t)data[plen + 5] << 16) |
((uint64_t)data[plen + 6] << 8) |
((uint64_t)data[plen + 7]);
return 0;
}

+ 2
- 0
dml_packet.h View File

@ -124,5 +124,7 @@ int dml_packet_send_data(struct dml_connection *dc,
int dml_packet_parse_data(uint8_t *data, uint16_t len,
void **payload_data, size_t *payload_len, uint64_t *timestamp,
struct dml_crypto_key *dk);
int dml_packet_parse_data_unverified(uint8_t *data, uint16_t len,
void **payload_data, size_t *payload_len, uint64_t *timestamp);
#endif /* _DML_PACKET_H_ */

+ 10
- 136
dml_stream_client.c View File

@ -1,5 +1,5 @@
/*
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2015
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2015, 2016
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
@ -15,15 +15,12 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dml_client.h"
#include "dml_connection.h"
#include "dml_poll.h"
#include "dml_packet.h"
#include "dml.h"
#include "dml_id.h"
#include "dml_crypto.h"
#include "dml_stream.h"
#include "dml_config.h"
#include "dml_stream_client_simple.h"
#include <stdlib.h>
#include <unistd.h>
@ -31,144 +28,22 @@
#include <string.h>
#include <openssl/pem.h>
bool header_written = false;
uint8_t req_id[DML_ID_SIZE];
void rx_packet(struct dml_connection *dc, void *arg,
uint16_t id, uint16_t len, uint8_t *data)
{
// fprintf(stderr, "got id: %d\n", id);
switch(id) {
case DML_PACKET_DESCRIPTION: {
if (!dml_stream_update_description(data, len))
break;
fprintf(stderr, "Request certificate\n");
dml_packet_send_req_certificate(dc, req_id);
break;
}
case DML_PACKET_CERTIFICATE: {
uint8_t cid[DML_ID_SIZE];
void *cert;
size_t size;
fprintf(stderr, "Parse certificate\n");
if (dml_packet_parse_certificate(data, len, cid, &cert, &size))
break;
if (!dml_crypto_cert_add_verify(cert, size, cid)) {
fprintf(stderr, "Request header\n");
dml_packet_send_req_header(dc, req_id);
}
free(cert);
break;
}
case DML_PACKET_HEADER: {
uint8_t hid[DML_ID_SIZE];
uint8_t sig[DML_SIG_SIZE];
void *header;
size_t header_size;
struct dml_stream *ds;
struct dml_crypto_key *dk;
if (dml_packet_parse_header(data, len, hid, sig, &header, &header_size))
break;
if ((ds = dml_stream_by_id(hid))) {
if ((dk = dml_stream_crypto_get(ds))) {
bool verified = dml_crypto_verify(header, header_size, sig, dk);
if (verified) {
write(1, header, header_size);
header_written = true;
dml_stream_data_id_set(ds, DML_PACKET_DATA);
dml_packet_send_connect(dc, req_id, DML_PACKET_DATA);
} else {
fprintf(stderr, "Failed to verify header signature\n");
}
}
}
free(header);
break;
}
default: {
if (id < DML_PACKET_DATA)
break;
if (len < DML_SIG_SIZE + sizeof(uint64_t))
break;
uint64_t timestamp;
size_t payload_len;
void *payload_data;
struct dml_crypto_key *dk;
struct dml_stream *ds;
ds = dml_stream_by_data_id(id);
if (!ds) {
fprintf(stderr, "Could not find dml stream\n");
break;
}
dk = dml_stream_crypto_get(ds);
if (dml_packet_parse_data(data, len,
&payload_data, &payload_len, &timestamp, dk)) {
fprintf(stderr, "Decoding failed\n");
} else {
if (timestamp <= dml_stream_timestamp_get(ds)) {
fprintf(stderr, "Timestamp mismatch %"PRIx64" <= %"PRIx64"\n",
timestamp, dml_stream_timestamp_get(ds));
} else {
dml_stream_timestamp_set(ds, timestamp);
// fprintf(stderr, "Received %zd ok\n", payload_len);
write(1, payload_data, payload_len);
}
free(payload_data);
}
break;
}
}
return;
}
int client_connection_close(struct dml_connection *dc, void *arg)
{
//TODO timeout and reconnect!
return dml_connection_destroy(dc);
}
void client_connect(struct dml_client *client, void *arg)
static int data_cb(void *arg, void *data, size_t datasize)
{
struct dml_connection *dc;
int fd;
write(1, data, datasize);
fd = dml_client_fd_get(client);
dc = dml_connection_create(fd, client, rx_packet, client_connection_close);
dml_packet_send_hello(dc, DML_PACKET_HELLO_LEAF, "dml_stream_client " DML_VERSION);
dml_packet_send_req_description(dc, req_id);
struct dml_stream *ds = dml_stream_by_id_alloc(req_id);
uint64_t timestamp;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
timestamp = (ts.tv_sec - DML_TIME_MARGIN) << 16;
dml_stream_timestamp_set(ds, timestamp);
return 0;
}
int main(int argc, char **argv)
{
struct dml_client *dc;
char *file = "dml_stream_client.conf";
char *ca;
char *server;
char *req_id_str;
uint8_t req_id[DML_ID_SIZE];
struct dml_stream_client_simple *dss;
if (argc > 2)
file = argv[2];
@ -191,10 +66,9 @@ int main(int argc, char **argv)
dml_str_id(req_id, req_id_str);
dc = dml_client_create(server, 0, client_connect, NULL);
if (dml_client_connect(dc)) {
printf("Could not connect to server\n");
dss = dml_stream_client_simple_create(server, req_id, NULL, data_cb, true);
if (!dss) {
printf("Could not create stream\n");
return -1;
}


+ 232
- 0
dml_stream_client_simple.c View File

@ -0,0 +1,232 @@
/*
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2016
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dml_client.h"
#include "dml_connection.h"
#include "dml_poll.h"
#include "dml_packet.h"
#include "dml.h"
#include "dml_id.h"
#include "dml_crypto.h"
#include "dml_stream.h"
#include "dml_config.h"
#include "dml_stream_client_simple.h"
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <openssl/pem.h>
struct dml_stream_client_simple {
bool header_written;
struct dml_connection *dc;
uint8_t req_id[DML_ID_SIZE];
bool verify;
void *arg;
int (*data_cb)(void *arg, void *data, size_t datasize);
};
static void rx_packet(struct dml_connection *dc, void *arg,
uint16_t id, uint16_t len, uint8_t *data)
{
struct dml_stream_client_simple *dss = arg;
// fprintf(stderr, "got id: %d\n", id);
switch(id) {
case DML_PACKET_DESCRIPTION: {
if (!dml_stream_update_description(data, len))
break;
fprintf(stderr, "Request certificate\n");
dml_packet_send_req_certificate(dc, dss->req_id);
break;
}
case DML_PACKET_CERTIFICATE: {
uint8_t cid[DML_ID_SIZE];
void *cert;
size_t size;
fprintf(stderr, "Parse certificate\n");
if (dml_packet_parse_certificate(data, len, cid, &cert, &size))
break;
if (!dss->verify || !dml_crypto_cert_add_verify(cert, size, cid)) {
fprintf(stderr, "Request header\n");
dml_packet_send_req_header(dc, dss->req_id);
}
free(cert);
break;
}
case DML_PACKET_HEADER: {
uint8_t hid[DML_ID_SIZE];
uint8_t sig[DML_SIG_SIZE];
void *header;
size_t header_size;
struct dml_stream *ds;
struct dml_crypto_key *dk;
if (dml_packet_parse_header(data, len, hid, sig, &header, &header_size))
break;
if (!dss->verify) {
dss->data_cb(dss->arg, header, header_size);
} else if ((ds = dml_stream_by_id(hid))) {
if ((dk = dml_stream_crypto_get(ds))) {
bool verified = dml_crypto_verify(header, header_size, sig, dk);
if (verified) {
dss->data_cb(dss->arg, header, header_size);
dss->header_written = true;
dml_stream_data_id_set(ds, DML_PACKET_DATA);
dml_packet_send_connect(dc, dss->req_id, DML_PACKET_DATA);
} else {
fprintf(stderr, "Failed to verify header signature (%zd bytes)\n", header_size);
}
}
}
free(header);
break;
}
default: {
if (id < DML_PACKET_DATA)
break;
if (len < DML_SIG_SIZE + sizeof(uint64_t))
break;
uint64_t timestamp;
size_t payload_len;
void *payload_data;
struct dml_crypto_key *dk;
struct dml_stream *ds;
ds = dml_stream_by_data_id(id);
if (!ds) {
fprintf(stderr, "Could not find dml stream\n");
break;
}
bool parsed = false;
if (dss->verify) {
dk = dml_stream_crypto_get(ds);
if (dml_packet_parse_data(data, len,
&payload_data, &payload_len, &timestamp, dk)) {
fprintf(stderr, "Decoding failed\n");
} else {
parsed = true;
}
} else {
if (dml_packet_parse_data_unverified(data, len,
&payload_data, &payload_len, &timestamp)) {
} else {
parsed = true;
}
}
if (parsed) {
if (timestamp <= dml_stream_timestamp_get(ds)) {
fprintf(stderr, "Timestamp mismatch %"PRIx64" <= %"PRIx64"\n",
timestamp, dml_stream_timestamp_get(ds));
} else {
dml_stream_timestamp_set(ds, timestamp);
// fprintf(stderr, "Received %zd ok\n", payload_len);
dss->data_cb(dss->arg, payload_data, payload_len);
}
free(payload_data);
}
break;
}
}
return;
}
static int client_connection_close(struct dml_connection *dc, void *arg)
{
return dml_connection_destroy(dc);
}
static void client_connect(struct dml_client *client, void *arg)
{
struct dml_stream_client_simple *dss = arg;
struct dml_connection *dc;
int fd;
fd = dml_client_fd_get(client);
dc = dml_connection_create(fd, arg, rx_packet, client_connection_close);
dml_packet_send_hello(dc, DML_PACKET_HELLO_LEAF, "dml_stream_client " DML_VERSION);
dml_packet_send_req_description(dc, dss->req_id);
struct dml_stream *ds = dml_stream_by_id_alloc(dss->req_id);
uint64_t timestamp;
struct timespec ts;
clock_gettime(CLOCK_REALTIME, &ts);
timestamp = (ts.tv_sec - DML_TIME_MARGIN) << 16;
dml_stream_timestamp_set(ds, timestamp);
dss->dc = dc;
}
struct dml_stream_client_simple *dml_stream_client_simple_create(
char *server, uint8_t req_id[DML_ID_SIZE],
void *arg,
int (*data_cb)(void *arg, void *, size_t),
bool verify)
{
struct dml_stream_client_simple *dss;
struct dml_client *dc;
dss = calloc(1, sizeof(struct dml_stream_client_simple));
if (!dss)
goto err_calloc;
memcpy(dss->req_id, req_id, DML_ID_SIZE);
dss->data_cb = data_cb;
dss->verify = verify;
dss->arg = arg;
dc = dml_client_create(server, 0, client_connect, dss);
if (!dc)
goto err_create;
if (dml_client_connect(dc))
goto err_connect;
return dss;
err_connect:
dml_client_destroy(dc);
err_create:
free(dss);
err_calloc:
return NULL;
}
int dml_stream_client_simple_destroy(struct dml_stream_client_simple *dss)
{
dml_connection_destroy(dss->dc);
free(dss);
return 0;
}

+ 30
- 0
dml_stream_client_simple.h View File

@ -0,0 +1,30 @@
/*
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2016
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _INCLUDE_DML_STREAM_CLIENT_SIMPLE_H_
#define _INCLUDE_DML_STREAM_CLIENT_SIMPLE_H_
struct dml_stream_client;
struct dml_stream_client_simple *dml_stream_client_simple_create(
char *server, uint8_t req_id[DML_ID_SIZE],
void *arg,
int (*data_cb)(void *arg, void *, size_t),
bool verify);
#endif /* _INCLUDE_DML_STREAM_CLIENT_SIMPLE_H_ */

dml_streamer_ogg.c → dml_streamer.c View File

@ -24,6 +24,9 @@
#include "dml_crypto.h"
#include "dml_config.h"
#include "ogg.h"
#include "matroska.h"
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
@ -43,6 +46,7 @@ uint32_t bps = 0;
uint16_t packet_id = 0;
struct dml_connection *dml_con;
bool header_done = false;
uint8_t *header;
size_t header_size = 0;
@ -87,7 +91,7 @@ void rx_packet(struct dml_connection *dc, void *arg,
}
case DML_PACKET_REQ_HEADER: {
uint8_t header_sig[DML_SIG_SIZE];
dml_crypto_sign(header_sig, header, header_size, dk);
dml_packet_send_header(dc, ref_id, header_sig, header, header_size);
@ -168,139 +172,69 @@ void send_data(void *data, size_t size)
int fd_ogg = 0;
uint8_t ogg_page[65536];
size_t ogg_pos = 0;
uint8_t ogg_segments;
size_t ogg_total_segments;
uint8_t *pkt_data;
size_t pkt_size;
enum ogg_state {
OGG_STATE_HEADER,
OGG_STATE_SEGMENT_TABLE,
OGG_STATE_DATA,
} ogg_state = OGG_STATE_HEADER;
ssize_t data_cb(void *data, size_t size)
{
if (!header_done) {
header = realloc(header, header_size + size);
memcpy(header + header_size, data, size);
header_size += size;
} else {
pkt_data = realloc(pkt_data, pkt_size + size);
memcpy(pkt_data + pkt_size, data, size);
pkt_size += size;
}
uint32_t vorbis_header;
uint32_t theora_header;
return size;
}
int ogg_in(void *arg)
int trigger_cb_m(enum matroska_trigger trig)
{
ssize_t r;
bool repeat;
r = read(fd_ogg, ogg_page + ogg_pos, sizeof(ogg_page) - ogg_pos);
if (r < 0)
return -1;
if (trig == MATROSKA_TRIGGER_HEADER_COMPLETE) {
header_done = true;
} else {
send_data(pkt_data, pkt_size);
free(pkt_data);
pkt_data = NULL;
pkt_size = 0;
}
ogg_pos += r;
return 0;
}
int trigger_cb_o(enum ogg_trigger trig)
{
if (trig == OGG_TRIGGER_HEADER_COMPLETE) {
header_done = true;
} else {
send_data(pkt_data, pkt_size);
free(pkt_data);
pkt_data = NULL;
pkt_size = 0;
}
do {
repeat = false;
switch (ogg_state) {
case OGG_STATE_HEADER: {
if (ogg_pos >= 27) {
ogg_segments = ogg_page[26];
repeat = true;
ogg_state = OGG_STATE_SEGMENT_TABLE;
}
break;
}
case OGG_STATE_SEGMENT_TABLE: {
if (ogg_pos >= 27 + ogg_segments) {
int i;
ogg_total_segments = 27 + ogg_segments;
for (i = 0; i < ogg_segments; i++) {
ogg_total_segments += ogg_page[27 + i];
}
// printf("%zd segment end ", ogg_total_segments);
repeat = true;
ogg_state = OGG_STATE_DATA;
}
break;
}
case OGG_STATE_DATA: {
if (ogg_pos >= ogg_total_segments) {
uint32_t serial;
if (ogg_page[0] == 'O' &&
ogg_page[1] == 'g' &&
ogg_page[2] == 'g' &&
ogg_page[3] == 'S') {
// printf("Found OggS pattern ");
}
serial = ogg_page[14];
serial |= ogg_page[15] << 8;
serial |= ogg_page[16] << 16;
serial |= ogg_page[17] << 24;
if (ogg_page[5] & 0x02 &&
ogg_page[27 + ogg_segments + 1] == 'v' &&
ogg_page[27 + ogg_segments + 2] == 'o' &&
ogg_page[27 + ogg_segments + 3] == 'r' &&
ogg_page[27 + ogg_segments + 4] == 'b' &&
ogg_page[27 + ogg_segments + 5] == 'i' &&
ogg_page[27 + ogg_segments + 6] == 's') {
printf("Start of Vorbis stream ");
vorbis_header = serial;
}
if (ogg_page[5]& 0x02 &&
ogg_page[27 + ogg_segments + 1] == 't' &&
ogg_page[27 + ogg_segments + 2] == 'h' &&
ogg_page[27 + ogg_segments + 3] == 'e' &&
ogg_page[27 + ogg_segments + 4] == 'o' &&
ogg_page[27 + ogg_segments + 5] == 'r' &&
ogg_page[27 + ogg_segments + 6] == 'a') {
printf("Start of Theora stream ");
theora_header = serial;
}
// printf("bitflags: %02x segments: %d serial: %08x ", ogg_page[5], ogg_segments, serial);
// printf(" %02x\n", ogg_page[27 + ogg_segments]);
if (vorbis_header == serial) {
if (!(ogg_page[27 + ogg_segments] & 1)) {
printf("First vorbis data\n");
vorbis_header = 0;
} else {
printf("Vorbis header\n");
header = realloc(header, header_size + ogg_pos);
memcpy(header + header_size, ogg_page, ogg_pos);
header_size += ogg_pos;
}
}
if (theora_header == serial) {
if (!(ogg_page[27 + ogg_segments] & 0x80)) {
printf("First theora data\n");
theora_header = 0;
} else {
printf("Theora header\n");
header = realloc(header, header_size + ogg_pos);
memcpy(header + header_size, ogg_page, ogg_pos);
header_size += ogg_pos;
}
}
int i;
for (i = 0; i < ogg_pos; i += 1024) {
int size = ogg_pos - i;
if (size > 1024)
size = 1024;
send_data(ogg_page + i, size);
}
memmove(ogg_page, ogg_page + ogg_total_segments, ogg_pos - ogg_total_segments);
ogg_pos -= ogg_total_segments;
repeat = true;
ogg_state = OGG_STATE_HEADER;
}
break;
}
}
} while (repeat);
return 0;
}
struct ogg *ogg;
struct matroska *mat;
int fd_in(void *arg)
{
char buffer[4096];
ssize_t r;
r = read(fd_ogg, buffer, sizeof(buffer));
if (r > 0) {
if (mat)
return matroska_parse(mat, buffer, r);
else
return ogg_parse(ogg, buffer, r);
}
return 0;
}
@ -308,10 +242,11 @@ int ogg_in(void *arg)
int main(int argc, char **argv)
{
struct dml_client *dc;
char *file = "dml_streamer_ogg.conf";
char *file = "dml_streamer.conf";
char *certificate;
char *key;
char *server;
bool use_ogg = true;
if (argc > 1)
file = argv[1];
@ -321,9 +256,11 @@ int main(int argc, char **argv)
return -1;
}
mime = dml_config_value("mime", NULL, "application/ogg");
if (strcmp(mime + strlen(mime) - 3, "ogg"))
use_ogg = false;
name = dml_config_value("name", NULL, "example");
alias = dml_config_value("alias", NULL, "");
description = dml_config_value("description", NULL, "Test ogg stream");
description = dml_config_value("description", NULL, "Test stream");
bps = atoi(dml_config_value("bps", NULL, "300000"));
server = dml_config_value("server", NULL, "localhost");
@ -354,7 +291,13 @@ int main(int argc, char **argv)
return -1;
}
dml_poll_add(&fd_ogg, ogg_in, NULL, NULL);
if (use_ogg)
ogg = ogg_create(data_cb, trigger_cb_o);
else
mat = matroska_create(data_cb, trigger_cb_m);
dml_poll_add(&fd_ogg, fd_in, NULL, NULL);
dml_poll_fd_set(&fd_ogg, 0);
dml_poll_in_set(&fd_ogg, true);

dml_streamer_ogg.conf → dml_streamer.conf View File

@ -1,8 +1,8 @@
# test stream
mime = application/ogg
mime = video/webm
name = testvideo.pe1rxq.ampr.org
#alias =
description=Test ogg stream with theora video and vorbis audio
description=Test webm stream with VP video and vorbis audio
bps = 306000
server = localhost

+ 11
- 0
examples.txt View File

@ -40,6 +40,17 @@ Stream audio with ogg:
-codec:a vorbis -strict -2 -qscale:a 5 \
-f ogg - | ./dml_streamer_ogg
Stream video with ogg:
ffmpeg \
-f lavfi -i testsrc=size=cif:rate=25 \
-f lavfi -i sine -af arealtime \
-c:v libtheora \
-c:a libvorbis \
-q:v 9 -g:v 25 -q:a 5 \
-f ogg - | ./dml_streamer_ogg
Receive ogg stream and play:
dml_stream_client <stream_id> | mplayer -


+ 155
- 11
htdocs/index.html View File

@ -4,7 +4,10 @@
<p>
dml_httpd works!
</p>
<div id = "hello">No HELLO from server received.</div>
<div id="hello">No HELLO from server received.</div>
<div><video style="border: 2px solid" id="video"></video></div>
<div id = "routes"></div>
</body>
@ -26,7 +29,18 @@ var DML = {
REQ_HEADER: 36,
REQ_REVERSE: 37,
REQ_DISC: 38,
}
DATA: 4096,
},
ID: {
SIZE: 32,
},
SIG: {
SIZE: ((256 * 2) / 8),
},
TIMESTAMP: {
SIZE: 8,
},
};
function char2hex(c)
@ -83,6 +97,17 @@ function arraybufcmp(ab1, ab2)
return false;
}
function ab_copy(dst_ab, dst_off, src_ab, src_off, len)
{
var dst = new DataView(dst_ab);
var src = new DataView(src_ab);
var i;
for (i = 0; i < len; i++) {
dst.setUint8(i + dst_off, src.getUint8(i + src_off));
}
}
function dml()
{
url = "ws://" + location.host;
@ -93,9 +118,11 @@ function dml()
var packet_hello_cb = function(flags, ident) {}
var packet_route_cb = function(hops, id) {}
var packet_description_cb = function(id, version, bps, mime, name, alias, description) {}
var packet_header_cb = function(header_id, header_data, header_sig) {}
var packet_data_cb = function(data, timestamp, signature) {}
ws.onmessage = function(msg) {
console.log("ws.onmessage: " + msg.data.byteLength);
// console.log("ws.onmessage: " + msg.data.byteLength);
header = new DataView(msg.data, 0, 4);
id = header.getUint16(0, false);
@ -103,7 +130,7 @@ function dml()
data = new DataView(msg.data, 4);
pos = 4;
console.log("len: " + len + " id: " + id);
// console.log("len: " + len + " id: " + id);
switch(id) {
case DML.PACKET.HELLO: {
@ -117,14 +144,14 @@ function dml()
}
case DML.PACKET.ROUTE: {
hops = data.getUint8(32);
route_id = msg.data.slice(pos, pos + 32);
route_id = msg.data.slice(pos, pos + DML.ID.SIZE);
dml_this.packet_route_cb(hops, route_id);
break;
}
case DML.PACKET.DESCRIPTION: {
desc_id = msg.data.slice(pos, pos + 32);
pos += 32;
desc_id = msg.data.slice(pos, pos + DML.ID.SIZE);
pos += DML.ID.SIZE;
data = new DataView(msg.data, pos);
desc_version = data.getUint8(0);
desc_bps = data.getUint32(1, false);
@ -147,6 +174,26 @@ function dml()
desc_name, desc_alias, desc_description);
break;
}
case DML.PACKET.HEADER: {
header_id = msg.data.slice(pos, pos + DML.ID.SIZE);
header_data = msg.data.slice(pos + DML.ID.SIZE, pos + len - DML.SIG.SIZE);
header_sig = msg.data.slice(pos + len - DML.SIG.SIZE, pos + len);
dml_this.packet_header_cb(
header_id, header_data, header_sig);
break;
}
default: {
if (id == connected_data_id) {
data = msg.data.slice(pos, pos + len - DML.SIG.SIZE - DML.TIMESTAMP.SIZE);
signature = msg.data.slice(pos + len - DML.SIG.SIZE, pos + len);
timestampdata = new DataView(msg.data, pos + len - DML.SIG.SIZE - DML.TIMESTAMP.SIZE);
timestamp = timestampdata.getUint32(0, false) << 32;
timestamp += timestampdata.getUint32(4, false);
dml_this.packet_data_cb(data, timestamp, signature);
}
}
}
}
@ -163,7 +210,7 @@ function dml()
console.log("ws.onerror(): " + event.data);
}
this.send = function (id, payload_arraybuffer) {
this.send = function dml_connection_send(id, payload_arraybuffer) {
data = new ArrayBuffer(payload_arraybuffer.byteLength + 4);
dataview = new DataView(data);
dataview.setUint16(0, id, false);
@ -178,9 +225,22 @@ function dml()
ws.send(data);
}
this.send_req_description = function (id) {
this.send_req_description = function dml_packet_send_req_description(id) {
dml_this.send(DML.PACKET.REQ_DESCRIPTION, id);
}
this.send_req_header = function dml_packet_send_req_header(id) {
dml_this.send(DML.PACKET.REQ_HEADER, id);
}
this.send_req_disc = function dml_packet_send_req_disc(id) {
dml_this.send(DML.PACKET.REQ_DISC, id);
}
this.send_connect = function dml_packet_send_connect(id, packet_id) {
data = new ArrayBuffer(id.byteLength + 2);
ab_copy(data, 0, id, 0, DML.ID.SIZE);
dataview = new DataView(data);
dataview.setUint16(id.byteLength, packet_id, false);
dml_this.send(DML.PACKET.CONNECT, data);
}
}
@ -189,9 +249,11 @@ var routes = new Array();
function update_routes()
{
var i;
str = "<pre>";
str = "";
str += "<button onclick=\"disconnect()\">Disconnect</button>";
for (i = 0; i < routes.length; i++) {
str += "<pre>";
str += "id: " + sha2str(routes[i].id) + " hops: " + routes[i].hops + "\n";
if (routes[i].description) {
str += "\tbps:\t" + routes[i].description.bps + "\n";
@ -200,13 +262,69 @@ function update_routes()
str += "\talias:\t" + routes[i].description.alias + "\n";
str += "\tdescription:\t" + routes[i].description.description + "\n";
}
str += "</pre>";
str += "<button onclick=\"connect(" + i + ")\">Connect</button>";
}
str += "</pre>";
document.getElementById("routes").innerHTML = str;
}
video = document.getElementById("video");
connection = new dml();
connected = false;
connected_id = null;
connected_data_id = DML.PACKET.DATA;
connected_timestamp = 0;
var mediaSource = new MediaSource();
var sourceBuffer;
function disconnect()
{
if (connected) {
connection.send_req_disc(connected_id);
connected_data_id++;
connected = false;
video.pause();
}
}
function connect(id)
{
if (connected && arraybufcmp(routes[id].id, connected_id)) {
connection.send_req_disc(routes[id].id);
connected_data_id++;
connected = false;
}
var can_play = MediaSource.isTypeSupported(routes[id].description.mime);
if (can_play) {
connected = true;
connected_id = routes[id].id;
video.src = URL.createObjectURL(mediaSource);
video.play();
console.log("mediaSource.readyState: " + mediaSource.readyState);
mediaSource.addEventListener('sourceopen', sourceOpen);
} else {
alert("Your browser does not support the mime type '" + routes[id].description.mime + "'.");
}
}
function sourceOpen (_) {
console.log("mediaSource.readyState: " + mediaSource.readyState);
sourceBuffer = mediaSource.addSourceBuffer("video/webm");
//"application/ogg");
connection.send_req_header(connected_id);
console.log("Requested header");
};
connection.packet_hello_cb = function(flags, ident) {
document.getElementById("hello").innerHTML = "Server identity: " + ident + "<br>Server flags: " + flags;
}
@ -252,6 +370,32 @@ connection.packet_description_cb = function(id, version, bps, mime, name, alias,
update_routes();
}
connection.packet_header_cb = function(header_id, header_data, header_sig) {
console.log("header " + header_id.byteLength + " " + header_data.byteLength + " " + header_sig.byteLength);
if (!arraybufcmp(connected_id, header_id)) {
connection.send_connect(connected_id, connected_data_id);
if (header_data.byteLength) {
sourceBuffer.appendBuffer(header_data);
}
}
}
connection.packet_data_cb = function(data, timestamp, signature) {
var i;
if (timestamp <= connected_timestamp) {
console.log("timestamp invalid");
}
sourceBuffer.appendBuffer(data);
if (video.buffered.length) {
if (video.buffered.start(0) > video.currentTime)
video.currentTime = video.buffered.start(0);
}
}
</script>
</html>

+ 246
- 0
matroska.c View File

@ -0,0 +1,246 @@
/*
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2016
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <stdbool.h>
#include "matroska.h"
enum matroska_state {
MATROSKA_ELEMENTID_FIRSTOCTET,
MATROSKA_ELEMENTID_OCTETS,
MATROSKA_SIZE_FIRSTOCTET,
MATROSKA_SIZE_OCTETS,
MATROSKA_ELEMENT_OCTETS,
};
#define MATROSKA_LEVEL_MAX 5
struct matroska_element {
enum matroska_state state;
uint8_t id[4];
int id_pos;
int id_size;
uint64_t size;
int size_pos;
int size_size;
uint64_t pos;
};
struct matroska {
int level;
struct matroska_element level_state[MATROSKA_LEVEL_MAX+1];
ssize_t (*data_cb)(void *data, size_t size);
int (*trigger_cb)(enum matroska_trigger trig);
};
bool matroska_element_dive(struct matroska *mat)
{
struct matroska_element *em = &mat->level_state[mat->level];
if (em->id[0] == 0x18 &&
em->id[1] == 0x53 &&
em->id[2] == 0x80 &&
em->id[3] == 0x67)
return true;
return false;
}
int matroska_element_trigger(struct matroska *mat)
{
struct matroska_element *em = &mat->level_state[mat->level];
if (em->id[0] == 0x16 &&
em->id[1] == 0x54 &&
em->id[2] == 0xae &&
em->id[3] == 0x6b)
mat->trigger_cb(MATROSKA_TRIGGER_HEADER_COMPLETE);
if (em->id[0] == 0x1f &&
em->id[1] == 0x43 &&
em->id[2] == 0xb6 &&
em->id[3] == 0x75)
mat->trigger_cb(MATROSKA_TRIGGER_PACKET_COMPLETE);
return 0;
}
#define PUSH(d) do { mat->data_cb(bufo + pos, (d)); pos += (d); } while(0)
int matroska_parse(struct matroska *mat, void *buffer, size_t size)
{
uint8_t *bufo = buffer;
size_t pos = 0;
while (pos < size) {
struct matroska_element *em = &mat->level_state[mat->level];
switch(em->state) {
case MATROSKA_ELEMENTID_FIRSTOCTET: {
em->pos = 0;
memset(em->id, 0, 4);
em->id[0] = bufo[pos];
em->id_pos = 1;
if (bufo[pos] & 0x80) {
em->id_size = 1;
em->state = MATROSKA_SIZE_FIRSTOCTET;
} else if (bufo[pos] & 0x40) {
em->id_size = 2;
em->state = MATROSKA_ELEMENTID_OCTETS;
} else if (bufo[pos] & 0x20) {
em->id_size = 3;
em->state = MATROSKA_ELEMENTID_OCTETS;
} else if (bufo[pos] & 0x10) {
em->id_size = 4;
em->state = MATROSKA_ELEMENTID_OCTETS;
}
PUSH(1);
break;
}
case MATROSKA_ELEMENTID_OCTETS: {
em->id[em->id_pos] = bufo[pos];
em->id_pos++;
if (em->id_pos == em->id_size) {
em->state = MATROSKA_SIZE_FIRSTOCTET;
}
PUSH(1);
break;
}
case MATROSKA_SIZE_FIRSTOCTET: {
memset(&em->size, 0, 8);
if (bufo[pos] & 0x80) {
em->size_size = 1;
em->size_pos = 1;
em->size = bufo[pos] & 0x7f;
em->state = MATROSKA_ELEMENT_OCTETS;
} else if (bufo[pos] & 0x40) {
em->size_size = 2;
em->size_pos = 1;
em->size = bufo[pos] & 0x3f;
em->state = MATROSKA_SIZE_OCTETS;
} else if (bufo[pos] & 0x20) {
em->size_size = 3;
em->size_pos = 1;
em->size = bufo[pos] & 0x1f;
em->state = MATROSKA_SIZE_OCTETS;
} else if (bufo[pos] & 0x10) {
em->size_size = 4;
em->size_pos = 1;
em->size = bufo[pos] & 0x0f;
em->state = MATROSKA_SIZE_OCTETS;
} else if (bufo[pos] & 0x08) {
em->size_size = 5;
em->size_pos = 1;
em->size = bufo[pos] & 0x07;
em->state = MATROSKA_SIZE_OCTETS;
} else if (bufo[pos] & 0x04) {
em->size_size = 6;
em->size_pos = 1;
em->size = bufo[pos] & 0x03;
em->state = MATROSKA_SIZE_OCTETS;
} else if (bufo[pos] & 0x02) {
em->size_size = 7;
em->size_pos = 1;
em->size = bufo[pos] & 0x01;
em->state = MATROSKA_SIZE_OCTETS;
} else if (bufo[pos] & 0x01) {
em->size_size = 8;
em->size_pos = 1;
em->size = 0;
em->state = MATROSKA_SIZE_OCTETS;
}
if (em->state == MATROSKA_ELEMENT_OCTETS)
printf("%02x %02x %02x %02x: %ld 0x%016lx\n",
em->id[0], em->id[1], em->id[2], em->id[3],
(long)em->size, (long)em->size);
PUSH(1);
break;
}
case MATROSKA_SIZE_OCTETS: {
em->size <<= 8;
em->size |= bufo[pos];
em->size_pos++;
if (em->size_pos == em->size_size) {
em->state = MATROSKA_ELEMENT_OCTETS;
}
if (em->state == MATROSKA_ELEMENT_OCTETS)
printf("%02x %02x %02x %02x: %ld 0x%016lx\n",
em->id[0], em->id[1], em->id[2], em->id[3],
(long)em->size, (long)em->size);
PUSH(1);
break;
}
case MATROSKA_ELEMENT_OCTETS: {
if (em->pos == em->size) {
matroska_element_trigger(mat);
em->state = MATROSKA_ELEMENTID_FIRSTOCTET;
if (mat->level) {
mat->level--;
}
} else if (matroska_element_dive(mat)) {
mat->level++;
} else {
size_t needed = em->size - em->pos;
size_t avail = size - pos;
size_t inc;
if (needed < avail)
inc = needed;
else
inc = avail;
em->pos += inc;
PUSH(inc);
}
break;
}
}
}
return 0;
}
struct matroska *matroska_create(
ssize_t (*data_cb)(void *data, size_t size),
int (*trigger_cb)(enum matroska_trigger trig)
)
{
struct matroska *mat;
mat = calloc(1, sizeof(struct matroska));
if (!mat)
goto err_calloc;
mat->data_cb = data_cb;
mat->trigger_cb = trigger_cb;
return mat;
err_calloc:
return NULL;
}

+ 35
- 0
matroska.h View File

@ -0,0 +1,35 @@
/*
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2016
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _INCLUDE_MATROSKA_H_
#define _INCLUDE_MATROSKA_H_
struct matroska;
enum matroska_trigger {
MATROSKA_TRIGGER_HEADER_COMPLETE,
MATROSKA_TRIGGER_PACKET_COMPLETE,
};
int matroska_parse(struct matroska *mat, void *buffer, size_t size);
struct matroska *matroska_create(
ssize_t (*data_cb)(void *data, size_t size),
int (*trigger_cb)(enum matroska_trigger trig)
);
#endif /* _INCLUDE_MATROSKA_H_ */

+ 191
- 0
ogg.c View File

@ -0,0 +1,191 @@
/*
Copyright Jeroen Vreeken (jeroen@vreeken.net), 2016
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ogg.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
static struct ogg {
ssize_t (*data_cb)(void *data, size_t size);
int (*trigger_cb)(enum ogg_trigger trig);
} ogg;
uint8_t ogg_page[65536];
size_t ogg_pos = 0;
uint8_t ogg_segments;
size_t ogg_total_segments;
enum ogg_state {
OGG_STATE_HEADER,
OGG_STATE_SEGMENT_TABLE,
OGG_STATE_DATA,
} ogg_state = OGG_STATE_HEADER;
uint32_t vorbis_header;
uint32_t theora_header;
int ogg_in(ssize_t r)
{
bool repeat;
ogg_pos += r;
do {
repeat = false;
switch (ogg_state) {
case OGG_STATE_HEADER: {
if (ogg_pos >= 27) {
ogg_segments = ogg_page[26];
repeat = true;
ogg_state = OGG_STATE_SEGMENT_TABLE;
}
break;
}
case OGG_STATE_SEGMENT_TABLE: {
if (ogg_pos >= 27 + ogg_segments) {
int i;
ogg_total_segments = 27 + ogg_segments;
for (i = 0; i < ogg_segments; i++) {
ogg_total_segments += ogg_page[27 + i];
}
// printf("%zd segment end ", ogg_total_segments);
repeat = true;
ogg_state = OGG_STATE_DATA;
}
break;
}
case OGG_STATE_DATA: {
if (ogg_pos >= ogg_total_segments) {
uint32_t serial;
if (ogg_page[0] == 'O' &&
ogg_page[1] == 'g' &&
ogg_page[2] == 'g' &&
ogg_page[3] == 'S') {
// printf("Found OggS pattern ");
}
serial = ogg_page[14];
serial |= ogg_page[15] << 8;
serial |= ogg_page[16] << 16;
serial |= ogg_page[17] << 24;
if (ogg_page[5] & 0x02 &&
ogg_page[27 + ogg_segments + 1] == 'v' &&
ogg_page[27 + ogg_segments + 2] == 'o' &&
ogg_page[27 + ogg_segments + 3] == 'r' &&
ogg_page[27 + ogg_segments + 4] == 'b' &&
ogg_page[27 + ogg_segments + 5] == 'i' &&
ogg_page[27 + ogg_segments + 6] == 's') {
printf("Start of Vorbis stream ");
vorbis_header = serial;
}
if (ogg_page[5]& 0x02 &&
ogg_page[27 + ogg_segments + 1] == 't' &&
o