Python Numerische Simulation mit joblib/multiprocessing optimieren (numpy, pygame)

TheShooter

Lt. Junior Grade
Registriert
Juni 2011
Beiträge
370
Hallo liebe Forengemeinde,
ich habe ein Programm geschrieben, was nach Laplace die Temperaturentwicklung eines Systems iterativ berechnet. Da ich gerne jeden Iterationsschritt in Echtzeit sehen möchte, benutze ich pygame um die Ergebnisse grafisch darzustellen.
Hier erst einmal der Code:

Python:
#importiere wichtige Libraries
import pygame
import numpy as np
from math import pi
import random
from joblib import Parallel, delayed
import multiprocessing

max_temp = 100 #maximale Temperatur

breite = 100 #Breite des Bildes
hoehe = 100 #Höhe des Bildes
  
x = np.linspace(0,breite,breite+1,dtype=int) #generiere x-koordinaten
y = np.linspace(0,hoehe,hoehe+1,dtype=int)     #generiere y-koordinaten

area1_x = np.linspace(breite/2-10,breite/2+10,21,dtype=int) #x-koordinaten des ersten festen Temperaturbereichs
area1_y = np.linspace(hoehe/2-10,hoehe/2+10,21,dtype=int) #y-koordinaten des ersten festen Temperaturbereichs

rand = np.zeros((len(x),len(y))) #initialisiere das rand-array

def temp_color(temp): #konvertiere temperatur in Graustufenfarbe (0C...100C,schwarz...weiß)
    val = int(temp*2.55)
    return (val,val,val)
  
def draw_pixel(surface,pos,temp): #funktion um Pixel auf dem Schirm darzustellen
    surface.set_at(pos,temp_color(temp))
  
def generate_rand(): #generiere eine zufällige Temperatur
    rand = random.randrange(0,max_temp+1,1)
    return rand

def initialize_temperature(): #generiere für jeden Punkt eine zufällige Temperatur
    for i in range(len(x)):
        for j in range(len(y)):
            rand[i,j] = generate_rand()
  

def fixate_temperature(): #fixiere die Temperatur im Bereich area1
    for i in np.nditer(area1_x):
        for j in np.nditer(area1_y):
            rand[i,j] = 100
    for i in range(len(x)):
        rand[i,0] = 0
        rand[0,i-(breite-hoehe)] = 0
        rand[i,hoehe] = 0
        rand[breite,i-(breite-hoehe)] = 0
      
screen = pygame.display.set_mode((breite,hoehe)) #unser Bildschirm


def find_temp(i,j): #berechne die Temperatur basierend auf Laplace (iterativ später mit pool.map
        fixate_temperature() #fixiere area1
        #draw_pixel(screen,[i,j],rand[i,j])
        rand[i,j] = 1/4*(rand[i+1,j]+rand[i,j+1]+rand[i-1,j]+rand[i,j-1]) #Laplace in Action
        draw_pixel(screen,[i,j],rand[i,j]) #zeichne das neue Bild
        return rand
      
      
                  
initialize_temperature() #initialisiere die zufälligen Temperaturen


def main(): #die main-Funktion

    pygame.init() #initialisiere pygame
    pygame.display.set_caption("test-program") #Titel des Fensters
  
    running = True
    clock = pygame.time.Clock()

    while running:
        clock.tick(250) #Update-Intervall
        num_cores=multiprocessing.cpu_count() #Anzahl der logischen Prozessoren
        #iteriere über die find_temp-Funktion über alle Punkte
        Parallel(n_jobs=1)(delayed(find_temp)(i,j) for j in range(len(y)-1) for i in range(len(x)-1))
        print("ITER")
      
        for event in pygame.event.get(): #damit man das Fenster schließen kann
            if event.type == pygame.QUIT:
                running = False

        pygame.display.flip() #Zeichne ein neues Bild

    pygame.quit() #sauberes Ende
  
  
if __name__=="__main__": #für die Windows-Nutzer
    main()

