tests: lib: cobs: Add streaming tests

Add test cases for streaming variants of COBS encoder/decoder.

Signed-off-by: Pieter De Gendt <pieter.degendt@basalte.be>
This commit is contained in:
Pieter De Gendt
2026-01-08 14:09:32 +01:00
committed by Henrik Brix Andersen
parent f344ab6b98
commit aa76c1d10d

View File

@@ -383,6 +383,46 @@ ZTEST_F(cobs_tests, test_encode)
}
}
static int cobs_net_buf_cb(const uint8_t *buf, size_t len, void *user_data)
{
struct net_buf *dst = user_data;
if (net_buf_tailroom(dst) < len) {
return -ENOMEM;
}
(void)net_buf_add_mem(dst, buf, len);
return 0;
}
ZTEST_F(cobs_tests, test_encode_stream)
{
int ret;
ARRAY_FOR_EACH(cobs_dataset, idx) {
const struct cobs_test_item *test = &cobs_dataset[idx];
struct cobs_encoder enc;
ret = cobs_encoder_init(&enc, cobs_net_buf_cb, fixture->encoded, test->flags);
zassert_ok(ret, "encoder init failed for %s (%d)", test->name, ret);
/* Simulate chunks by sending each byte */
for (size_t i = 0; i < test->decoded_len; ++i) {
ret = cobs_encoder_write(&enc, &test->decoded[i], 1);
zassert_equal(ret, 1, "encoder write failed (%d) for %s", ret, test->name);
}
ret = cobs_encoder_close(&enc);
zassert_ok(ret, "encoder close failed for %s (%d)", test->name, ret);
zassert_equal(test->encoded_len, fixture->encoded->len);
zassert_mem_equal(test->encoded, fixture->encoded->data, test->encoded_len);
net_buf_reset(fixture->encoded);
}
}
ZTEST_F(cobs_tests, test_decode)
{
int ret;
@@ -411,6 +451,126 @@ ZTEST_F(cobs_tests, test_decode)
}
}
ZTEST_F(cobs_tests, test_decode_stream)
{
int ret;
ARRAY_FOR_EACH(cobs_dataset, idx) {
const struct cobs_test_item *test = &cobs_dataset[idx];
struct cobs_decoder dec;
ret = cobs_decoder_init(&dec, cobs_net_buf_cb, fixture->decoded, test->flags);
zassert_ok(ret, "decoder init failed for %s (%d)", test->name, ret);
/* Simulate chunks by sending each byte */
for (size_t i = 0; i < test->encoded_len; ++i) {
ret = cobs_decoder_write(&dec, &test->encoded[i], 1);
zassert_equal(ret, 1, "decoder write failed (%d) for %s", ret, test->name);
}
ret = cobs_decoder_close(&dec);
zassert_ok(ret, "decoder close failed for %s (%d)", test->name, ret);
zassert_equal(test->decoded_len, fixture->decoded->len);
zassert_mem_equal(test->decoded, fixture->decoded->data, test->decoded_len);
net_buf_reset(fixture->decoded);
}
}
static int cobs_forward_to_decoder(const uint8_t *buf, size_t len, void *user_data)
{
struct cobs_decoder *dec = user_data;
return cobs_decoder_write(dec, buf, len);
}
ZTEST_F(cobs_tests, test_encode_decode_stream)
{
int ret;
ARRAY_FOR_EACH(cobs_dataset, idx) {
const struct cobs_test_item *test = &cobs_dataset[idx];
struct cobs_encoder enc;
struct cobs_decoder dec;
ret = cobs_encoder_init(&enc, cobs_forward_to_decoder, &dec, test->flags);
zassert_ok(ret, "encoder init failed for %s (%d)", test->name, ret);
ret = cobs_decoder_init(&dec, cobs_net_buf_cb, fixture->decoded, test->flags);
zassert_ok(ret, "decoder init failed for %s (%d)", test->name, ret);
ret = cobs_encoder_write(&enc, test->decoded, test->decoded_len);
zassert_equal(ret, test->decoded_len, "encoder write failed for %s (%d)",
test->name, ret);
ret = cobs_encoder_close(&enc);
zassert_ok(ret, "encoder close failed for %s (%d)", test->name, ret);
ret = cobs_decoder_close(&dec);
zassert_ok(ret, "decoder close failed for %s (%d)", test->name, ret);
zassert_equal(test->decoded_len, fixture->decoded->len);
zassert_mem_equal(test->decoded, fixture->decoded->data, test->decoded_len);
net_buf_reset(fixture->decoded);
}
}
static size_t frame_test_idx;
static int cobs_frame_tester(const uint8_t *buf, size_t len, void *user_data)
{
struct net_buf *dst = user_data;
if (buf == NULL) {
const struct cobs_test_item *test = &cobs_dataset[frame_test_idx];
zassert_equal(test->decoded_len, dst->len);
zassert_mem_equal(test->decoded, dst->data, test->decoded_len);
net_buf_reset(dst);
frame_test_idx++;
return 0;
}
if (net_buf_tailroom(dst) < len) {
return -ENOMEM;
}
(void)net_buf_add_mem(dst, buf, len);
return 0;
}
ZTEST_F(cobs_tests, test_decode_stream_frame_complete)
{
struct cobs_encoder enc;
struct cobs_decoder dec;
int ret;
/* This test re-uses the same encoder/decoder pair and streams multiple frames */
ret = cobs_encoder_init(&enc, cobs_forward_to_decoder, &dec, COBS_FLAG_TRAILING_DELIMITER);
zassert_ok(ret, "encoder init failed (%d)", ret);
ret = cobs_decoder_init(&dec, cobs_frame_tester, fixture->decoded,
COBS_FLAG_TRAILING_DELIMITER);
zassert_ok(ret, "decoder init failed (%d)", ret);
ARRAY_FOR_EACH(cobs_dataset, idx) {
ret = cobs_encoder_write(&enc, cobs_dataset[idx].decoded,
cobs_dataset[idx].decoded_len);
zassert_equal(ret, cobs_dataset[idx].decoded_len);
/* Closing will write a delimiter and reset the state */
ret = cobs_encoder_close(&enc);
zassert_ok(ret);
}
zassert_equal(frame_test_idx, ARRAY_SIZE(cobs_dataset));
}
ZTEST_F(cobs_tests, test_encode_trailing_delimiter)
{
int ret;