Merge remote-tracking branch 'ofw/dev' into mntm-dev --nobuild

This commit is contained in:
Willy-JL
2025-04-03 02:41:32 +01:00
77 changed files with 3102 additions and 1529 deletions

View File

@@ -1,6 +1,8 @@
#include "pipe.h"
#include <furi.h>
#define PIPE_DEFAULT_STATE_CHECK_PERIOD furi_ms_to_ticks(100)
/**
* Data shared between both sides.
*/
@@ -23,7 +25,7 @@ struct PipeSide {
PipeSideDataArrivedCallback on_data_arrived;
PipeSideSpaceFreedCallback on_space_freed;
PipeSideBrokenCallback on_pipe_broken;
FuriWait stdout_timeout;
FuriWait state_check_period;
};
PipeSideBundle pipe_alloc(size_t capacity, size_t trigger_level) {
@@ -53,14 +55,14 @@ PipeSideBundle pipe_alloc_ex(PipeSideReceiveSettings alice, PipeSideReceiveSetti
.shared = shared,
.sending = alice_to_bob,
.receiving = bob_to_alice,
.stdout_timeout = FuriWaitForever,
.state_check_period = PIPE_DEFAULT_STATE_CHECK_PERIOD,
};
*bobs_side = (PipeSide){
.role = PipeRoleBob,
.shared = shared,
.sending = bob_to_alice,
.receiving = alice_to_bob,
.stdout_timeout = FuriWaitForever,
.state_check_period = PIPE_DEFAULT_STATE_CHECK_PERIOD,
};
return (PipeSideBundle){.alices_side = alices_side, .bobs_side = bobs_side};
@@ -99,42 +101,62 @@ void pipe_free(PipeSide* pipe) {
}
}
static void _pipe_stdout_cb(const char* data, size_t size, void* context) {
static void pipe_stdout_cb(const char* data, size_t size, void* context) {
furi_assert(context);
PipeSide* pipe = context;
while(size) {
size_t sent = pipe_send(pipe, data, size, pipe->stdout_timeout);
if(!sent) break;
data += sent;
size -= sent;
}
pipe_send(pipe, data, size);
}
static size_t _pipe_stdin_cb(char* data, size_t size, FuriWait timeout, void* context) {
static size_t pipe_stdin_cb(char* data, size_t size, FuriWait timeout, void* context) {
UNUSED(timeout);
furi_assert(context);
PipeSide* pipe = context;
return pipe_receive(pipe, data, size, timeout);
return pipe_receive(pipe, data, size);
}
void pipe_install_as_stdio(PipeSide* pipe) {
furi_check(pipe);
furi_thread_set_stdout_callback(_pipe_stdout_cb, pipe);
furi_thread_set_stdin_callback(_pipe_stdin_cb, pipe);
furi_thread_set_stdout_callback(pipe_stdout_cb, pipe);
furi_thread_set_stdin_callback(pipe_stdin_cb, pipe);
}
void pipe_set_stdout_timeout(PipeSide* pipe, FuriWait timeout) {
void pipe_set_state_check_period(PipeSide* pipe, FuriWait check_period) {
furi_check(pipe);
pipe->stdout_timeout = timeout;
pipe->state_check_period = check_period;
}
size_t pipe_receive(PipeSide* pipe, void* data, size_t length, FuriWait timeout) {
size_t pipe_receive(PipeSide* pipe, void* data, size_t length) {
furi_check(pipe);
return furi_stream_buffer_receive(pipe->receiving, data, length, timeout);
size_t received = 0;
while(length) {
size_t received_this_time =
furi_stream_buffer_receive(pipe->receiving, data, length, pipe->state_check_period);
if(!received_this_time && pipe_state(pipe) == PipeStateBroken) break;
received += received_this_time;
length -= received_this_time;
data += received_this_time;
}
return received;
}
size_t pipe_send(PipeSide* pipe, const void* data, size_t length, FuriWait timeout) {
size_t pipe_send(PipeSide* pipe, const void* data, size_t length) {
furi_check(pipe);
return furi_stream_buffer_send(pipe->sending, data, length, timeout);
size_t sent = 0;
while(length) {
size_t sent_this_time =
furi_stream_buffer_send(pipe->sending, data, length, pipe->state_check_period);
if(!sent_this_time && pipe_state(pipe) == PipeStateBroken) break;
sent += sent_this_time;
length -= sent_this_time;
data += sent_this_time;
}
return sent;
}
size_t pipe_bytes_available(PipeSide* pipe) {
@@ -151,14 +173,14 @@ static void pipe_receiving_buffer_callback(FuriEventLoopObject* buffer, void* co
UNUSED(buffer);
PipeSide* pipe = context;
furi_assert(pipe);
if(pipe->on_space_freed) pipe->on_data_arrived(pipe, pipe->callback_context);
if(pipe->on_data_arrived) pipe->on_data_arrived(pipe, pipe->callback_context);
}
static void pipe_sending_buffer_callback(FuriEventLoopObject* buffer, void* context) {
UNUSED(buffer);
PipeSide* pipe = context;
furi_assert(pipe);
if(pipe->on_data_arrived) pipe->on_space_freed(pipe, pipe->callback_context);
if(pipe->on_space_freed) pipe->on_space_freed(pipe, pipe->callback_context);
}
static void pipe_semaphore_callback(FuriEventLoopObject* semaphore, void* context) {

View File

@@ -148,38 +148,48 @@ void pipe_free(PipeSide* pipe);
void pipe_install_as_stdio(PipeSide* pipe);
/**
* @brief Sets the timeout for `stdout` write operations
* @brief Sets the state check period for `send` and `receive` operations
*
* @note This value is set to `FuriWaitForever` when the pipe is created
* @note This value is set to 100 ms when the pipe is created
*
* @param [in] pipe Pipe side to set the timeout of
* @param [in] timeout Timeout value in ticks
* `send` and `receive` will check the state of the pipe if exactly 0 bytes were
* sent or received during any given `check_period`. Read the documentation for
* `pipe_send` and `pipe_receive` for more info.
*
* @param [in] pipe Pipe side to set the check period of
* @param [in] check_period Period in ticks
*/
void pipe_set_stdout_timeout(PipeSide* pipe, FuriWait timeout);
void pipe_set_state_check_period(PipeSide* pipe, FuriWait check_period);
/**
* @brief Receives data from the pipe.
*
* This function will try to receive all of the requested bytes from the pipe.
* If at some point during the operation the pipe becomes broken, this function
* will return prematurely, in which case the return value will be less than the
* requested `length`.
*
* @param [in] pipe The pipe side to read data out of
* @param [out] data The buffer to fill with data
* @param length Maximum length of data to read
* @param timeout The timeout (in ticks) after which the read operation is
* interrupted
* @returns The number of bytes actually written into the provided buffer
*/
size_t pipe_receive(PipeSide* pipe, void* data, size_t length, FuriWait timeout);
size_t pipe_receive(PipeSide* pipe, void* data, size_t length);
/**
* @brief Sends data into the pipe.
*
* This function will try to send all of the requested bytes to the pipe.
* If at some point during the operation the pipe becomes broken, this function
* will return prematurely, in which case the return value will be less than the
* requested `length`.
*
* @param [in] pipe The pipe side to send data into
* @param [out] data The buffer to get data from
* @param length Maximum length of data to send
* @param timeout The timeout (in ticks) after which the write operation is
* interrupted
* @returns The number of bytes actually read from the provided buffer
*/
size_t pipe_send(PipeSide* pipe, const void* data, size_t length, FuriWait timeout);
size_t pipe_send(PipeSide* pipe, const void* data, size_t length);
/**
* @brief Determines how many bytes there are in the pipe available to be read.