Das Problem ist nun, dass die Simulation super läuft, wenn ich n_jobs=1 (line 76) setze. Wenn ich jedoch die Anzahl der Jobs erhöhe, arbeitet das Programm auch, die Simulation funktioniert dann nur leider nicht richtig.
Ich habe bisher fast gar keine Erfahrung mit Parallelization, ich denke sogar dass ich hier eigentlich multiprocessing benutzen müsste.

In der obig eingestellten Auflösung ist die Performance auf einem Prozessorkern noch akzeptabel, sobald ich aber in Richtugn SD gehe, dauert ein Iterationsschritt mehrere Minuten.
Vielleicht gibt es auch noch andere Ansätze, wie ich die Performance verbessern könnte?

Wäre für frische Ideen sehr dankbar :).

Beste Grüße
The Shooter

PS: Das ganze läuft unter Python3 und wird unter Arch-Linux (Zen-Kernel) ausgeführt. Oh, und sorry für das "Denglish" im Code. :)
 
Also im nächsten Step würde ich dir mal empfehlen die ganzen Core-Funktionalitäten in Klassen auszulagern damit das Ganze wesentlich sauberer strukturiert ist.

Was mir hier noch sehr auffällt ist, dass du sehr sehr häufig den random value generator aufrufst und ich denke das wird zu einem starken overhead führen der dich sehr stark ausbremst. Vielleicht wäre eine günstigere Idee da im Vorfeld ein Array mit random values zu befüllen und du pickst dir dann die Werte bei der eigentlichen Simulation nur noch aus dem Array heraus

https://lemire.me/blog/2016/03/21/ranged-random-number-generation-is-slow-in-python/

Ansonsten ja - Multi-Threading big part ;)

EDIT: Vergiss das mit dem random value, das machst du ja eh schon, hab ich falsch gelesen :D
 
TheShooter schrieb:
Wenn ich jedoch die Anzahl der Jobs erhöhe, arbeitet das Programm auch, die Simulation funktioniert dann nur leider nicht richtig.
Stichwort: Race Condition

Wenn zwei Threads gleichzeitig auf ein Objekt zugreifen, dann kann es zu Problemen kommen. Du musst das Problem also so zerlegen, dass das nicht passieren kann.
TheShooter schrieb:
Vielleicht gibt es auch noch andere Ansätze, wie ich die Performance verbessern könnte?
Python:
fixate_temperature()
ist ein absoluter Performance Fresser. Ich würde es folgendermaßen lösen:
Python:
def find_temp(i, j):
    if i == 0 or j == 0 or i == breite or j == hoehe:
        rand[i,j] = 0
    elif i in np.nditer(area1_x) and j in np.nditer(area1_y):
        rand[i,j] = max_temp
    else:
        rand[i,j] = (rand[i+1,j] + rand[i,j+1] + rand[i-1,j] + rand[i,j-1]) / 4
Das läuft bei mir um den Faktor 10 schneller. Das Ergebnis sollte, wenn mich nicht alles täuscht, identisch sein.
Wenn es noch schneller werden soll, dann würde ich mir mal Cython, PyPy oder Numba ansehen.

Nachtrag: Hab mal zwei Bilder angehängt, wie das Ganze bei mir nach ein paar Iterationen aussieht.

Hier mein aufs Wesentliche reduzierter Quellcode:
Python:
%matplotlib inline
from IPython import display
import matplotlib.pyplot as plt
import numpy as np
import random

max_temp = 100
breite = 100
hoehe = 100

rand = np.zeros((breite + 1, hoehe + 1))

area1 = np.zeros((breite + 1, hoehe + 1))
for i in range(int(breite / 2) - 10, int(breite / 2) + 10):
    for j in range(int(hoehe / 2) - 10, int(hoehe / 2) + 10):
        area1[i, j] = 1

def initialize_temperature():
    for i in range(breite):
        for j in range(hoehe):
            if i == 0 or j == 0 or i == breite or j == hoehe:
                rand[i, j] = 0
            elif area1[i, j]:
                rand[i, j] = max_temp
            else:
                rand[i, j] = random.randrange(0, max_temp + 1, 1)

