Rust: Kind-Prozesse überwachen und schließen

jb_alvarado

Lieutenant
Registriert
Sep. 2015
Beiträge
576
Hallo Allerseits,
ich bräuchte mal wieder einen Ratschlag bei folgender Situation:

Ich möchte insgesamt 3 Kind- Prozesse starten:
  • einmal ffmpeg welches eine Videodatei einließ
  • einmal ffmpeg welches auf einen rtmp input stream wartet
  • einmal ffplay welches Video, bzw. Steam auf dem Bildschirm ausgibt
Das bekomme ich soweit auch hin. Das erste ffmpeg und ffplay laufen im main Thread und der Streaming-Server läuft in einem Thread.

Das Problem ist nun, dass saubere Schließen der Prozesse, wenn ein Teil abstürzt. Konkret ist das Problemkind ffplay, wenn ich das abschieße schließt sich das Programm nicht mehr sauber. Dabei wird noch nicht einmal das Ende der main Funktion erreicht, wo ich die Prozesse normalerweise nach einander schließen würde.

Also das Programm bleibt wohl im Thread hängen und ich habe keine Möglichkeit dem Prozess in dem Thread zu sagen, es soll stoppen. Auch bleibt der erste ffmpeg Prozess mit <defunct> im System.

Ich habe hier mal ein Beispielcode zusammen geschustert, ist schon etwas lang, aber damit lässt sich das Problem reproduzieren.

Rust:
use std::{
    io::{prelude::*, Error, Read},
    process::{Command, Stdio},
    sync::{
        mpsc::{channel, Receiver, Sender},
        Arc, Mutex,
    },
    thread::sleep,
    time::Duration,
};

use process_control::{ChildExt, Terminator};
use tokio::runtime::Runtime;

async fn ingest_server(
    dec_setting: Vec<&str>,
    ingest_sender: Sender<[u8; 65424]>,
    proc_terminator: Arc<Mutex<Option<Terminator>>>,
    is_terminated: Arc<Mutex<bool>>,
) -> Result<(), Error> {
    let mut buffer: [u8; 65424] = [0; 65424];
    let filter = "[0:v]fps=25,scale=1024:576,setdar=dar=1.778[vout1]";
    let mut filter_list = vec!["-filter_complex", &filter, "-map", "[vout1]", "-map", "0:a"];
    let mut server_cmd = vec!["-hide_banner", "-nostats", "-v", "error"];

    let mut stream_input = vec![
        "-f",
        "live_flv",
        "-listen",
        "1",
        "-i",
        "rtmp://localhost:1936/live/stream",
    ];

    server_cmd.append(&mut stream_input);
    server_cmd.append(&mut filter_list);
    server_cmd.append(&mut dec_setting.clone());

    loop {
        if *is_terminated.lock().unwrap() {
            break;
        }

        let mut server_proc = match Command::new("ffmpeg")
            .args(server_cmd.clone())
            .stdout(Stdio::piped())
            .spawn()
        {
            Err(e) => {
                panic!("couldn't spawn ingest server: {}", e)
            }
            Ok(proc) => proc,
        };

        let serv_terminator = server_proc.terminator()?;
        *proc_terminator.lock().unwrap() = Some(serv_terminator);
        let ingest_reader = server_proc.stdout.as_mut().unwrap();

        loop {
            match ingest_reader.read_exact(&mut buffer[..]) {
                Ok(length) => length,
                Err(_) => break,
            };
            if let Err(e) = ingest_sender.send(buffer) {
                println!("Ingest server error: {:?}", e);
                break;
            }
        }

        sleep(Duration::from_secs(1));

        if let Err(e) = server_proc.wait() {
            panic!("Decoder error: {:?}", e)
        };
    }

    Ok(())
}
fn main() {
    let decoder_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
    let player_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
    let server_term: Arc<Mutex<Option<Terminator>>> = Arc::new(Mutex::new(None));
    let is_terminated: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));

    let dec_setting: Vec<&str> = vec![
        "-pix_fmt",
        "yuv420p",
        "-c:v",
        "mpeg2video",
        "-g",
        "1",
        "-b:v",
        "50000k",
        "-minrate",
        "50000k",
        "-maxrate",
        "50000k",
        "-bufsize",
        "25000k",
        "-c:a",
        "s302m",
        "-strict",
        "-2",
        "-ar",
        "48000",
        "-ac",
        "2",
        "-f",
        "mpegts",
        "-",
    ];

    let player_proc = match Command::new("ffplay")
        .args(["-v", "error", "-hide_banner", "-nostats", "-i", "pipe:0"])
        .stdin(Stdio::piped())
        .spawn()
    {
        Err(e) => panic!("couldn't spawn ffplay: {}", e),
        Ok(proc) => proc,
    };

    let player_terminator = match player_proc.terminator() {
        Ok(proc) => Some(proc),
        Err(_) => None,
    };

    *player_term.lock().unwrap() = player_terminator;
    let (ingest_sender, ingest_receiver): (Sender<[u8; 65424]>, Receiver<[u8; 65424]>) = channel();
    let runtime = Runtime::new().unwrap();

    runtime.spawn(ingest_server(
        dec_setting.clone(),
        ingest_sender,
        server_term.clone(),
        is_terminated.clone(),
    ));

    let mut buffer: [u8; 65424] = [0; 65424];

    let mut dec_cmd = vec![
        "-f",
        "lavfi",
        "-i",
        "testsrc=duration=20:size=1024x576:rate=25",
        "-f",
        "lavfi",
        "-i",
        "anoisesrc=d=20:c=pink:r=48000:a=0.5",
    ];

    dec_cmd.append(&mut dec_setting.clone());

    let mut dec_proc = match Command::new("ffmpeg")
        .args(dec_cmd)
        .stdout(Stdio::piped())
        .spawn()
    {
        Err(e) => panic!("couldn't spawn ffmpeg: {}", e),
        Ok(proc) => proc,
    };

    let dec_terminator = match dec_proc.terminator() {
        Ok(proc) => Some(proc),
        Err(_) => None,
    };

    *decoder_term.lock().unwrap() = dec_terminator;
    let mut player_writer = player_proc.stdin.as_ref().unwrap();

    let dec_reader = dec_proc.stdout.as_mut().unwrap();

    loop {
            let bytes_len = match dec_reader.read(&mut buffer[..]) {
            Ok(length) => length,
            Err(e) => panic!("Reading error from decoder: {:?}", e),
        };

        if let Ok(receive) = ingest_receiver.try_recv() {
            if let Err(e) = player_writer.write_all(&receive) {
                panic!("Err: {:?}", e)
            };
            continue;
        }

        if let Err(e) = player_writer.write(&buffer[..bytes_len]) {
            panic!("Err: {:?}", e)
        };

        if bytes_len == 0 {
            break;
        }
    }

    *is_terminated.lock().unwrap() = true;

    sleep(Duration::from_secs(1));

    println!("Terminate decoder...");

    match &*decoder_term.lock().unwrap() {
        Some(dec) => unsafe {
            if let Ok(_) = dec.terminate() {
                println!("Terminate decoder done");
            }
        },
        None => (),
    }

    println!("Terminate encoder...");

    match &*player_term.lock().unwrap() {
        Some(enc) => unsafe {
            if let Ok(_) = enc.terminate() {
                println!("Terminate encoder done");
            }
        },
        None => (),
    }

    println!("Terminate server...");

    match &*server_term.lock().unwrap() {
        Some(serv) => unsafe {
            if let Ok(_) = serv.terminate() {
                println!("Terminate server done");
            }
        },
        None => (),
    }

    println!("Terminate done...");
}

