Message Passing Interface
Message Passing Interface (MPI) ist ein Standard, der den Nachrichtenaustausch bei parallelen Berechnungen auf verteilten Computersystemen beschreibt. Er legt dabei eine Sammlung von Operationen und ihre Semantik, also eine Programmierschnittstelle fest, aber kein konkretes Protokoll und keine Implementierung.
Eine MPI-Applikation besteht in der Regel aus mehreren miteinander kommunizierenden Prozessen, die alle zu Beginn der Programmausführung parallel gestartet werden. Alle diese Prozesse arbeiten dann gemeinsam an einem Problem und nutzen zum Datenaustausch Nachrichten, welche explizit von einem zum anderen Prozess geschickt werden. Ein Vorteil dieses Prinzips ist es, dass der Nachrichtenaustausch auch über Rechnergrenzen hinweg funktioniert. Parallele MPI-Programme sind somit sowohl auf PC-Clustern (hier funktioniert der Austausch der Nachrichten z. B. über TCP), als auch auf dedizierten Parallelrechnern (hier läuft der Nachrichtenaustausch dann z. B. über den gemeinsamen Hauptspeicher) ausführbar.
Punkt-zu-Punkt-Kommunikation
Blockierendes Senden und Empfangen
Die grundlegenden Operationen für eine Punkt-zu-Punkt-Kommunikation sind senden und empfangen:
int MPI_Send (void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
- buf: Zeiger auf den Sendepuffer
- count: Zahl der Elemente im Sendepuffer
- datatype: Datentyp der Elemente im Sendepuffer
- dest: Rang des Zielprozesses
- tag: Markierung der Nachricht
- comm: Kommunikator der Prozessgruppe
int MPI_Recv (void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status* status)
- buf: Zeiger auf einen Empfangspuffer ausreichender Größe
- count: Zahl der Elemente im Empfangspuffer
- datatype: Datentyp der Elemente im Empfangspuffer
- source: Rang des Quellprozesses (mit source=MPI_ANY_SOURCE wird von einem beliebigen Prozess empfangen)
- tag: erwartete Markierung der Nachricht (mit tag=MPI_ANY_TAG wird jede Nachricht empfangen)
- comm: Kommunikator der Prozessgruppe
- status: Zeiger auf eine Statusstruktur, in der Informationen über die empfangene Nachricht abgelegt werden sollen
Die beiden Operationen sind blockierend und asynchron. Das bedeutet:
- MPI_Recv kann ausgeführt werden, bevor das zugehörige MPI_Send gestartet wurde
- MPI_Recv blockiert, bis die Nachricht vollständig empfangen wurde
Analog gilt:
- MPI_Send kann ausgeführt werden, bevor das zugehörige MPI_Recv gestartet wurde
- MPI_Send blockiert, bis der Sendepuffer wiederverwendet werden kann (d.h. die Nachricht vollständig übermittelt oder zwischengepuffert wurde)
Programmbeispiel
Die Verwendung von MPI_Send und MPI_Recv wird im folgenden ANSI-C-Beispiel für 2 MPI Prozesse veranschaulicht:
#include "mpi.h"
#include <stdio.h>
#include <string.h>
int main(int argc, char *argv[])
{
char message[20];
int myrank, tag=99;
MPI_Status status;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
if (myrank == 0) {
strcpy(message, "Hello, there");
MPI_Send(message, strlen(message)+1, MPI_CHAR, 1, tag, MPI_COMM_WORLD);
}
else {
MPI_Recv(message, 20, MPI_CHAR, 0, tag, MPI_COMM_WORLD, &status);
printf("received \"%s\"\n", message);
}
MPI_Finalize();
return 0;
}
Nichtblockierende Kommunikation
Die Effizienz einer parallelen Anwendung kann oftmals gesteigert werden, indem man Kommunikation mit Berechnung überlappt. Dazu definiert der MPI Standard sogenannte nichtblockierende Kommunikation, bei der die Kommunikationsoperation lediglich angestoßen wird. Eine separate Funktion muss dann aufgerufen werden, um solch eine Operation zu beenden.
int MPI_Isend (void* buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request* request)
- ...
- request: Adresse der Datenstruktur, die Informationen zur Operation enthält
int MPI_Irecv (void* buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request* request)
- ...
Fortschritt abfragen
Um den Fortschritt einer dieser Operationen zu erfahren, wird folgende Operation verwendet:
int MPI_Test (MPI_Request* request, int* flag, MPI_Status* status)
Wobei flag=1 oder 0 gesetzt wird, je nach dem, ob die Operation abgeschlossen ist oder noch andauert.
Blockierend warten
Um dennoch blockierend auf eine MPI_Isend- oder MPI_Irecv-Operation zu warten, wird folgende Operation benutzt:
int MPI_Wait (MPI_Request* request, MPI_Status* status)
Synchronisierendes Senden
Für die Sendeoperationen werden auch synchronen Varianten MPI_Ssend und MPI_Issend definiert. In diesem Modus wird das Senden erst dann beendet, wenn die zugehörige Empfangsoperation begonnen wurde.
Puffernde Varianten
...
Gruppen und Kommunikatoren
Prozesse lassen sich in Gruppen zusammenfassen, wobei jedem Prozess eine eindeutige Nummer, der sogenannte Rang zugeordnet wird. Für den Zugriff auf eine Gruppe wird ein Kommunikator benötigt. Soll also eine globale Kommunikationsoperation auf eine Gruppe beschränkt werden, so muss der zur Gruppe gehörende Kommunikator angegeben werden. Der Kommunikator für die Menge aller Prozesse heißt MPI_COMM_WORLD.
Die zum Kommunikator comm gehörende Gruppe erhält man mit
int MPI_Comm_group (MPI_Comm comm, MPI_Group* group)
Für Prozessgruppen stehen die üblichen Mengenoperationen zur Verfügung.
Vereinigung
Zwei Gruppen group1 und group2 können zu einer neuen Gruppe new_group vereinigt werden:
int MPI_Group_union (MPI_Group group1, MPI_Group group2, MPI_Group* new_group)
Die Prozesse aus group1 behalten ihre ursprüngliche Nummerierung. Die aus group2, die nicht bereits in der ersten enthalten sind, werden fortlaufend weiter nummeriert.
Schnittmenge
Die Schnittmenge zweier Gruppen erhält man mit
int MPI_Group_intersection (MPI_Group group1, MPI_Group group2, MPI_Group* new_group)
Differenz
Die Differenz zweier Gruppen erhält man mit
int MPI_Group_difference (MPI_Group group1, MPI_Group group2, MPI_Group* new_group)
Globale Kommunikation
In parallelen Anwendungen trifft man nicht selten spezielle Kommunikationsmuster an, bei denen mehrere MPI Prozesse gleichzeitig beteiligt sind. Der Standard hat deswegen für einige übliche Operationen eigene Funktionen definiert. Bei vielen davon gibt es einen ausgewählten MPI Prozess, der eine Sonderrolle einnimmt, und der typischerweise mit root bezeichnet wird.
Broadcast (ausstrahlen)
Mit der Broadcast-Operation schickt ein ausgewählter MPI Prozess root allen anderen Prozessen in seiner Gruppe comm die gleichen Daten. Die dafür definierte Funktion ist dabei für alle beteiligten Prozesse identisch:
int MPI_Bcast (void *buffer, int count, MPI_Datatype type, int root, MPI_Comm comm)
Der MPI Prozess root stellt in buffer seine Daten zur Verfügung, während die anderen Prozesse hier die Adresse ihres Empfangspuffers übergeben. Die restlichen Parameter müssen bei allen Prozessen gleich (bzw. gleichwertig) sein. Nachdem die Funktion zurückkehrt, befinden sich in allen Puffern die Daten, die ursprünglich nur bei root vorhanden waren.
Gather (sammeln)
Mit der Gather-Operation sammelt der MPI Prozess root die Daten aller beteiligten Prozesse ein. Die Daten aller Sendepuffer werden dabei (nach Rang sortiert) hintereinander im Empfangspuffer abgelegt:
int MPI_Gather (void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
Vektorbasierte Variante
Die vektorbasierte Variante der Gather-Operation erlaubt eine prozessabhängige Anzahl von Elementen:
int MPI_Gatherv (void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int *recvcounts, int *displs, MPI_Datatype recvtype, int root, MPI_Comm comm)
- recvcounts: Feld, das die Zahl der Elemente enthält, die von den einzelnen Prozessen empfangen werden (nur für root relevant)
- displs: Feld, dessen Eintrag i die Verschiebung im Empfangspuffer festlegt, bei der die Daten von Prozess i abgelegt werden sollen (ebenfalls nur für root relevant)
Bei den Feldern ist zu beachten, dass im Empfangspuffer zwar Lücken erlaubt sind aber keine Überlappungen.
Sollen also etwa von 3 Prozessen jeweils 1, 2 und 3 Elemente vom Typ Integer empfangen werden, so muss recvcounts = {1, 2, 3}
und displs = {0, 1*sizeof(int), 3*sizeof(int)}
gesetzt werden.
Scatter (streuen)
Mit einer Scatter-Operation schickt der MPI Prozess root jedem beteiligten Prozess ein unterschiedliches aber gleichgroßes Datenelement:
int MPI_Scatter (void *sendbuf, int *sendcounts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
Vektorbasierte Variante
int MPI_Scatterv (void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)
Akkumulation
Die Akkumulation ist eine spezielle Form der Gather-Operation. Hierbei werden ebenfalls die Daten aller beteiligten Prozesse aufgesammelt, aber zusätzlich noch mittels einer festgelegten Reduktionsoperation zu einem Datum reduziert.
int MPI_Reduce (void *sendbuf, void *recvbuf, int count, MPI_Datatype type, MPI_Op op, int root, MPI_Comm comm)
Für den Parameter op existieren dabei folgende vordefinierte Reduktionsoperationen:
Logische Operationen
- MPI_LAND: logische UND-Verknüpfung
- MPI_BAND: bitweise UND-Verknüpfung
- MPI_LOR: logische ODER-Verknüpfung
- MPI_BOR: bitweise ODER-Verknüpfung
- MPI_LXOR: logische exklusiv-ODER-Verknüpfung
- MPI_BXOR: bitweise exklusiv-ODER-Verknüpfung
Arithmetische Operationen
- MPI_MAX: Maximum
- MPI_MIN: Minimum
- MPI_SUM: Summe
- MPI_PROD: Produkt
- MPI_MINLOC: Minimum mit Prozess
- MPI_MAXLOC: Maximum mit Prozess
Die Operationen MPI_MINLOC und MPI_MAXLOC geben zusätzlich den Rang des MPI Prozesses zurück, der das Ergebnis bestimmte.
Benutzerdefinierte Operationen
Zusätzlich zu den oben genannten Operationen können auch eigene definiert werden:
int MPI_Op_create (MPI_User_function *function, int commute, MPI_Op *op)
Genaueres in [1]
Allgather
Bei der Allgather-Operation schickt jeder Prozess an jeden anderen Prozess die gleichen Daten. Es handelt sich also um eine Multi-Broadcast-Operation, bei der es keinen gesonderten MPI Prozess gibt.
int MPI_Allgather (void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
All-to-all (Gesamtaustausch)
int MPI_Alltoall (void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm)
Des weiteren gibt es noch die synchronisierende MPI_Barrier-Operation. Diese Funktion kehrt erst zurück, nachdem alle in der angegebenen Gruppe befindlichen MPI Prozesse diesen Teil des Programmes erreicht haben.
MPI-2
Seit 1997 ist eine zweite Version des MPI-Standards verfügbar, die einige Erweiterungen zu dem weiterhin bestehenden MPI-1.1 Standard hinzufügt. Zu diesen Erweiterungen gehören unter anderem
- eine dynamische Prozessverwaltung, d.h. Prozesse können nun zur Laufzeit erzeugt und gelöscht werden
- [paralleler] Zugriff auf das Dateisystem
- Spezifikation zusätzlicher Sprachschnittstellen (wie C++)
Genaueres in [2]
Implementierungen
C/C++
Die erste Implementierung des MPI-1.x-Standards war MPICH vom Argonne National Laboratory und der Mississippi State University. Mittlerweile ist MPICH2 verfügbar, das den MPI-2.1-Standard implementiert. LAM/MPI vom Ohio Supercomputing Center war eine weitere freie Version.