def update():
    for i in range(1, breite):
        for j in range(1, hoehe):
            if not area1[i, j]:
                rand[i, j] = (rand[i+1, j] + rand[i, j+1] + rand[i-1, j] + rand[i, j-1]) / 4

initialize_temperature()

fig = plt.figure(figsize=(5, 5))

while True:
    try:
        update()
        plt.imshow(rand, cmap='rainbow')
        display.clear_output(wait=True)
        display.display(plt.gcf())
    except KeyboardInterrupt:
        break

fig.savefig("laplace.png", dpi=300, bbox_inches='tight')
 

Anhänge

  • laplace.png
    laplace.png
    49,1 KB · Aufrufe: 705
  • laplace.png
    laplace.png
    44,3 KB · Aufrufe: 725
Zuletzt bearbeitet:
  • Gefällt mir
Reaktionen: TheShooter
Vielen Dank für deinen Beitrag!
du hast Recht, das Vereinfachen dieser Funktion erhöht tatsächlich etwas die Performance.
Tut mir Leid, dass die Antwort erst jetzt kommt, ich habe mich etwas in die Thematik eingelesen bzgl. Race condition usw. Dabei habe ich auch etwas mit joblib und pool multiprocessing in python experimentiert.

Z.B. habe ich versucht mit beiden Ansätzen Quadratzahlen zu berechnen - die Berechnungszeit habe ich im Anhang mal visualisiert.

Dabei gibt es bei der Pool-Methode einen mit der Pool-Anzahl immer größer werdenden Performance-Vorteil.
Joblib ist jedoch generell langsamer und hat auch nicht die Skalierbarkeit von Pool.
Hast du eine Idee, woher das kommen könnte? Unterscheiden sich die Anwendungsgebiete von joblib und Pool, sodass vielleicht eine Mehtode eher für das Berechnen von Quadradzahlen geeignet ist? Vielleicht zeigt joblib seine Stärken in einem anderen Anwendungsgebiet?

Ich freue mich auf eine interessante Diskussion.

Beste Grüße
Maximilian Stucke
 

Anhänge

  • performance.png
    performance.png
    51,2 KB · Aufrufe: 531
Singlethreaded (10.0 s):
Python:
import matplotlib.pyplot as plt
import numpy as np
import random

max_temp = 100
breite = 100
hoehe = 100

rand = np.zeros((breite + 1, hoehe + 1))
area1 = np.zeros((breite + 1, hoehe + 1))

for i in range(int(breite / 2) - 10, int(breite / 2) + 10):
    for j in range(int(hoehe / 2) - 10, int(hoehe / 2) + 10):
        area1[i, j] = 1
        
def initialize_temperature():
    for i in range(breite):
        for j in range(hoehe):
            if i == 0 or j == 0 or i == breite or j == hoehe:
                rand[i, j] = 0
            elif area1[i, j]:
                rand[i, j] = max_temp
            else:
                rand[i, j] = random.randrange(0, max_temp + 1, 1)

def update():
    for i in range(1, breite):
        for j in range(1, hoehe):
            if not area1[i, j]:
                rand[i, j] = (rand[i+1, j] + rand[i, j+1] + rand[i-1, j] + rand[i, j-1]) / 4

initialize_temperature()

for i in range(1000):
    update()
    
fig = plt.figure(figsize=(5, 5))
plt.imshow(rand, cmap='rainbow')       
fig.savefig("laplace.png", dpi=300, bbox_inches='tight')]

Multithreaded (3.6 s):
Python:
import matplotlib.pyplot as plt
import multiprocessing
import numpy as np
import random
import math

max_temp = 100
breite = 100
hoehe = 100

rand = np.zeros((breite + 1, hoehe + 1))
area1 = np.zeros((breite + 1, hoehe + 1))

for i in range(int(breite / 2) - 10, int(breite / 2) + 10):
    for j in range(int(hoehe / 2) - 10, int(hoehe / 2) + 10):
        area1[i, j] = 1
        
