add internal buffering and dedicated output thread

This commit is contained in:
Steve Markgraf 2025-03-11 00:54:22 +01:00
parent ae0d94eba6
commit ff55b5acc6
6 changed files with 195 additions and 64 deletions

View file

@ -114,11 +114,14 @@ typedef void(*hsdaoh_read_cb_t)(hsdaoh_data_info_t *data_info);
* \param dev the device handle given by hsdaoh_open() * \param dev the device handle given by hsdaoh_open()
* \param cb callback function to return received data * \param cb callback function to return received data
* \param ctx user specific context to pass via the callback function * \param ctx user specific context to pass via the callback function
* \param buf_num optional buffer count
* set to 0 for default buffer count (16)
* \return 0 on success * \return 0 on success
*/ */
HSDAOH_API int hsdaoh_start_stream(hsdaoh_dev_t *dev, HSDAOH_API int hsdaoh_start_stream(hsdaoh_dev_t *dev,
hsdaoh_read_cb_t cb, hsdaoh_read_cb_t cb,
void *ctx); void *ctx,
unsigned int buf_num);
/*! /*!
* Stop streaming data from the device. * Stop streaming data from the device.

View file

@ -7,6 +7,14 @@ enum hsdaoh_async_status {
HSDAOH_RUNNING HSDAOH_RUNNING
}; };
struct llist {
uint8_t *data;
size_t len;
uint16_t sid;
uint16_t format;
struct llist *next;
};
struct hsdaoh_dev { struct hsdaoh_dev {
libusb_context *ctx; libusb_context *ctx;
struct libusb_device_handle *devh; struct libusb_device_handle *devh;
@ -44,6 +52,16 @@ struct hsdaoh_dev {
unsigned int xfer_errors; unsigned int xfer_errors;
char manufact[256]; char manufact[256];
char product[256]; char product[256];
/* buffering */
pthread_t hsdaoh_output_worker_thread;
pthread_mutex_t ll_mutex;
pthread_cond_t cond;
unsigned int highest_numq;
unsigned int global_numq;
struct llist *ll_buffers;
unsigned int llbuf_num;
}; };
enum enum

View file

