Join-Algorithmen von Apache Spark

Blog

Eine der am häufigsten verwendeten Transformationen in Apache Spark ist Join-Operation . Joins in Apache Spark ermöglichen es dem Entwickler, zwei oder mehr Datenrahmen basierend auf bestimmten (sortierbaren) Schlüsseln zu kombinieren. Die Syntax zum Schreiben einer Join-Operation ist einfach, aber manchmal geht verloren, was hinter dem Vorhang passiert. Intern schlägt Apache Spark für Joins einige Algorithmen vor und wählt dann einen davon aus. Wenn Sie nicht wissen, was diese internen Algorithmen sind und welchen Spark wählt, kann eine einfache Join-Operation teuer werden.

Bei der Entscheidung für einen Join-Algorithmus berücksichtigt Spark die Größe der beteiligten Datenrahmen. Es berücksichtigt den angegebenen Join-Typ und die angegebene Bedingung und den Hinweis (falls vorhanden), um endgültig über den zu verwendenden Algorithmus zu entscheiden. In den meisten Fällen, Zusammenführen sortieren und Shuffle Hash Join sind die beiden großen Power Horses, die die Spark SQL-Joins antreiben. Wenn Spark jedoch feststellt, dass die Größe eines der Datenframes kleiner als ein bestimmter Schwellenwert ist, meldet Spark Übertragung beitreten als Top-Anwärter.

Broadcast-Hash-Join

Betrachtet man den physischen Plan eines Join-Vorgangs, sieht ein Broadcast Hash Join in Spark wie folgt aus

Joins in Apache Spark: Broadcast Join

Der obige Plan zeigt, dass der Datenrahmen von einem der Zweige an jeden Knoten sendet, der den anderen Datenrahmen enthält. In jedem Knoten führt Spark dann den letzten Join-Vorgang durch. Das ist Sparks Kommunikationsstrategie pro Knoten .

Spark verwendet den Broadcast Hash Join, wenn die Größe des Datenrahmens kleiner als der in festgelegte Schwellenwert ist spark.sql.autoBroadcastJoinThreshold . Der Standardwert ist 10 MB, kann aber mit dem folgenden Code geändert werden

spark.conf.set('spark.sql.autoBroadcastJoinThreshold', 100 * 1024 * 1024)

Git-Flow vs. Github-Flow

Dieser Algorithmus hat den Vorteil, dass die andere Seite des Joins kein Shuffle erfordert. Wenn diese andere Seite sehr groß ist, bringt das Nichtdurchführen des Mischens eine bemerkenswerte Beschleunigung im Vergleich zu anderen Algorithmen, die das Mischen durchführen müssten.

Auch das Senden großer Datensätze kann zu Zeitüberschreitungsfehlern führen. Eine Konfiguration spark.sql.broadcastTimeout legt die maximale Zeit fest, die ein Broadcast-Vorgang dauern soll, nach deren Ablauf der Vorgang fehlschlägt. Der Standardwert für die Zeitüberschreitung beträgt 5 Minuten, kann aber wie folgt eingestellt werden:

spark.conf.set('spark.sql.broadcastTimeout', time_in_sec)

Zusammenführen beitreten sortieren

Wenn keiner der Datenrahmen gesendet werden kann, greift Spark auf Zusammenführen beitreten sortieren . Dieser Algorithmus verwendet die Knoten-Knoten Kommunikationsstrategie , wo Spark die Daten im Cluster mischt.

Sort Merge Join erfordert, dass beide Seiten des Joins die richtige Partitionierung und Reihenfolge aufweisen. Im Allgemeinen wird dies durch ** Shuffle und Sort** in beiden Zweigen des Joins sichergestellt, wie unten dargestellt

numpy multiple lineare Regression

#apache spark #scala #tech blogs #broadcast join #join operation #join Optimization #joins in spark #shuffled hash join #sort merge join

blog.knoldus.com

Join-Algorithmen von Apache Spark

Eine der am häufigsten verwendeten Transformationen in Apache Spark ist der Join-Vorgang. Joins in Apache Spark ermöglichen es dem Entwickler, zwei oder mehr Datenrahmen basierend auf bestimmten (sortierbaren) Schlüsseln zu kombinieren.