Internal Usage Example Two: CliReporterBoardcastMpsc

vielpork provides a built-in CLI broadcast reporter CliReporterBoardcastMpsc for broadcasting the progress of download tasks to multiple mpsc channels. CliReporterBoardcastMpsc implements two traits, ProgressReporter and ResultReporter, to report the progress and results of download tasks to the outside world.

CliReporterBoardcastMpsc uses tokio::sync::mpsc to broadcast the progress of download tasks. When a download task starts, CliReporterBoardcastMpsc sends progress update messages to multiple mpsc channels, and reports the result of the download task to the outside world when the download task ends.

This reporter's implementation is relatively simple, only need to send messages to multiple mpsc channels in the methods of ProgressReporter and ResultReporter. It was originally used to solve the problem that the rx type in Tonic gRPC server stream can only be mpsc, so we need to broadcast the progress to mpsc channels and then send it to the client through the server.


#![allow(unused)]
fn main() {
use crate::error::Result;
use crate::base::traits::{ProgressReporter, ResultReporter};
use crate::base::structs::DownloadProgress;
use crate::base::enums::{ProgressEvent, DownloadResult, OperationType};
use async_trait::async_trait;

#[derive(Debug,Clone)]
pub struct CliReporterBoardcastMpsc{
    inner_tx: tokio::sync::broadcast::Sender<ProgressEvent>,
    buffer_size: usize,
}
impl CliReporterBoardcastMpsc {
    pub fn new(buffer_size: usize) -> Self {
        let (inner_tx, _) = tokio::sync::broadcast::channel(buffer_size);
        Self { inner_tx, buffer_size }
    }

    pub fn subscribe_mpsc(&self) -> tokio::sync::mpsc::Receiver<ProgressEvent> {
        let (tx, rx) = tokio::sync::mpsc::channel(self.buffer_size);
        let mut inner_rx = self.inner_tx.subscribe();
        
        tokio::spawn(async move {
            loop {
                match inner_rx.recv().await {
                    Ok(event) => {
                        if tx.send(event).await.is_err() {
                            break;
                        }
                    }
                    Err(_) => {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                    }
                }
            }
        });
        
        rx
    }

    // 发送事件的方法
    pub async fn send(&self, event: ProgressEvent) -> Result<usize> {
        self.inner_tx.send(event)?;
        Ok(self.inner_tx.receiver_count())
    }

    // 创建新订阅者
    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<ProgressEvent> {
        self.inner_tx.subscribe()
    }
}

#[async_trait]
impl ProgressReporter for CliReporterBoardcastMpsc {
    async fn start_task(&self, task_id: u32, total: u64) ->Result<()> {
        self.send(ProgressEvent::Start { task_id, total }).await?;
        Ok(())
    }

    async fn update_progress(&self, task_id: u32, progress: &DownloadProgress)->Result<()> {
        self.send(ProgressEvent::Update { task_id, progress: progress.clone() }).await?;
        Ok(())
    }

    async fn finish_task(&self, task_id: u32,finish: DownloadResult) ->Result<()>{
        self.send(ProgressEvent::Finish { task_id ,finish}).await?;
        Ok(())
    }
}

#[async_trait]
impl ResultReporter for CliReporterBoardcastMpsc {
    async fn operation_result(&self, operation: OperationType, code: u32, message: String) ->Result<()> {
        self.send(ProgressEvent::OperationResult { operation, code, message }).await?;
        Ok(())
    }
}
}