Ist das so verständlich?

Wäre super, wenn mir da jemand helfen könnte!

Grüße
Jonathan
 
Zuletzt bearbeitet:
Ich würde erstmal schauen, wo genau es hängen bleibt. Notfalls mit print-Statements überall.

Könnte es nicht eventuell sein, dass einfach die "loop" in main weiterläuft, selbst wenn der ffplay Prozess beendet ist?
 
Danke @cx01, ich dachte da hätte ich schon alle Möglichkeiten durch gespielt. In den Loops bewegt sich auch nichts mehr weiter, das habe ich jetzt noch mal getestet. Allerdings hatte ich einen wichtigen Schritte missachtet... Nämlich den Backtrace beim Ausführen anzumachen. In Zeile 186 wird der Panic zwar ausgelöst, aber dieser Panic beendet nicht alle Prozesse. Das kann ich jetzt selbst dort mit einbauen.

Vielleicht noch eine Frage: Gibt es die Möglichkeit auf den process_control crate zu verzichten? Ich würde gerne nur mit child.kill und child.wait arbeiten, aber ich darf laut Kompiler davon nur eine mutible Referenz erzeugen, daher weiß ich nicht wie ich ansonsten die Prozesse kontrolliert schließen kann, wenn sie an verschiedenen Stellen/Threads ausgeführt werden.
 
Zuletzt bearbeitet:
Also "decoder_term" und "player_term" brauchst du (glaub ich) nicht; denn du kannst doch direkt player_proc.wait() und dec_proc.wait() aufrufen.

Problematisch ist also nur der "server_proc". Und da hast du doch die "is_terminated" Variable, die den Prozess dann letztlich beenden soll:

Code:
if *is_terminated.lock().unwrap() {
    break;
}

Da müsstest du nur noch zusätzlich den aktuellen Process merken und dann kill() bzw. wait() machen.
 
Ja stimmt, das muss ich mal probieren. Ganz eins zu eins stimmt der obere Code nicht mit meinem Programm überein. Die Schleife im Main ist normalerweise umgeben von einer weiteren Schleife (ob ich über eine Liste von Dateien drüber schleifen lasse), in der dann ein dec_proc.wait() am Ende ist. Ich hätte gerne am Ende noch mal ein dec_proc.kill() gehabt, falls irgendwo zwischen drin was nicht passt und der Prozess vorher noch nicht sauber schlossen hat. Aber ich teste das noch mal, vielleicht klappt das auch ohne.
 
Habe jetzt noch einen ganz blöden Fehler entdeckt. Und zwar jedes mal wenn ich von live auf offline Clip zurück switche erhöht sich der Arbeitsspeicherverbrauch. Und wenn ich von offline Clip auf Live wieder switche wird noch etwas vom vorherigen Live-Input abgespielt.

Also entweder im channel(), oder stdin/stdout wird noch was gebuffert, was nicht dahin gehört. Das Buffern erklärt für mich aber eigentlich nur die Tatsache, dass noch alte Frames abgespielt werden, aber nicht warum der Speicherverbrauch bei jedem Wechsel kontinuierlich weiter anwächst.

Edit: Ok, habe das doch lösen können, verwende nun einen sync_channel(), damit geht es.
 
Zuletzt bearbeitet:
Zurück
Oben