@ -40,7 +40,6 @@
#define FD_NUMS 4 #define FD_NUMS 4
static int do_exit = 0; static int do_exit = 0;
static uint32_t bytes_to_read = 0;
static hsdaoh_dev_t *dev = NULL; static hsdaoh_dev_t *dev = NULL;
typedef struct file_ctx { typedef struct file_ctx {
@ -53,8 +52,7 @@ void usage(void)
"hsdaoh_file, HDMI data acquisition tool\n\n" "hsdaoh_file, HDMI data acquisition tool\n\n"
"Usage:\n" "Usage:\n"
"\t[-d device_index (default: 0)]\n" "\t[-d device_index (default: 0)]\n"
"\t[-p ppm_error (default: 0)]\n" "\t[-b maximum number of buffers (default: 16)]\n"
"\t[-n number of samples to read (default: 0, infinite)]\n"
"\t[-0 to -3 filename of steam 0 to stream 3 (a '-' dumps samples to stdout)]\n" "\t[-0 to -3 filename of steam 0 to stream 3 (a '-' dumps samples to stdout)]\n"
"\tfilename (of stream 0) (a '-' dumps samples to stdout)\n\n"); "\tfilename (of stream 0) (a '-' dumps samples to stdout)\n\n");
exit(1); exit(1);
@ -99,12 +97,6 @@ static void hsdaoh_callback(hsdaoh_data_info_t *data_info)
if (!file) if (!file)
return; return;
if ((bytes_to_read > 0) && (bytes_to_read < len)) {
len = bytes_to_read;
do_exit = 1;
hsdaoh_stop_stream(dev);
}
while (nbytes < len) { while (nbytes < len) {
nbytes += fwrite(data_info->buf + nbytes, 1, len - nbytes, file); nbytes += fwrite(data_info->buf + nbytes, 1, len - nbytes, file);
@ -114,9 +106,6 @@ static void hsdaoh_callback(hsdaoh_data_info_t *data_info)
break; break;
} }
} }
if (bytes_to_read > 0)
bytes_to_read -= len;
} }
int main(int argc, char **argv) int main(int argc, char **argv)
@ -127,22 +116,19 @@ int main(int argc, char **argv)
char *filenames[FD_NUMS] = { NULL, }; char *filenames[FD_NUMS] = { NULL, };
int n_read; int n_read;
int r, opt; int r, opt;
int ppm_error = 0;
file_ctx_t f; file_ctx_t f;
int dev_index = 0; int dev_index = 0;
unsigned int num_bufs = 0;
bool fname0_used = false; bool fname0_used = false;
bool have_file = false; bool have_file = false;
while ((opt = getopt(argc, argv, "0:1:2:3:d:n:p:d:a:")) != -1) { while ((opt = getopt(argc, argv, "0:1:2:3:d:b:")) != -1) {
switch (opt) { switch (opt) {
case 'd': case 'd':
dev_index = (uint32_t)atoi(optarg); dev_index = (uint32_t)atoi(optarg);
break; break;
case 'p': case 'b':
ppm_error = atoi(optarg); num_bufs = (unsigned int)atoi(optarg);
break;
case 'n':
bytes_to_read = (uint32_t)atof(optarg) * 2;
break; break;
case '0': case '0':
fname0_used = true; fname0_used = true;
@ -216,8 +202,7 @@ int main(int argc, char **argv)
} }
} }
fprintf(stderr, "Reading samples...\n"); r = hsdaoh_start_stream(dev, hsdaoh_callback, (void *)&f, num_bufs);
r = hsdaoh_start_stream(dev, hsdaoh_callback, (void *)&f);
while (!do_exit) { while (!do_exit) {
usleep(50000); usleep(50000);

View file

@ -472,7 +472,7 @@ int main(int argc, char **argv)
r = pthread_create(&command_thread, &attr, command_worker, NULL); r = pthread_create(&command_thread, &attr, command_worker, NULL);
pthread_attr_destroy(&attr); pthread_attr_destroy(&attr);
r = hsdaoh_start_stream(dev, hsdaoh_callback, NULL); r = hsdaoh_start_stream(dev, hsdaoh_callback, NULL, 0);
while (!do_exit) { while (!do_exit) {
usleep(50000); usleep(50000);
} }

View file

@ -294,7 +294,7 @@ int main(int argc, char **argv)
fprintf(stderr, "Reporting PPM error measurement every %u seconds...\n", ppm_duration); fprintf(stderr, "Reporting PPM error measurement every %u seconds...\n", ppm_duration);
fprintf(stderr, "Press ^C after a few minutes.\n"); fprintf(stderr, "Press ^C after a few minutes.\n");
r = hsdaoh_start_stream(dev, hsdaoh_callback, NULL); r = hsdaoh_start_stream(dev, hsdaoh_callback, NULL, 0);
while (!do_exit) while (!do_exit)
usleep(50000); usleep(50000);

View file

@ -40,6 +40,7 @@
#endif #endif
#include <inttypes.h> #include <inttypes.h>
#include <pthread.h>
#include <libusb.h> #include <libusb.h>
#include <libuvc/libuvc.h> #include <libuvc/libuvc.h>
@ -51,6 +52,8 @@
#include <format_convert.h> #include <format_convert.h>
#include <crc.h> #include <crc.h>
#define DEFAULT_BUFFERS 16
typedef struct hsdaoh_adapter { typedef struct hsdaoh_adapter {
uint16_t vid; uint16_t vid;
uint16_t pid; uint16_t pid;
@ -525,34 +528,6 @@ int hsdaoh_set_output_format(hsdaoh_dev_t *dev, hsdaoh_output_format_t format)
return 0; return 0;
} }
/* callback for idle/filler data */
inline int hsdaoh_check_idle_cnt(hsdaoh_dev_t *dev, uint16_t *buf, size_t length)
{
int idle_counter_errors = 0;
if (length == 0)
return 0;
for (unsigned int i = 0; i < length; i++) {
if (buf[i] != ((dev->idle_cnt+1) & 0xffff))
idle_counter_errors++;
dev->idle_cnt = buf[i];
}
return idle_counter_errors;
}
/* Extract the metadata stored in the upper 4 bits of the last word of each line */
inline void hsdaoh_extract_metadata(uint8_t *data, metadata_t *metadata, unsigned int width)
{
int j = 0;
uint8_t *meta = (uint8_t *)metadata;
for (unsigned i = 0; i < sizeof(metadata_t)*2; i += 2)
meta[j++] = (data[((i+1)*width*2) - 1] >> 4) | (data[((i+2)*width*2) - 1] & 0xf0);
}
void hsdaoh_output(hsdaoh_dev_t *dev, uint16_t sid, int format, uint8_t *data, size_t len) void hsdaoh_output(hsdaoh_dev_t *dev, uint16_t sid, int format, uint8_t *data, size_t len)
{ {
hsdaoh_data_info_t data_info; hsdaoh_data_info_t data_info;
@ -583,10 +558,124 @@ void hsdaoh_output(hsdaoh_dev_t *dev, uint16_t sid, int format, uint8_t *data, s
} }
} }
static void *hsdaoh_output_worker(void *arg)
{
struct llist *curelem, *prev;
struct timespec ts;
struct timeval tp;
fd_set writefds;
int r = 0;
hsdaoh_dev_t *dev = (hsdaoh_dev_t *)arg;
while(1) {
if (dev->async_status != HSDAOH_RUNNING)
pthread_exit(NULL);
pthread_mutex_lock(&dev->ll_mutex);
gettimeofday(&tp, NULL);
ts.tv_sec = tp.tv_sec+1;
ts.tv_nsec = tp.tv_usec * 1000;
r = pthread_cond_timedwait(&dev->cond, &dev->ll_mutex, &ts);
if (r == ETIMEDOUT) {
pthread_mutex_unlock(&dev->ll_mutex);
continue;
}
curelem = dev->ll_buffers;
dev->ll_buffers = NULL;
pthread_mutex_unlock(&dev->ll_mutex);
while (curelem != NULL) {
// printf("got a buffer for sid %d with len %d\n", curelem->sid, bytesleft);
hsdaoh_output(dev, curelem->sid, curelem->format, curelem->data, curelem->len);
prev = curelem;
curelem = curelem->next;
free(prev->data);
free(prev);
}
}
}
void hsdaoh_enqueue_data(hsdaoh_dev_t *dev, uint16_t sid, int format, uint8_t *data, size_t len)
{
if (dev->async_status != HSDAOH_RUNNING) {
free(data);
return;
}
struct llist *rpt = (struct llist*)malloc(sizeof(struct llist));
rpt->data = data;
rpt->len = len;
rpt->sid = sid;
rpt->format = format;
rpt->next = NULL;
pthread_mutex_lock(&dev->ll_mutex);
if (dev->ll_buffers == NULL) {
dev->ll_buffers = rpt;
} else {
struct llist *cur = dev->ll_buffers;
unsigned int num_queued = 0;
while (cur->next != NULL) {
cur = cur->next;
num_queued++;
}
if (dev->llbuf_num && dev->llbuf_num == num_queued-2) {
struct llist *curelem;
fprintf(stderr, "Buffer dropped due to overrun!\n");
free(dev->ll_buffers->data);
curelem = dev->ll_buffers->next;
free(dev->ll_buffers);
dev->ll_buffers = curelem;
}
cur->next = rpt;
if (num_queued > dev->highest_numq) {
fprintf(stderr, "Maximum buffer queue length: %d\n", num_queued);
dev->highest_numq = num_queued;
}
dev->global_numq = num_queued;
}
pthread_cond_signal(&dev->cond);
pthread_mutex_unlock(&dev->ll_mutex);
}
/* callback for idle/filler data */
inline int hsdaoh_check_idle_cnt(hsdaoh_dev_t *dev, uint16_t *buf, size_t length)
{
int idle_counter_errors = 0;
if (length == 0)
return 0;
for (unsigned int i = 0; i < length; i++) {
if (buf[i] != ((dev->idle_cnt+1) & 0xffff))
idle_counter_errors++;
dev->idle_cnt = buf[i];
}
return idle_counter_errors;
}
/* Extract the metadata stored in the upper 4 bits of the last word of each line */
inline void hsdaoh_extract_metadata(uint8_t *data, metadata_t *metadata, unsigned int width)
{
uint8_t *meta = (uint8_t *)metadata;
for (unsigned i = 0; i < sizeof(metadata_t)*2; i += 2)
meta[i/2] = (data[((i+1)*width*2) - 1] >> 4) | (data[((i+2)*width*2) - 1] & 0xf0);
}
void hsdaoh_process_frame(hsdaoh_dev_t *dev, uint8_t *data, int size) void hsdaoh_process_frame(hsdaoh_dev_t *dev, uint8_t *data, int size)
{ {
uint32_t frame_payload_bytes = 0;
metadata_t meta; metadata_t meta;
hsdaoh_extract_metadata(data, &meta, dev->width); hsdaoh_extract_metadata(data, &meta, dev->width);
@ -612,7 +701,9 @@ void hsdaoh_process_frame(hsdaoh_dev_t *dev, uint8_t *data, int size)
dev->last_frame_cnt = meta.framecounter; dev->last_frame_cnt = meta.framecounter;
int frame_errors = 0; int frame_errors = 0;
unsigned int stream0_payload_bytes = 0;
uint16_t stream0_format = 0; uint16_t stream0_format = 0;
uint8_t *stream0_data = malloc(dev->width-1 * dev->height * 2);
for (unsigned int i = 0; i < dev->height; i++) { for (unsigned int i = 0; i < dev->height; i++) {
uint8_t *line_dat = data + (dev->width * sizeof(uint16_t) * i); uint8_t *line_dat = data + (dev->width * sizeof(uint16_t) * i);
@ -645,28 +736,32 @@ void hsdaoh_process_frame(hsdaoh_dev_t *dev, uint8_t *data, int size)
} else if ((meta.crc_config == CRC16_1_LINE) || (meta.crc_config == CRC16_2_LINE)) { } else if ((meta.crc_config == CRC16_1_LINE) || (meta.crc_config == CRC16_2_LINE)) {
uint16_t expected_crc = (meta.crc_config == CRC16_1_LINE) ? dev->last_crc[0] : dev->last_crc[1]; uint16_t expected_crc = (meta.crc_config == CRC16_1_LINE) ? dev->last_crc[0] : dev->last_crc[1];
if ((crc != expected_crc) && dev->stream_synced) { if ((crc != expected_crc) && dev->stream_synced)
frame_errors++; frame_errors++;
fprintf(stderr, "Checksum mismatch in line %d: %04x != %04x\n", i, crc, expected_crc);
}
dev->last_crc[1] = dev->last_crc[0]; dev->last_crc[1] = dev->last_crc[0];
dev->last_crc[0] = crc16_ccitt(line_dat, dev->width * sizeof(uint16_t)); dev->last_crc[0] = crc16_ccitt(line_dat, dev->width * sizeof(uint16_t));
} }
if ((payload_len > 0) && dev->stream_synced) { if ((payload_len > 0) && dev->stream_synced) {
unsigned int out_len = payload_len * sizeof(uint16_t);
if (!(meta.flags & FLAG_STREAM_ID_PRESENT) || stream_id == 0) { if (!(meta.flags & FLAG_STREAM_ID_PRESENT) || stream_id == 0) {
memmove(data + frame_payload_bytes, line_dat, payload_len * sizeof(uint16_t)); memcpy(stream0_data + stream0_payload_bytes, line_dat, out_len);
frame_payload_bytes += payload_len * sizeof(uint16_t); stream0_payload_bytes += out_len;
stream0_format = format; stream0_format = format;
} else { } else {
hsdaoh_output(dev, stream_id, format, line_dat, payload_len * sizeof(uint16_t)); uint8_t *out_data = malloc(out_len);
memcpy(out_data, line_dat, out_len);
hsdaoh_enqueue_data(dev, stream_id, format, out_data, out_len);
} }
} }
} }
if (dev->stream_synced && frame_payload_bytes) if (dev->stream_synced && stream0_payload_bytes)
hsdaoh_output(dev, 0, stream0_format, data, frame_payload_bytes); hsdaoh_enqueue_data(dev, 0, stream0_format, stream0_data, stream0_payload_bytes);
else
free(stream0_data);
if (frame_errors && dev->stream_synced) { if (frame_errors && dev->stream_synced) {
fprintf(stderr,"%d frame errors, %d frames since last error\n", frame_errors, dev->frames_since_error); fprintf(stderr,"%d frame errors, %d frames since last error\n", frame_errors, dev->frames_since_error);
@ -705,7 +800,7 @@ void _uvc_callback(uvc_frame_t *frame, void *ptr)
hsdaoh_process_frame(dev, (uint8_t *)frame->data, frame->data_bytes); hsdaoh_process_frame(dev, (uint8_t *)frame->data, frame->data_bytes);
} }
int hsdaoh_start_stream(hsdaoh_dev_t *dev, hsdaoh_read_cb_t cb, void *ctx) int hsdaoh_start_stream(hsdaoh_dev_t *dev, hsdaoh_read_cb_t cb, void *ctx, unsigned int buf_num)
{ {
int r = 0; int r = 0;
@ -723,7 +818,20 @@ int hsdaoh_start_stream(hsdaoh_dev_t *dev, hsdaoh_read_cb_t cb, void *ctx)
dev->cb = cb; dev->cb = cb;
dev->cb_ctx = ctx; dev->cb_ctx = ctx;
dev->output_float = true; // dev->output_float = true;
/* initialize with a threshold */
dev->highest_numq = 9;
dev->llbuf_num = (buf_num == 0) ? DEFAULT_BUFFERS : buf_num;
pthread_mutex_init(&dev->ll_mutex, NULL);
pthread_cond_init(&dev->cond, NULL);
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
r = pthread_create(&dev->hsdaoh_output_worker_thread, &attr, hsdaoh_output_worker, (void *)dev);
pthread_attr_destroy(&attr);
uvc_error_t res; uvc_error_t res;
uvc_stream_ctrl_t ctrl; uvc_stream_ctrl_t ctrl;
@ -761,10 +869,27 @@ int hsdaoh_stop_stream(hsdaoh_dev_t *dev)
if (HSDAOH_RUNNING == dev->async_status) { if (HSDAOH_RUNNING == dev->async_status) {
dev->async_status = HSDAOH_CANCELING; dev->async_status = HSDAOH_CANCELING;
dev->async_cancel = 1; dev->async_cancel = 1;
pthread_cond_signal(&dev->cond);
/* End the stream. Blocks until last callback is serviced */ /* End the stream. Blocks until last callback is serviced */
uvc_stop_streaming(dev->uvc_devh); uvc_stop_streaming(dev->uvc_devh);
void *status;
struct llist *curelem, *prev;
pthread_join(dev->hsdaoh_output_worker_thread, &status);
curelem = dev->ll_buffers;
dev->ll_buffers = NULL;
while (curelem != 0) {
prev = curelem;
curelem = curelem->next;
free(prev->data);
free(prev);
}
dev->global_numq = 0;
return 0; return 0;
} }