jb_alvarado
Lieutenant
- Registriert
- Sep. 2015
- Beiträge
- 594
Hallo Allerseits,
ich habe schon auf der Diskussionsseite von Actix-web gefragt, aber da bekomme ich keine Antwort. Vielleicht habe ich hier mehr Glück.
Ich programmiere ein Backend mit Rust und Actix. Dabei möchte ich die Möglichkeit haben per API Aufgaben auszulösen, die zum Teil länger dauern können. Diese Aufgaben sollen im Hintergrund laufen und per Websocket soll der Status vom Server zum Client geschickt werden.
Allerdings habe ich gerade das Problem, dass ich für diesen Status einen Channel (tokio broadcast) zwischen dem API Service und dem Websocket erstellen möchte, dieser aber keine Nachrichten empfängt*. Wenn ich in der Main Funktion einen Thread erstelle und von dort aus Nachrichten versende, kommen sie durch.
*Das komische ist, dass in ganz seltenen Fällen Nachrichten vom Service geschickt werden und auch ankommen. Nach welchem Schema das funktioniert ist mir nicht ersichtlich.
Ich habe hier mal einen Beispielcode erstellt:
Habt ihr eine Idee, was hier falsch läuft?
Hm, warum findet man oft kurz nach dem man ins Forum schreibt selbst die Antwort...
Habe den Channel im falschen Kontext erstellt. Wenn er direkt am Anfang in der Main Funktion erstellt wird, geht es.
ich habe schon auf der Diskussionsseite von Actix-web gefragt, aber da bekomme ich keine Antwort. Vielleicht habe ich hier mehr Glück.
Ich programmiere ein Backend mit Rust und Actix. Dabei möchte ich die Möglichkeit haben per API Aufgaben auszulösen, die zum Teil länger dauern können. Diese Aufgaben sollen im Hintergrund laufen und per Websocket soll der Status vom Server zum Client geschickt werden.
Allerdings habe ich gerade das Problem, dass ich für diesen Status einen Channel (tokio broadcast) zwischen dem API Service und dem Websocket erstellen möchte, dieser aber keine Nachrichten empfängt*. Wenn ich in der Main Funktion einen Thread erstelle und von dort aus Nachrichten versende, kommen sie durch.
*Das komische ist, dass in ganz seltenen Fällen Nachrichten vom Service geschickt werden und auch ankommen. Nach welchem Schema das funktioniert ist mir nicht ersichtlich.
Ich habe hier mal einen Beispielcode erstellt:
Rust:
use std::{thread, time::Duration};
use actix::prelude::*;
use actix_web::{
get, middleware, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder,
};
use actix_web_actors::ws;
use tokio::sync::broadcast;
struct WebSocket {
receiver: web::Data<broadcast::Receiver<String>>,
spawn_handle: Option<SpawnHandle>,
}
impl WebSocket {
fn new(receiver: web::Data<broadcast::Receiver<String>>) -> Self {
Self {
receiver,
spawn_handle: None,
}
}
}
impl Actor for WebSocket {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
println!("-- actor");
let mut receiver = self.receiver.resubscribe();
self.spawn_handle = Some(ctx.add_stream(async_stream::stream! {
while let Ok(message) = receiver.recv().await {
println!("-- msg: {message}");
yield message.to_string();
}
}));
}
}
impl StreamHandler<String> for WebSocket {
fn handle(&mut self, msg: String, ctx: &mut Self::Context) {
ctx.text(msg);
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WebSocket {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, _ctx: &mut Self::Context) {
println!("Received message: {msg:?}")
}
}
async fn status_ws(
req: HttpRequest,
stream: web::Payload,
recv: web::Data<broadcast::Receiver<String>>,
) -> Result<HttpResponse, Error> {
println!("-- client connect");
ws::start(WebSocket::new(recv), &req, stream)
}
#[get("/hello/")]
async fn say_hello(sender: web::Data<broadcast::Sender<String>>) -> impl Responder {
let task = "from service 00".to_string();
sender
.send(task.clone())
.expect("Failed to write to channel");
println!("say hello");
web::Json(task)
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
HttpServer::new(move || {
let (sender, receiver) = broadcast::channel::<String>(20);
let sender2 = sender.clone();
let _handle = thread::spawn(move || {
for i in 1..22 {
thread::sleep(Duration::from_millis(600));
sender2
.send(format!("from loop {i}"))
.expect("Failed to write to channel");
}
});
App::new()
.app_data(web::Data::new(sender))
.app_data(web::Data::new(receiver))
.wrap(middleware::Logger::default())
.service(web::resource("/ws/status").route(web::get().to(status_ws)))
.service(web::scope("/api").service(say_hello))
})
.bind(("127.0.0.1", 8077))?
.run()
.await
}
Habt ihr eine Idee, was hier falsch läuft?
Ergänzung ()
Hm, warum findet man oft kurz nach dem man ins Forum schreibt selbst die Antwort...
Habe den Channel im falschen Kontext erstellt. Wenn er direkt am Anfang in der Main Funktion erstellt wird, geht es.
Zuletzt bearbeitet: