diff --git a/include/hsdaoh.h b/include/hsdaoh.h index 350a1af..49a5038 100644 --- a/include/hsdaoh.h +++ b/include/hsdaoh.h @@ -114,11 +114,14 @@ typedef void(*hsdaoh_read_cb_t)(hsdaoh_data_info_t *data_info); * \param dev the device handle given by hsdaoh_open() * \param cb callback function to return received data * \param ctx user specific context to pass via the callback function + * \param buf_num optional buffer count + * set to 0 for default buffer count (16) * \return 0 on success */ HSDAOH_API int hsdaoh_start_stream(hsdaoh_dev_t *dev, hsdaoh_read_cb_t cb, - void *ctx); + void *ctx, + unsigned int buf_num); /*! * Stop streaming data from the device. diff --git a/include/hsdaoh_private.h b/include/hsdaoh_private.h index 250ed29..e14623b 100644 --- a/include/hsdaoh_private.h +++ b/include/hsdaoh_private.h @@ -7,6 +7,14 @@ enum hsdaoh_async_status { HSDAOH_RUNNING }; +struct llist { + uint8_t *data; + size_t len; + uint16_t sid; + uint16_t format; + struct llist *next; +}; + struct hsdaoh_dev { libusb_context *ctx; struct libusb_device_handle *devh; @@ -44,6 +52,16 @@ struct hsdaoh_dev { unsigned int xfer_errors; char manufact[256]; char product[256]; + + /* buffering */ + pthread_t hsdaoh_output_worker_thread; + + pthread_mutex_t ll_mutex; + pthread_cond_t cond; + unsigned int highest_numq; + unsigned int global_numq; + struct llist *ll_buffers; + unsigned int llbuf_num; }; enum diff --git a/src/hsdaoh_file.c b/src/hsdaoh_file.c index 33133e1..8519731 100644 --- a/src/hsdaoh_file.c +++ b/src/hsdaoh_file.c @@ -40,7 +40,6 @@ #define FD_NUMS 4 static int do_exit = 0; -static uint32_t bytes_to_read = 0; static hsdaoh_dev_t *dev = NULL; typedef struct file_ctx { @@ -53,8 +52,7 @@ void usage(void) "hsdaoh_file, HDMI data acquisition tool\n\n" "Usage:\n" "\t[-d device_index (default: 0)]\n" - "\t[-p ppm_error (default: 0)]\n" - "\t[-n number of samples to read (default: 0, infinite)]\n" + "\t[-b maximum number of buffers (default: 16)]\n" "\t[-0 to -3 filename of steam 0 to stream 3 (a '-' dumps samples to stdout)]\n" "\tfilename (of stream 0) (a '-' dumps samples to stdout)\n\n"); exit(1); @@ -99,12 +97,6 @@ static void hsdaoh_callback(hsdaoh_data_info_t *data_info) if (!file) return; - if ((bytes_to_read > 0) && (bytes_to_read < len)) { - len = bytes_to_read; - do_exit = 1; - hsdaoh_stop_stream(dev); - } - while (nbytes < len) { nbytes += fwrite(data_info->buf + nbytes, 1, len - nbytes, file); @@ -114,9 +106,6 @@ static void hsdaoh_callback(hsdaoh_data_info_t *data_info) break; } } - - if (bytes_to_read > 0) - bytes_to_read -= len; } int main(int argc, char **argv) @@ -127,22 +116,19 @@ int main(int argc, char **argv) char *filenames[FD_NUMS] = { NULL, }; int n_read; int r, opt; - int ppm_error = 0; file_ctx_t f; int dev_index = 0; + unsigned int num_bufs = 0; bool fname0_used = false; bool have_file = false; - while ((opt = getopt(argc, argv, "0:1:2:3:d:n:p:d:a:")) != -1) { + while ((opt = getopt(argc, argv, "0:1:2:3:d:b:")) != -1) { switch (opt) { case 'd': dev_index = (uint32_t)atoi(optarg); break; - case 'p': - ppm_error = atoi(optarg); - break; - case 'n': - bytes_to_read = (uint32_t)atof(optarg) * 2; + case 'b': + num_bufs = (unsigned int)atoi(optarg); break; case '0': fname0_used = true; @@ -216,8 +202,7 @@ int main(int argc, char **argv) } } - fprintf(stderr, "Reading samples...\n"); - r = hsdaoh_start_stream(dev, hsdaoh_callback, (void *)&f); + r = hsdaoh_start_stream(dev, hsdaoh_callback, (void *)&f, num_bufs); while (!do_exit) { usleep(50000); diff --git a/src/hsdaoh_tcp.c b/src/hsdaoh_tcp.c index fce01f8..9f6dfdc 100644 --- a/src/hsdaoh_tcp.c +++ b/src/hsdaoh_tcp.c @@ -472,7 +472,7 @@ int main(int argc, char **argv) r = pthread_create(&command_thread, &attr, command_worker, NULL); pthread_attr_destroy(&attr); - r = hsdaoh_start_stream(dev, hsdaoh_callback, NULL); + r = hsdaoh_start_stream(dev, hsdaoh_callback, NULL, 0); while (!do_exit) { usleep(50000); } diff --git a/src/hsdaoh_test.c b/src/hsdaoh_test.c index 8c72b41..bdd789f 100644 --- a/src/hsdaoh_test.c +++ b/src/hsdaoh_test.c @@ -294,7 +294,7 @@ int main(int argc, char **argv) fprintf(stderr, "Reporting PPM error measurement every %u seconds...\n", ppm_duration); fprintf(stderr, "Press ^C after a few minutes.\n"); - r = hsdaoh_start_stream(dev, hsdaoh_callback, NULL); + r = hsdaoh_start_stream(dev, hsdaoh_callback, NULL, 0); while (!do_exit) usleep(50000); diff --git a/src/libhsdaoh.c b/src/libhsdaoh.c index ae0fc3b..6e9016f 100644 --- a/src/libhsdaoh.c +++ b/src/libhsdaoh.c @@ -40,6 +40,7 @@ #endif #include +#include #include #include @@ -51,6 +52,8 @@ #include #include +#define DEFAULT_BUFFERS 16 + typedef struct hsdaoh_adapter { uint16_t vid; uint16_t pid; @@ -525,34 +528,6 @@ int hsdaoh_set_output_format(hsdaoh_dev_t *dev, hsdaoh_output_format_t format) return 0; } -/* callback for idle/filler data */ -inline int hsdaoh_check_idle_cnt(hsdaoh_dev_t *dev, uint16_t *buf, size_t length) -{ - int idle_counter_errors = 0; - - if (length == 0) - return 0; - - for (unsigned int i = 0; i < length; i++) { - if (buf[i] != ((dev->idle_cnt+1) & 0xffff)) - idle_counter_errors++; - - dev->idle_cnt = buf[i]; - } - - return idle_counter_errors; -} - -/* Extract the metadata stored in the upper 4 bits of the last word of each line */ -inline void hsdaoh_extract_metadata(uint8_t *data, metadata_t *metadata, unsigned int width) -{ - int j = 0; - uint8_t *meta = (uint8_t *)metadata; - - for (unsigned i = 0; i < sizeof(metadata_t)*2; i += 2) - meta[j++] = (data[((i+1)*width*2) - 1] >> 4) | (data[((i+2)*width*2) - 1] & 0xf0); -} - void hsdaoh_output(hsdaoh_dev_t *dev, uint16_t sid, int format, uint8_t *data, size_t len) { hsdaoh_data_info_t data_info; @@ -583,10 +558,124 @@ void hsdaoh_output(hsdaoh_dev_t *dev, uint16_t sid, int format, uint8_t *data, s } } +static void *hsdaoh_output_worker(void *arg) +{ + struct llist *curelem, *prev; + struct timespec ts; + struct timeval tp; + fd_set writefds; + int r = 0; + hsdaoh_dev_t *dev = (hsdaoh_dev_t *)arg; + + while(1) { + if (dev->async_status != HSDAOH_RUNNING) + pthread_exit(NULL); + + pthread_mutex_lock(&dev->ll_mutex); + gettimeofday(&tp, NULL); + ts.tv_sec = tp.tv_sec+1; + ts.tv_nsec = tp.tv_usec * 1000; + r = pthread_cond_timedwait(&dev->cond, &dev->ll_mutex, &ts); + + if (r == ETIMEDOUT) { + pthread_mutex_unlock(&dev->ll_mutex); + continue; + } + + curelem = dev->ll_buffers; + dev->ll_buffers = NULL; + pthread_mutex_unlock(&dev->ll_mutex); + + while (curelem != NULL) { + // printf("got a buffer for sid %d with len %d\n", curelem->sid, bytesleft); + hsdaoh_output(dev, curelem->sid, curelem->format, curelem->data, curelem->len); + + prev = curelem; + curelem = curelem->next; + free(prev->data); + free(prev); + } + } +} + +void hsdaoh_enqueue_data(hsdaoh_dev_t *dev, uint16_t sid, int format, uint8_t *data, size_t len) +{ + if (dev->async_status != HSDAOH_RUNNING) { + free(data); + return; + } + + struct llist *rpt = (struct llist*)malloc(sizeof(struct llist)); + rpt->data = data; + rpt->len = len; + rpt->sid = sid; + rpt->format = format; + rpt->next = NULL; + + pthread_mutex_lock(&dev->ll_mutex); + + if (dev->ll_buffers == NULL) { + dev->ll_buffers = rpt; + } else { + struct llist *cur = dev->ll_buffers; + unsigned int num_queued = 0; + + while (cur->next != NULL) { + cur = cur->next; + num_queued++; + } + + if (dev->llbuf_num && dev->llbuf_num == num_queued-2) { + struct llist *curelem; + fprintf(stderr, "Buffer dropped due to overrun!\n"); + free(dev->ll_buffers->data); + curelem = dev->ll_buffers->next; + free(dev->ll_buffers); + dev->ll_buffers = curelem; + } + + cur->next = rpt; + + if (num_queued > dev->highest_numq) { + fprintf(stderr, "Maximum buffer queue length: %d\n", num_queued); + dev->highest_numq = num_queued; + } + + dev->global_numq = num_queued; + } + pthread_cond_signal(&dev->cond); + pthread_mutex_unlock(&dev->ll_mutex); +} + +/* callback for idle/filler data */ +inline int hsdaoh_check_idle_cnt(hsdaoh_dev_t *dev, uint16_t *buf, size_t length) +{ + int idle_counter_errors = 0; + + if (length == 0) + return 0; + + for (unsigned int i = 0; i < length; i++) { + if (buf[i] != ((dev->idle_cnt+1) & 0xffff)) + idle_counter_errors++; + + dev->idle_cnt = buf[i]; + } + + return idle_counter_errors; +} + +/* Extract the metadata stored in the upper 4 bits of the last word of each line */ +inline void hsdaoh_extract_metadata(uint8_t *data, metadata_t *metadata, unsigned int width) +{ + uint8_t *meta = (uint8_t *)metadata; + + for (unsigned i = 0; i < sizeof(metadata_t)*2; i += 2) + meta[i/2] = (data[((i+1)*width*2) - 1] >> 4) | (data[((i+2)*width*2) - 1] & 0xf0); +} + void hsdaoh_process_frame(hsdaoh_dev_t *dev, uint8_t *data, int size) { - uint32_t frame_payload_bytes = 0; - metadata_t meta; hsdaoh_extract_metadata(data, &meta, dev->width); @@ -612,7 +701,9 @@ void hsdaoh_process_frame(hsdaoh_dev_t *dev, uint8_t *data, int size) dev->last_frame_cnt = meta.framecounter; int frame_errors = 0; + unsigned int stream0_payload_bytes = 0; uint16_t stream0_format = 0; + uint8_t *stream0_data = malloc(dev->width-1 * dev->height * 2); for (unsigned int i = 0; i < dev->height; i++) { uint8_t *line_dat = data + (dev->width * sizeof(uint16_t) * i); @@ -645,28 +736,32 @@ void hsdaoh_process_frame(hsdaoh_dev_t *dev, uint8_t *data, int size) } else if ((meta.crc_config == CRC16_1_LINE) || (meta.crc_config == CRC16_2_LINE)) { uint16_t expected_crc = (meta.crc_config == CRC16_1_LINE) ? dev->last_crc[0] : dev->last_crc[1]; - if ((crc != expected_crc) && dev->stream_synced) { + if ((crc != expected_crc) && dev->stream_synced) frame_errors++; - fprintf(stderr, "Checksum mismatch in line %d: %04x != %04x\n", i, crc, expected_crc); - } dev->last_crc[1] = dev->last_crc[0]; dev->last_crc[0] = crc16_ccitt(line_dat, dev->width * sizeof(uint16_t)); } if ((payload_len > 0) && dev->stream_synced) { + unsigned int out_len = payload_len * sizeof(uint16_t); + if (!(meta.flags & FLAG_STREAM_ID_PRESENT) || stream_id == 0) { - memmove(data + frame_payload_bytes, line_dat, payload_len * sizeof(uint16_t)); - frame_payload_bytes += payload_len * sizeof(uint16_t); + memcpy(stream0_data + stream0_payload_bytes, line_dat, out_len); + stream0_payload_bytes += out_len; stream0_format = format; } else { - hsdaoh_output(dev, stream_id, format, line_dat, payload_len * sizeof(uint16_t)); + uint8_t *out_data = malloc(out_len); + memcpy(out_data, line_dat, out_len); + hsdaoh_enqueue_data(dev, stream_id, format, out_data, out_len); } } } - if (dev->stream_synced && frame_payload_bytes) - hsdaoh_output(dev, 0, stream0_format, data, frame_payload_bytes); + if (dev->stream_synced && stream0_payload_bytes) + hsdaoh_enqueue_data(dev, 0, stream0_format, stream0_data, stream0_payload_bytes); + else + free(stream0_data); if (frame_errors && dev->stream_synced) { fprintf(stderr,"%d frame errors, %d frames since last error\n", frame_errors, dev->frames_since_error); @@ -705,7 +800,7 @@ void _uvc_callback(uvc_frame_t *frame, void *ptr) hsdaoh_process_frame(dev, (uint8_t *)frame->data, frame->data_bytes); } -int hsdaoh_start_stream(hsdaoh_dev_t *dev, hsdaoh_read_cb_t cb, void *ctx) +int hsdaoh_start_stream(hsdaoh_dev_t *dev, hsdaoh_read_cb_t cb, void *ctx, unsigned int buf_num) { int r = 0; @@ -723,7 +818,20 @@ int hsdaoh_start_stream(hsdaoh_dev_t *dev, hsdaoh_read_cb_t cb, void *ctx) dev->cb = cb; dev->cb_ctx = ctx; - dev->output_float = true; +// dev->output_float = true; + + /* initialize with a threshold */ + dev->highest_numq = 9; + dev->llbuf_num = (buf_num == 0) ? DEFAULT_BUFFERS : buf_num; + + pthread_mutex_init(&dev->ll_mutex, NULL); + pthread_cond_init(&dev->cond, NULL); + + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + r = pthread_create(&dev->hsdaoh_output_worker_thread, &attr, hsdaoh_output_worker, (void *)dev); + pthread_attr_destroy(&attr); uvc_error_t res; uvc_stream_ctrl_t ctrl; @@ -761,10 +869,27 @@ int hsdaoh_stop_stream(hsdaoh_dev_t *dev) if (HSDAOH_RUNNING == dev->async_status) { dev->async_status = HSDAOH_CANCELING; dev->async_cancel = 1; + pthread_cond_signal(&dev->cond); /* End the stream. Blocks until last callback is serviced */ uvc_stop_streaming(dev->uvc_devh); + void *status; + struct llist *curelem, *prev; + pthread_join(dev->hsdaoh_output_worker_thread, &status); + + curelem = dev->ll_buffers; + dev->ll_buffers = NULL; + + while (curelem != 0) { + prev = curelem; + curelem = curelem->next; + free(prev->data); + free(prev); + } + + dev->global_numq = 0; + return 0; }