def initialize_temperature():
    for i in range(breite):
        for j in range(hoehe):
            if i == 0 or j == 0 or i == breite or j == hoehe:
                rand[i, j] = 0
            elif area1[i, j]:
                rand[i, j] = max_temp
            else:
                rand[i, j] = random.randrange(0, max_temp + 1, 1)

def update_chunk(data):
    chunk, rand = data
    for i in chunk:
        for j in range(1, hoehe):
            if not area1[i, j]:
                rand[i, j] = (rand[i+1, j] + rand[i, j+1] + rand[i-1, j] + rand[i, j-1]) / 4
    return rand

initialize_temperature()

num_threads = 8
threadpool = multiprocessing.Pool(num_threads)
chunk_size = math.ceil(breite / num_threads)
chunks = [range(1, breite)[i:i + chunk_size] for i in range(0, breite-1, chunk_size)]
input_data = [[chunk, rand] for chunk in chunks]

for i in range(1000):
    results = threadpool.map(update_chunk, input_data)
    for idx, chunk in enumerate(chunks):
        for i in chunk:
            rand[i,:] = results[idx][i,:]
    
fig = plt.figure(figsize=(5, 5))
plt.imshow(rand, cmap='rainbow')       
fig.savefig("laplace.png", dpi=300, bbox_inches='tight')
 
Zuletzt bearbeitet:
Hier findest du den Quellcode. Ich bin mir ziemlich sicher, dass da irgendwo ein Fehler drinn ist, der das alles erklärt. Nur ich weiß mir gerade nicht zu helfen. :|

Vielen Dank für deine Hilfe und Geduld.

Python:
#importiere die ganzen goodies
from numba import jit
import numpy as np
import time
import multiprocessing
from joblib import Parallel, delayed
import matplotlib.pyplot as plt

inputs = np.arange(1000000) #inputs für später

def sqr(i): #berechne die Quadrate für alle i
    return i*i

def rand(i): #Zufallszahl mal i
    return i*np.random.rand(0,5000000)


def parallel(): #mit joblib parallel, delayed
    #definiere Grenzen für Job- und Poolanzahl
    j = 1
    lim = 9
    array_helper = 0
    x = np.zeros(lim-j)
    y = np.zeros(lim-j)

    while j<lim:
        start_time = time.time()
        Parallel(n_jobs=j)(delayed(sqr)(i) for i in inputs)
        print("number of cores:",j,",", "time to completion:",time.time()-start_time,"sec")
        x[array_helper] = j
        y[array_helper] = time.time()-start_time
        j+=1
        array_helper += 1
    return {'x':x, 'y':y}

def pool(): #mit pool map
    #definiere Grenzen für Job- und Poolanzahl
    j = 1
    lim = 9
    array_helper = 0
    x = np.zeros(lim-j)
    y = np.zeros(lim-j)

    while j<lim:
        start_time = time.time()
        p = multiprocessing.Pool(j)
        p.map(sqr, inputs)
        print("number of cores:",j,",", "time to completion:",time.time()-start_time,"sec")
        x[array_helper] = j
        y[array_helper] = time.time()-start_time
        j+=1
        array_helper += 1
    p.close()
    p.join()
    return {'x':x, 'y':y}

res1 = parallel() #Ergebnisse für joblib
res2 = pool() #Ergebnisse für pool

plt.figure(figsize=(np.sqrt(2)*10,10)) #Plotgröße

plt.subplot(211) #Erster subplot für joblib
plt.title("squares (0...1e6)")
plt.xlabel("number of jobs")
plt.ylabel("computation time [sec]")
plt.plot(res1['x'],res1['y'],label="data") #plotte Daten
plt.legend()

plt.subplot(212) #Zweiter subplot für pool
plt.xlabel("pool number")
plt.ylabel("computation time [sec]")
plt.plot(res2['x'],res2['y'],label="data") #plotte Daten
plt.legend()
plt.show()
 
Da sich die einzelnen Aufgaben extrem schnell berechnen lassen, wird der theoretische Geschwindigkeitsvorteil der parallelen Abarbeitung durch den Mehraufwand der Parallelisierung aufgefressen.

