From b7bf1d9835b0fab2da581df8d81686509d155e15 Mon Sep 17 00:00:00 2001 From: hathach Date: Sat, 22 Nov 2025 01:03:37 +0700 Subject: [PATCH] hil stress test cdc --- src/common/tusb_fifo.c | 4 +-- test/hil/hil_test.py | 63 +++++++++++++++++++++++++++--------------- 2 files changed, 43 insertions(+), 24 deletions(-) diff --git a/src/common/tusb_fifo.c b/src/common/tusb_fifo.c index 463a059f0..f09fcbafa 100644 --- a/src/common/tusb_fifo.c +++ b/src/common/tusb_fifo.c @@ -38,13 +38,13 @@ #if OSAL_MUTEX_REQUIRED -TU_ATTR_ALWAYS_INLINE static inline void _ff_lock(osal_mutex_t mutex) { +TU_ATTR_ALWAYS_INLINE static inline void ff_lock(osal_mutex_t mutex) { if (mutex != NULL) { osal_mutex_lock(mutex, OSAL_TIMEOUT_WAIT_FOREVER); } } -TU_ATTR_ALWAYS_INLINE static inline void _ff_unlock(osal_mutex_t mutex) { +TU_ATTR_ALWAYS_INLINE static inline void ff_unlock(osal_mutex_t mutex) { if (mutex != NULL) { osal_mutex_unlock(mutex); } diff --git a/test/hil/hil_test.py b/test/hil/hil_test.py index 3a11cee13..aabf8a449 100755 --- a/test/hil/hil_test.py +++ b/test/hil/hil_test.py @@ -41,6 +41,7 @@ import fs import hashlib import ctypes from pymtp import MTP +import string ENUM_TIMEOUT = 30 @@ -395,39 +396,57 @@ def test_device_cdc_dual_ports(board): ] ser = [open_serial_dev(p) for p in port] - str_test = [ b"test_no1", b"test_no2" ] - # Echo test write to each port and read back - for i in range(len(str_test)): - s = str_test[i] - l = len(s) - ser[i].write(s) - ser[i].flush() - rd = [ ser[i].read(l) for i in range(len(ser)) ] - assert rd[0] == s.lower(), f'Port1 wrong data: expected {s.lower()} was {rd[0]}' - assert rd[1] == s.upper(), f'Port2 wrong data: expected {s.upper()} was {rd[1]}' + def rand_ascii(length): + return "".join(random.choices(string.ascii_letters + string.digits, k=length)).encode("ascii") + + sizes = [32, 64, 128, 256, 512, random.randint(2000, 5000)] + + def write_and_check(writer, payload): + size = len(payload) + for s in ser: + s.reset_input_buffer() + ser[writer].write(payload) + ser[writer].flush() + rd0 = ser[0].read(size) + rd1 = ser[1].read(size) + assert rd0 == payload.lower(), f'Port0 wrong data ({size}): expected {payload.lower()[:16]}... was {rd0[:16]}' + assert rd1 == payload.upper(), f'Port1 wrong data ({size}): expected {payload.upper()[:16]}... was {rd1[:16]}' + + for size in sizes: + payload0 = rand_ascii(size) + write_and_check(0, payload0) + + payload1 = rand_ascii(size) + write_and_check(1, payload1) ser[0].close() ser[1].close() def test_device_cdc_msc(board): uid = board['uid'] - # Echo test + # CDC Echo test port = get_serial_dev(uid, 'TinyUSB', "TinyUSB_Device", 0) ser = open_serial_dev(port) - test_str = b"test_str" - ser.write(test_str) - ser.flush() - rd_str = ser.read(len(test_str)) - ser.close() - assert rd_str == test_str, f'CDC wrong data: expected: {test_str} was {rd_str}' + def rand_ascii(length): + return "".join(random.choices(string.ascii_letters + string.digits, k=length)).encode("ascii") - # Block test - data = read_disk_file(uid,0,'README.TXT') + sizes = [32, 64, 128, 256, 512, random.randint(2000, 5000)] + for size in sizes: + test_str = rand_ascii(size) + ser.write(test_str) + ser.flush() + rd_str = ser.read(len(test_str)) + assert rd_str == test_str, f'CDC wrong data ({size} bytes): expected: {test_str[:16]}... was {rd_str[:16]}' + + ser.close() + + # MSC Block test + data = read_disk_file(uid, 0, 'README.TXT') readme = \ - b"This is tinyusb's MassStorage Class demo.\r\n\r\n\ -If you find any bugs or get any questions, feel free to file an\r\n\ -issue at github.com/hathach/tinyusb" + b"This is tinyusb's MassStorage Class demo.\r\n\r\n\ + If you find any bugs or get any questions, feel free to file an\r\n\ + issue at github.com/hathach/tinyusb" assert data == readme, 'MSC wrong data'