Bei joblib.Parallel scheint der Mehraufwand (overhead) deutlich größer zu sein, als bei multiprocessing.Pool.
Eine einfache Möglichkeit die Situation zu verbessern, wäre der Wechsel von backend="multiprocessing" auf backend="threading" (Erklärung: https://pythonhosted.org/joblib/parallel.html).
Python:
Parallel(n_jobs=num_threads, backend="threading")(map(delayed(sqr), inputs))

Ich hab den Code mal wieder etwas gekürzt:
Python:
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
import multiprocessing
import time

def sqr(i):
    return i*i

def parallel(num_threads, inputs):
    Parallel(n_jobs=num_threads, backend="threading")(map(delayed(sqr), inputs))

def pool(num_threads, inputs):
    p = multiprocessing.Pool(num_threads)
    p.map(sqr, inputs)
    p.close()

def calc_squares(max_threads, func, inputs):
    x = []
    y = []
    for j in range(max_threads):
        num_threads = j + 1
        start_time = time.time()
        func(num_threads, inputs)
        x.append(num_threads)
        y.append(time.time() - start_time)
        print("number of threads:", num_threads, ",", "time to completion:", y[-1], "sec")
    return {'x':x, 'y':y}

res1 = calc_squares(8, parallel, range(1000000))
res2 = calc_squares(8, pool,     range(10000000))

fig = plt.figure(figsize=(15,10))

plt.subplot(211)
plt.grid()
plt.title("squares (0...1e6)")
plt.xlabel("number of jobs")
plt.ylabel("computation time [sec]")
plt.plot(res1['x'], res1['y'], label="data")
plt.legend()

plt.subplot(212)
plt.grid()
plt.title("squares (0...1e7)")
plt.xlabel("pool number")
plt.ylabel("computation time [sec]")
plt.plot(res2['x'], res2['y'], label="data")
plt.legend()

plt.show()
Ergebnis sieht bei mir dann folgendermaßen aus:
parallel_vs_pool.png
 

Anhänge

  • parallel_vs_pool.png
    parallel_vs_pool.png
    194,5 KB · Aufrufe: 560
Zuletzt bearbeitet:
Wenn man performance-Probleme mit Python hat gibts viele Möglichkeiten
1) Man parallelisiert in Python
=> Sehr aufwendig und wenig Aussichtsreich. Bringt maximal Faktor = numOfCpuCores
2) Man lässt performanteren Code erstellen
=> Sehr wenig aufwand, da automatisch möglich und bringt leicht Faktor 100 (zB sowas? https://de.wikipedia.org/wiki/Cython)
3) Man schreibt zumindest die rechenaufwendigen Teile der SW in C++
=> Das ist natürlich je nach C++-Fähigkeiten sehr Aufwendig aber bringt auch locker mal Faktor 1000 und mehr

Oder allgemeine Artikel zu dem Thema:
http://earthpy.org/speed.html
http://www.monitis.com/blog/7-ways-to-improve-your-python-performance/
https://wiki.python.org/moin/PythonSpeed/PerformanceTips

Selbst parallelisieren ist wie mit dem Kopf durch die Wand und am wenigsten aussichtsreich.
Das mindeste ist auch, dass man durch einen Profiler erstmal feststellt, welche Funktion bzw. welcher Codeabschnitt überhaupt das Problem ist...
 
Zuletzt bearbeitet:
Ob eine Parallesierung Sinn macht, hängt auch immer stark von dem gesetzten Problem ab. Für ein eigenes Projekt konnte ich durch Parallesireung auf einer GPU (fft von mehreren 10000 bildern) einen erheblichen Vorteil erzielen.
Im selben Projekt hat eine Race Condition die Parallelisierung an anderer Stelle obsolet gemacht.

Die Kunst besteht immer darin, das Problem, das man berechnet, auf die gewünschte Methode anzupassen. Manchmal gelingt das gut, manchmal nicht.

Trotzdem vielen Dank, ich konnte einiges für Python mitnehmen, das ich so nicht genau kannte und wofür ich nun einen Beispiel code habe
 
Zurück
Oben