✍ Omar HAYAK / Temps de lecture 12 minutes
Si votre travail consiste à analyser des données, vous avez certainement rencontré des questions dont la réponse est intuitive mais difficile à exprimer en pur SQL/Spark. Si vous avez déjà essayé de calculer une moyenne glissante ou le rang d’une ligne, sans doute avez-vous pensé à ça : si seulement je pouvais itérer sur toutes les lignes de ma requête. Cette intuition ouvre les portes sur un labyrinthe de workarounds : écrire des jointures complexes, utiliser Excel ou faire le calcul hors base de données. Mais toutes les alternatives ne sont pas acceptables.
En outre, il existe les Window functions ou fonctions de fenêtrage qui améliorent considérablement l’expressivité de SQL et Spark. Cet article vous fera découvrir ces fonctions à travers des exemples pratiques.
Définition
Avant Spark 1.4, on pouvait distinguer deux types de fonctions :
- Les fonctions built-in ou UDFs telles que
substr
ou round
, qui prennent des valeurs d’une seule ligne en entrée et génèrent une valeur de retour unique pour chaque ligne en entrée.
- Les fonctions d’agrégation, telles que
sum
ou max
, fonctionnent sur un groupe de lignes et calculent une valeur de retour unique pour chaque groupe.
Bien que celles-ci soient très utiles dans la pratique, il existe des opérations qui ne peuvent pas être exprimées en utilisant uniquement ces types de fonctions. Plus précisément, il n’y avait aucun moyen pour opérer sur un groupe de lignes tout en renvoyant une seule valeur pour chaque ligne en entrée.
Cette limitation complexifie l’exécution de diverses tâches de traitement des données, tel que le calcul d’une moyenne glissante, le rang d’une ligne ou l’accès aux valeurs d’une ligne avant ou après la ligne actuelle. Heureusement pour les utilisateurs de Spark SQL, les window functions introduites par Spark 1.4 comblent cette lacune.
Une window function (fonction de fenêtrage) calcule une valeur de retour pour chaque ligne d’une table à partir d’un groupe de lignes appelé Frame. Chaque ligne d’entrée peut être associée à un Frame unique. Cette caractéristique fondamentale rend les fonctions de fenêtrage plus puissantes. Cela permet aux utilisateurs d’exprimer diverses tâches de traitement de données difficiles (voir impossibles) à exprimer de manière concise.
Prenons cet exemple tiré du chapitre 3.5 Window Functions de la documentation PostgreSQL:
import sys
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
spark = SparkSession.builder \
.master("local") \
.appName("WindowsAreGood") \
.getOrCreate()
schema = t.StructType([
t.StructField('depName', t.StringType(), False),
t.StructField('empNo', t.IntegerType(), False),
t.StructField('salary', t.IntegerType(), False),
])
data = [
("sales", 1, 5000),
("personnel", 2, 3900),
("sales", 3, 4800),
("sales", 4, 4800),
("personnel", 5, 3500),
("develop", 7, 4200),
("develop", 8, 6000),
("develop", 9, 4500),
("develop", 10, 5200),
("develop", 11, 5200)
]
sdf = spark.createDataFrame(data, schema=schema)
Ce code fourni le DataFrame suivant:
+---------+-----+------+
| depName|empNo|salary|
+---------+-----+------+
| sales| 1| 5000|
|personnel| 2| 3900|
| sales| 3| 4800|
| sales| 4| 4800|
|personnel| 5| 3500|
| develop| 7| 4200|
| develop| 8| 6000|
| develop| 9| 4500|
| develop| 10| 5200|
| develop| 11| 5200|
+---------+-----+------+
On souhaite calculer le salaire moyen par département et sans utiliser les fonctions de fenêtrage :
sdf.groupBy('depName').agg(f.avg('salary').alias('avg')).show()
+---------+-----------------+
| depName| avg|
+---------+-----------------+
| develop| 5020.0|
| sales|4866.666666666667|
|personnel| 3700.0|
+---------+-----------------+
Pour obtenir le même résultat avec les window functions, on utilisera le module spark Window
. Ce dernier contient les outils nécessaires pour manipuler les window functions, notamment l’objet WindowSpec
qu’on va utiliser pour définir le Frame. Dans cet exemple, un frame est l’ensemble des lignes du même département. Plus de détails dans le chapitre suivant.
byDepName = Window.partitionBy('depName')
sdf.withColumn("avg", f.avg('salary').over(byDepName)).show()
+---------+-----+------+-----------------+
| depName|empNo|salary| avg|
+---------+-----+------+-----------------+
| develop| 7| 4200| 5020.0|
| develop| 8| 6000| 5020.0|
| develop| 9| 4500| 5020.0|
| develop| 10| 5200| 5020.0|
| develop| 11| 5200| 5020.0|
| sales| 1| 5000|4866.666666666667|
| sales| 3| 4800|4866.666666666667|
| sales| 4| 4800|4866.666666666667|
|personnel| 2| 3900| 3700.0|
|personnel| 5| 3500| 3700.0|
+---------+-----+------+-----------------+
Cet exemple montre exactement la différence entre le fonctionnement d’une fonction d’agrégation et une fonction de fenêtrage. Certes, le fond des deux résultats est identique, mais le format est différent. Une fonction de fenêtrage ne regroupe pas les lignes et conserve leurs identités distinctes.
Pour obtenir le même format à l’aide d’une agrégation, il faut ajouter une jointure avec le DataFrame initial pour chaque agrégation.
WindowSpec
WindowSpec
est une spécification qui définit quelles lignes sont incluses dans le frame, c’est-à-dire l’ensemble des lignes associées à la ligne actuelle. WindowSpec prend les éléments suivants lors de sa création :
- Partition : définit les enregistrements dans la même partition. Sans partition définie, tous les enregistrements appartiennent à une seule partition.
- Ordre : définit la façon dont les enregistrements dans une partition sont ordonnés, ce qui définit à son tour la position d’un enregistrement dans une partition.
- Cadre : définit les lignes à inclure dans la fenêtre de la ligne actuelle, en fonction de la position relative par rapport à la ligne actuelle. Par exemple : « Les trois lignes précédant la ligne actuelle vers la ligne actuelle » décrit un cadre comprenant la ligne d’entrée actuelle et trois lignes apparaissant avant.
En pratique, on utilise les fonctions suivantes pour définir les spécifications d’une fenêtre :
orderBy :
Crée un WindowSpec avec l’ordre défini.
partitionBy :
Crée un WindowSpec avec le partitionnement défini.
rowsBetween :
Crée un WindowSpec avec les limites du cadre définies, de start
(inclus) à end
(inclus). Les deux start
et end
sont des positions par rapport à la ligne actuelle, en fonction de sa position dans la partition.
windowSpec = Window.rowsBetween(-2, 1)
sdf.withColumn("first_empNo", f.first("empNo").over(windowSpec))\
.withColumn("last_empNo", f.last("empNo").over(windowSpec))\
.withColumn("frame_size", f.count("empNo").over(windowSpec))\
.show()
+---------+-----+------+-----------+----------+----------+
| depName|empNo|salary|first_empNo|last_empNo|frame_size|
+---------+-----+------+-----------+----------+----------+
| sales| 1| 5000| 1| 2| 2|
|personnel| 2| 3900| 1| 3| 3|
| sales| 3| 4800| 1| 4| 4|
| sales| 4| 4800| 2| 5| 4|<= currentRow - 2
|personnel| 5| 3500| 3| 7| 4|<= currentRow - 1
| develop| 7| 4200| 4| 8| 4|<= currentRow
| develop| 8| 6000| 5| 9| 4|<= currentRow + 1
| develop| 9| 4500| 7| 10| 4|
| develop| 10| 5200| 8| 11| 4|
| develop| 11| 5200| 9| 11| 3|
+---------+-----+------+-----------+----------+----------+
Dans l’exemple ci-dessus, la fenêtre définit un frame de 4 lignes:
- la ligne courante
- les deux lignes précédentes
- la ligne suivante
À l’exception des lignes aux extrémités, le frame est plus petit.
+---------+-----+------+-----------+----------+----------+
| depName|empNo|salary|first_empNo|last_empNo|frame_size|
+---------+-----+------+-----------+----------+----------+
| sales| 1| 5000| 1| 2| 2|<= currentRow
|personnel| 2| 3900| 1| 3| 3|<= currentRow + 1
| sales| 3| 4800| 1| 4| 4|
| sales| 4| 4800| 2| 5| 4|
|personnel| 5| 3500| 3| 7| 4|
| develop| 7| 4200| 4| 8| 4|
| develop| 8| 6000| 5| 9| 4|
| develop| 9| 4500| 7| 10| 4|
| develop| 10| 5200| 8| 11| 4|
| develop| 11| 5200| 9| 11| 3|
+---------+-----+------+-----------+----------+----------+
rangeBetween :
Crée un WindowSpec avec les limites du frame définies, de start
(inclus) à end
(inclus). Les deux start
et end
sont relatifs à la ligne actuelle, en fonction de la valeur de la colonne dans ORDER BY
. Par conséquent, rangeBetween
ne peut être utilisée que dans la définition d’une WindowSpec
ordonnée.
windowSpec = Window.orderBy("salary").rangeBetween(-1000, 500)
sdf.withColumn("range", f.concat(f.lit("["), f.col("salary")-1000,f.lit(","),f.col("salary")+500, f.lit("]"))) \
.withColumn("frame_first", f.first("salary").over(windowSpec))\
.withColumn("frame_last", f.last("salary").over(windowSpec))\
.withColumn("frame_count", f.count("empNo").over(windowSpec))\
.show()
+---------+-----+------+-----------+-----------+----------+-----------+
| depName|empNo|salary| range|frame_first|frame_last|frame_count|
+---------+-----+------+-----------+-----------+----------+-----------+
|personnel| 5| 3500|[2500,4000]| 3500| 3900| 2|
|personnel| 2| 3900|[2900,4400]| 3500| 4200| 3|<= inRange
| develop| 7| 4200|[3200,4700]| 3500| 4500| 4|<= inRange
| develop| 9| 4500|[3500,5000]| 3500| 5000| 7|<= inRange
| sales| 3| 4800|[3800,5300]| 3900| 5200| 8|<= inRange
| sales| 4| 4800|[3800,5300]| 3900| 5200| 8|<= currentRow
| sales| 1| 5000|[4000,5500]| 4200| 5200| 7|<= inRange
| develop| 10| 5200|[4200,5700]| 4200| 5200| 7|<= inRange
| develop| 11| 5200|[4200,5700]| 4200| 5200| 7|<= inRange
| develop| 8| 6000|[5000,6500]| 5000| 6000| 4|
+---------+-----+------+-----------+-----------+----------+-----------+
Dans l’exemple ci-dessus, la fenêtre définit un frame dont les éléments sont ordonnés par revenu. La taille du frame dépend de la valeur du salaire de la ligne courante.
Si la valeur du salaire est 4800, alors toutes les lignes dont la valeur du salaire sont comprises entre 4800 - 1000
et 4800 + 500
(Càd 3800 et 5300) seront incluses dans le frame.
Pour définir un frame dont les bornes sont “infinies”, on utilise les valeurs suivantes:
- currentRow : valeur représentant la ligne actuelle utilisée pour définir les limites du frame
- unboundedFollowing : valeur représentant la dernière ligne d’une partition (équivalente à “UNBOUNDED FOLLOWING” dans SQL)
- unboundedPreceding : valeur représentant la première ligne d’une partition (équivalente à “UNBOUNDED PRECEDING” dans SQL)
Après avoir défini une WindowSpec, il ne reste que choisir/créer une window function.
Window Functions
Spark SQL prend en charge les window functions suivantes :
- les fonctions de classement :
rank
, dense_rank
, percent_rank
, ntile
, row_number
- les fonctions analytiques :
cume_dist
, first
, last
, lag
, lead
- les fonctions d’agrégation : les utilisateurs peuvent utiliser n’importe quelle fonction d’agrégation existante en tant que fonction de fenêtrage. (
sum
, avg
, min
, max
et count
)
- User-defined window functions avec
pandas_udf
:
Pour plus de détails sur ces fonctions, veuillez consulter la documentation de Spark.
Pour utiliser les window functions, l’utilisateur doit associer la fonction à une fenêtre :
- Ajout d’une clause OVER après une fonction prise en charge dans SQL, par exemple
avg (salary) OVER (...);
ou
- Appel de la méthode
over()
sur une fonction prise en charge dans l’API DataFrame, par exemple rank().over(windowSpec)
Exemple
On veut répondre aux questions suivantes :
- Quel est le top 2 des salaires dans chaque département ?
- Pour chaque salarié dans un même département, quelle est la différence de revenu par rapport au salarié le mieux payé ?
Pour répondre à la première question, nous devons classer les salariés dans un département en fonction de leurs revenus et sélectionner les deux mieux payés. Vous trouverez ci-dessous le code utilisé pour résoudre cette question en utilisant la fonction de fenêtrage dense_rank
:
windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').desc())
sdf.withColumn("rank", f.dense_rank().over(windowSpec)).where(f.col('rank') <= 2).show()
+---------+-----+------+----+
| depName|empNo|salary|rank|
+---------+-----+------+----+
| develop| 8| 6000| 1|
| develop| 10| 5200| 2|
| develop| 11| 5200| 2|
| sales| 1| 5000| 1|
| sales| 3| 4800| 2|
| sales| 4| 4800| 2|
|personnel| 2| 3900| 1|
|personnel| 5| 3500| 2|
+---------+-----+------+----+
En Spark SQL ça donne :
SELECT *
FROM (
SELECT
depName,
empNo,
salary,
dense_rank() OVER (PARTITION BY depName ORDER BY salary DESC) as rank
FROM depSalaries) tmp
WHERE
rank <= 2
Essayez d’écrire une requête SQL équivalente sans fenêtrage pour se rendre compte à quel point c’est pénible.
Pour la deuxième question, nous devons trouver le revenu le plus élevé dans un même département et pour chaque ligne. Voici un programme pour répondre à cette question.
windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').desc())
sdf.withColumn("diff", f.max(f.col('salary')).over(windowSpec) - f.col('salary')).show()
+---------+-----+------+----+
| depName|empNo|salary|diff|
+---------+-----+------+----+
| develop| 8| 6000| 0|
| develop| 10| 5200| 800|
| develop| 11| 5200| 800|
| develop| 9| 4500|1500|
| develop| 7| 4200|1800|
| sales| 1| 5000| 0|
| sales| 3| 4800| 200|
| sales| 4| 4800| 200|
|personnel| 2| 3900| 0|
|personnel| 5| 3500| 400|
+---------+-----+------+----+
Sans utiliser les fenêtres, il faut trouver le revenu le plus élevé dans chaque département. Puis joindre cet ensemble de données dérivées à la table d’origine pour calculer les différences de revenus.
max_salary = sdf.groupBy('depName').agg(f.max(f.col('salary')).alias('max'))
sdf.join(max_salary, 'depName').withColumn("diff", f.col('max') - f.col('salary')).show()
+---------+-----+------+----+----+
| depName|empNo|salary| max|diff|
+---------+-----+------+----+----+
| develop| 7| 4200|6000|1800|
| develop| 8| 6000|6000| 0|
| develop| 9| 4500|6000|1500|
| develop| 10| 5200|6000| 800|
| develop| 11| 5200|6000| 800|
| sales| 1| 5000|5000| 0|
| sales| 3| 4800|5000| 200|
| sales| 4| 4800|5000| 200|
|personnel| 2| 3900|3900| 0|
|personnel| 5| 3500|3900| 400|
+---------+-----+------+----+----+
Un autre exemple à démontrer la puissance des fonctions de fenêtrage est le calcul d’une somme cumulée.
windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').asc())\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
sdf.withColumn("sum_cum", f.sum(f.col('salary')).over(windowSpec)).show()
+---------+-----+------+-----+
| depName|empNo|salary| cum|
+---------+-----+------+-----+
| develop| 7| 4200| 4200|
| develop| 9| 4500| 8700|
| develop| 10| 5200|13900|
| develop| 11| 5200|19100|
| develop| 8| 6000|25100|
| sales| 3| 4800| 4800|
| sales| 4| 4800| 9600|
| sales| 1| 5000|14600|
|personnel| 5| 3500| 3500|
|personnel| 2| 3900| 7400|
+---------+-----+------+-----+
Cette fois, on a utilisé la fonction rowsBetween
pour créer un frame délimité par la ligne actuelle currentRow
et toutes les lignes précédentes Window.unboundedPreceding
(−∞).
Spark ignore automatiquement les lignes précédentes et suivantes, si la ligne actuelle est la première ou la dernière ligne.
LEAD & LAG
lag et lead peuvent être utilisés lorsque nous voulons obtenir un résultat relatif entre les lignes: lag signifie obtenir la valeur de la ligne précédente; lead signifie obtenir la valeur de la ligne suivante.
overCategory = Window.partitionBy('depName').orderBy(f.col('salary').desc())
df = sdf.withColumn("lead", f.lead('salary', 1).over(overCategory))\
.withColumn("lag", f.lag('salary', 1).over(overCategory))
df.show()
+---------+-----+------+----+----+
| depName|empNo|salary|lead| lag|
+---------+-----+------+----+----+
| develop| 8| 6000|5200|null|
| develop| 10| 5200|5200|6000|
| develop| 11| 5200|4500|5200|
| develop| 9| 4500|4200|5200|
| develop| 7| 4200|null|4500|
| sales| 1| 5000|4800|null|
| sales| 3| 4800|4800|5000|
| sales| 4| 4800|null|4800|
|personnel| 2| 3900|3500|null|
|personnel| 5| 3500|null|3900|
+---------+-----+------+----+----+
Remarquez la première ligne de la colonne lag
de chaque groupe a la valeur nulle, et la dernière ligne de la colonne lead
de chaque groupe a la valeur nulle.
Après avoir calculé la différence, nous pouvons trouver des valeurs aberrantes qui présentent un énorme écart salarial.
df.fillna(0).withColumn("higher_than_next", f.col('salary') - f.col('lead'))\
.withColumn("lower_than_previous", f.col('lag') - f.col('salary'))\
.show()
+---------+-----+------+----+----+----------------+-------------------+
| depName|empNo|salary|lead| lag|higher_than_next|lower_than_previous|
+---------+-----+------+----+----+----------------+-------------------+
| develop| 8| 6000|5200| 0| 800| -6000|
| develop| 10| 5200|5200|6000| 0| 800|
| develop| 11| 5200|4500|5200| 700| 0|
| develop| 9| 4500|4200|5200| 300| 700|
| develop| 7| 4200| 0|4500| 4200| 300|
| sales| 1| 5000|4800| 0| 200| -5000|
| sales| 3| 4800|4800|5000| 0| 200|
| sales| 4| 4800| 0|4800| 4800| 0|
|personnel| 2| 3900|3500| 0| 400| -3900|
|personnel| 5| 3500| 0|3900| 3500| 400|
+---------+-----+------+----+----+----------------+-------------------+
Références
✍ Omar HAYAK / Temps de lecture 12 minutes
Si votre travail consiste à analyser des données, vous avez certainement rencontré des questions dont la réponse est intuitive mais difficile à exprimer en pur SQL/Spark. Si vous avez déjà essayé de calculer une moyenne glissante ou le rang d’une ligne, sans doute avez-vous pensé à ça : si seulement je pouvais itérer sur toutes les lignes de ma requête. Cette intuition ouvre les portes sur un labyrinthe de workarounds : écrire des jointures complexes, utiliser Excel ou faire le calcul hors base de données. Mais toutes les alternatives ne sont pas acceptables.
En outre, il existe les Window functions ou fonctions de fenêtrage qui améliorent considérablement l’expressivité de SQL et Spark. Cet article vous fera découvrir ces fonctions à travers des exemples pratiques.
Définition
Avant Spark 1.4, on pouvait distinguer deux types de fonctions :
substr
ouround
, qui prennent des valeurs d’une seule ligne en entrée et génèrent une valeur de retour unique pour chaque ligne en entrée.sum
oumax
, fonctionnent sur un groupe de lignes et calculent une valeur de retour unique pour chaque groupe.Bien que celles-ci soient très utiles dans la pratique, il existe des opérations qui ne peuvent pas être exprimées en utilisant uniquement ces types de fonctions. Plus précisément, il n’y avait aucun moyen pour opérer sur un groupe de lignes tout en renvoyant une seule valeur pour chaque ligne en entrée.
Cette limitation complexifie l’exécution de diverses tâches de traitement des données, tel que le calcul d’une moyenne glissante, le rang d’une ligne ou l’accès aux valeurs d’une ligne avant ou après la ligne actuelle. Heureusement pour les utilisateurs de Spark SQL, les window functions introduites par Spark 1.4 comblent cette lacune.
Une window function (fonction de fenêtrage) calcule une valeur de retour pour chaque ligne d’une table à partir d’un groupe de lignes appelé Frame. Chaque ligne d’entrée peut être associée à un Frame unique. Cette caractéristique fondamentale rend les fonctions de fenêtrage plus puissantes. Cela permet aux utilisateurs d’exprimer diverses tâches de traitement de données difficiles (voir impossibles) à exprimer de manière concise.
Prenons cet exemple tiré du chapitre 3.5 Window Functions de la documentation PostgreSQL:
import sys from pyspark.sql.window import Window from pyspark.sql import SparkSession import pyspark.sql.functions as f import pyspark.sql.types as t spark = SparkSession.builder \ .master("local") \ .appName("WindowsAreGood") \ .getOrCreate() schema = t.StructType([ t.StructField('depName', t.StringType(), False), t.StructField('empNo', t.IntegerType(), False), t.StructField('salary', t.IntegerType(), False), ]) data = [ ("sales", 1, 5000), ("personnel", 2, 3900), ("sales", 3, 4800), ("sales", 4, 4800), ("personnel", 5, 3500), ("develop", 7, 4200), ("develop", 8, 6000), ("develop", 9, 4500), ("develop", 10, 5200), ("develop", 11, 5200) ] sdf = spark.createDataFrame(data, schema=schema)
Ce code fourni le DataFrame suivant:
+---------+-----+------+ | depName|empNo|salary| +---------+-----+------+ | sales| 1| 5000| |personnel| 2| 3900| | sales| 3| 4800| | sales| 4| 4800| |personnel| 5| 3500| | develop| 7| 4200| | develop| 8| 6000| | develop| 9| 4500| | develop| 10| 5200| | develop| 11| 5200| +---------+-----+------+
On souhaite calculer le salaire moyen par département et sans utiliser les fonctions de fenêtrage :
sdf.groupBy('depName').agg(f.avg('salary').alias('avg')).show()
+---------+-----------------+ | depName| avg| +---------+-----------------+ | develop| 5020.0| | sales|4866.666666666667| |personnel| 3700.0| +---------+-----------------+
Pour obtenir le même résultat avec les window functions, on utilisera le module spark
Window
. Ce dernier contient les outils nécessaires pour manipuler les window functions, notamment l’objetWindowSpec
qu’on va utiliser pour définir le Frame. Dans cet exemple, un frame est l’ensemble des lignes du même département. Plus de détails dans le chapitre suivant.byDepName = Window.partitionBy('depName') sdf.withColumn("avg", f.avg('salary').over(byDepName)).show()
+---------+-----+------+-----------------+ | depName|empNo|salary| avg| +---------+-----+------+-----------------+ | develop| 7| 4200| 5020.0| | develop| 8| 6000| 5020.0| | develop| 9| 4500| 5020.0| | develop| 10| 5200| 5020.0| | develop| 11| 5200| 5020.0| | sales| 1| 5000|4866.666666666667| | sales| 3| 4800|4866.666666666667| | sales| 4| 4800|4866.666666666667| |personnel| 2| 3900| 3700.0| |personnel| 5| 3500| 3700.0| +---------+-----+------+-----------------+
Cet exemple montre exactement la différence entre le fonctionnement d’une fonction d’agrégation et une fonction de fenêtrage. Certes, le fond des deux résultats est identique, mais le format est différent. Une fonction de fenêtrage ne regroupe pas les lignes et conserve leurs identités distinctes.
Pour obtenir le même format à l’aide d’une agrégation, il faut ajouter une jointure avec le DataFrame initial pour chaque agrégation.
WindowSpec
WindowSpec
est une spécification qui définit quelles lignes sont incluses dans le frame, c’est-à-dire l’ensemble des lignes associées à la ligne actuelle. WindowSpec prend les éléments suivants lors de sa création :En pratique, on utilise les fonctions suivantes pour définir les spécifications d’une fenêtre :
orderBy :
Crée un WindowSpec avec l’ordre défini.
partitionBy :
Crée un WindowSpec avec le partitionnement défini.
rowsBetween :
Crée un WindowSpec avec les limites du cadre définies, de
start
(inclus) àend
(inclus). Les deuxstart
etend
sont des positions par rapport à la ligne actuelle, en fonction de sa position dans la partition.windowSpec = Window.rowsBetween(-2, 1) sdf.withColumn("first_empNo", f.first("empNo").over(windowSpec))\ .withColumn("last_empNo", f.last("empNo").over(windowSpec))\ .withColumn("frame_size", f.count("empNo").over(windowSpec))\ .show()
+---------+-----+------+-----------+----------+----------+ | depName|empNo|salary|first_empNo|last_empNo|frame_size| +---------+-----+------+-----------+----------+----------+ | sales| 1| 5000| 1| 2| 2| |personnel| 2| 3900| 1| 3| 3| | sales| 3| 4800| 1| 4| 4| | sales| 4| 4800| 2| 5| 4|<= currentRow - 2 |personnel| 5| 3500| 3| 7| 4|<= currentRow - 1 | develop| 7| 4200| 4| 8| 4|<= currentRow | develop| 8| 6000| 5| 9| 4|<= currentRow + 1 | develop| 9| 4500| 7| 10| 4| | develop| 10| 5200| 8| 11| 4| | develop| 11| 5200| 9| 11| 3| +---------+-----+------+-----------+----------+----------+
Dans l’exemple ci-dessus, la fenêtre définit un frame de 4 lignes:
À l’exception des lignes aux extrémités, le frame est plus petit.
+---------+-----+------+-----------+----------+----------+ | depName|empNo|salary|first_empNo|last_empNo|frame_size| +---------+-----+------+-----------+----------+----------+ | sales| 1| 5000| 1| 2| 2|<= currentRow |personnel| 2| 3900| 1| 3| 3|<= currentRow + 1 | sales| 3| 4800| 1| 4| 4| | sales| 4| 4800| 2| 5| 4| |personnel| 5| 3500| 3| 7| 4| | develop| 7| 4200| 4| 8| 4| | develop| 8| 6000| 5| 9| 4| | develop| 9| 4500| 7| 10| 4| | develop| 10| 5200| 8| 11| 4| | develop| 11| 5200| 9| 11| 3| +---------+-----+------+-----------+----------+----------+
rangeBetween :
Crée un WindowSpec avec les limites du frame définies, de
start
(inclus) àend
(inclus). Les deuxstart
etend
sont relatifs à la ligne actuelle, en fonction de la valeur de la colonne dansORDER BY
. Par conséquent,rangeBetween
ne peut être utilisée que dans la définition d’uneWindowSpec
ordonnée.windowSpec = Window.orderBy("salary").rangeBetween(-1000, 500) sdf.withColumn("range", f.concat(f.lit("["), f.col("salary")-1000,f.lit(","),f.col("salary")+500, f.lit("]"))) \ .withColumn("frame_first", f.first("salary").over(windowSpec))\ .withColumn("frame_last", f.last("salary").over(windowSpec))\ .withColumn("frame_count", f.count("empNo").over(windowSpec))\ .show()
+---------+-----+------+-----------+-----------+----------+-----------+ | depName|empNo|salary| range|frame_first|frame_last|frame_count| +---------+-----+------+-----------+-----------+----------+-----------+ |personnel| 5| 3500|[2500,4000]| 3500| 3900| 2| |personnel| 2| 3900|[2900,4400]| 3500| 4200| 3|<= inRange | develop| 7| 4200|[3200,4700]| 3500| 4500| 4|<= inRange | develop| 9| 4500|[3500,5000]| 3500| 5000| 7|<= inRange | sales| 3| 4800|[3800,5300]| 3900| 5200| 8|<= inRange | sales| 4| 4800|[3800,5300]| 3900| 5200| 8|<= currentRow | sales| 1| 5000|[4000,5500]| 4200| 5200| 7|<= inRange | develop| 10| 5200|[4200,5700]| 4200| 5200| 7|<= inRange | develop| 11| 5200|[4200,5700]| 4200| 5200| 7|<= inRange | develop| 8| 6000|[5000,6500]| 5000| 6000| 4| +---------+-----+------+-----------+-----------+----------+-----------+
Dans l’exemple ci-dessus, la fenêtre définit un frame dont les éléments sont ordonnés par revenu. La taille du frame dépend de la valeur du salaire de la ligne courante.
Si la valeur du salaire est 4800, alors toutes les lignes dont la valeur du salaire sont comprises entre
4800 - 1000
et4800 + 500
(Càd 3800 et 5300) seront incluses dans le frame.Pour définir un frame dont les bornes sont “infinies”, on utilise les valeurs suivantes:
Après avoir défini une WindowSpec, il ne reste que choisir/créer une window function.
Window Functions
Spark SQL prend en charge les window functions suivantes :
rank
,dense_rank
,percent_rank
,ntile
,row_number
cume_dist
,first
,last
,lag
,lead
sum
,avg
,min
,max
etcount
)pandas_udf
:Pour plus de détails sur ces fonctions, veuillez consulter la documentation de Spark.
Pour utiliser les window functions, l’utilisateur doit associer la fonction à une fenêtre :
avg (salary) OVER (...);
ou
over()
sur une fonction prise en charge dans l’API DataFrame, par exemplerank().over(windowSpec)
Exemple
On veut répondre aux questions suivantes :
Pour répondre à la première question, nous devons classer les salariés dans un département en fonction de leurs revenus et sélectionner les deux mieux payés. Vous trouverez ci-dessous le code utilisé pour résoudre cette question en utilisant la fonction de fenêtrage
dense_rank
:windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').desc()) sdf.withColumn("rank", f.dense_rank().over(windowSpec)).where(f.col('rank') <= 2).show()
+---------+-----+------+----+ | depName|empNo|salary|rank| +---------+-----+------+----+ | develop| 8| 6000| 1| | develop| 10| 5200| 2| | develop| 11| 5200| 2| | sales| 1| 5000| 1| | sales| 3| 4800| 2| | sales| 4| 4800| 2| |personnel| 2| 3900| 1| |personnel| 5| 3500| 2| +---------+-----+------+----+
En Spark SQL ça donne :
SELECT * FROM ( SELECT depName, empNo, salary, dense_rank() OVER (PARTITION BY depName ORDER BY salary DESC) as rank FROM depSalaries) tmp WHERE rank <= 2
Essayez d’écrire une requête SQL équivalente sans fenêtrage pour se rendre compte à quel point c’est pénible.
Pour la deuxième question, nous devons trouver le revenu le plus élevé dans un même département et pour chaque ligne. Voici un programme pour répondre à cette question.
windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').desc()) sdf.withColumn("diff", f.max(f.col('salary')).over(windowSpec) - f.col('salary')).show()
+---------+-----+------+----+ | depName|empNo|salary|diff| +---------+-----+------+----+ | develop| 8| 6000| 0| | develop| 10| 5200| 800| | develop| 11| 5200| 800| | develop| 9| 4500|1500| | develop| 7| 4200|1800| | sales| 1| 5000| 0| | sales| 3| 4800| 200| | sales| 4| 4800| 200| |personnel| 2| 3900| 0| |personnel| 5| 3500| 400| +---------+-----+------+----+
Sans utiliser les fenêtres, il faut trouver le revenu le plus élevé dans chaque département. Puis joindre cet ensemble de données dérivées à la table d’origine pour calculer les différences de revenus.
max_salary = sdf.groupBy('depName').agg(f.max(f.col('salary')).alias('max')) sdf.join(max_salary, 'depName').withColumn("diff", f.col('max') - f.col('salary')).show()
+---------+-----+------+----+----+ | depName|empNo|salary| max|diff| +---------+-----+------+----+----+ | develop| 7| 4200|6000|1800| | develop| 8| 6000|6000| 0| | develop| 9| 4500|6000|1500| | develop| 10| 5200|6000| 800| | develop| 11| 5200|6000| 800| | sales| 1| 5000|5000| 0| | sales| 3| 4800|5000| 200| | sales| 4| 4800|5000| 200| |personnel| 2| 3900|3900| 0| |personnel| 5| 3500|3900| 400| +---------+-----+------+----+----+
Un autre exemple à démontrer la puissance des fonctions de fenêtrage est le calcul d’une somme cumulée.
windowSpec = Window.partitionBy('depName').orderBy(f.col('salary').asc())\ .rowsBetween(Window.unboundedPreceding, Window.currentRow) sdf.withColumn("sum_cum", f.sum(f.col('salary')).over(windowSpec)).show()
+---------+-----+------+-----+ | depName|empNo|salary| cum| +---------+-----+------+-----+ | develop| 7| 4200| 4200| | develop| 9| 4500| 8700| | develop| 10| 5200|13900| | develop| 11| 5200|19100| | develop| 8| 6000|25100| | sales| 3| 4800| 4800| | sales| 4| 4800| 9600| | sales| 1| 5000|14600| |personnel| 5| 3500| 3500| |personnel| 2| 3900| 7400| +---------+-----+------+-----+
Cette fois, on a utilisé la fonction
rowsBetween
pour créer un frame délimité par la ligne actuellecurrentRow
et toutes les lignes précédentesWindow.unboundedPreceding
(−∞).Spark ignore automatiquement les lignes précédentes et suivantes, si la ligne actuelle est la première ou la dernière ligne.
LEAD & LAG
lag et lead peuvent être utilisés lorsque nous voulons obtenir un résultat relatif entre les lignes: lag signifie obtenir la valeur de la ligne précédente; lead signifie obtenir la valeur de la ligne suivante.
overCategory = Window.partitionBy('depName').orderBy(f.col('salary').desc()) df = sdf.withColumn("lead", f.lead('salary', 1).over(overCategory))\ .withColumn("lag", f.lag('salary', 1).over(overCategory)) df.show()
+---------+-----+------+----+----+ | depName|empNo|salary|lead| lag| +---------+-----+------+----+----+ | develop| 8| 6000|5200|null| | develop| 10| 5200|5200|6000| | develop| 11| 5200|4500|5200| | develop| 9| 4500|4200|5200| | develop| 7| 4200|null|4500| | sales| 1| 5000|4800|null| | sales| 3| 4800|4800|5000| | sales| 4| 4800|null|4800| |personnel| 2| 3900|3500|null| |personnel| 5| 3500|null|3900| +---------+-----+------+----+----+
Remarquez la première ligne de la colonne
lag
de chaque groupe a la valeur nulle, et la dernière ligne de la colonnelead
de chaque groupe a la valeur nulle.Après avoir calculé la différence, nous pouvons trouver des valeurs aberrantes qui présentent un énorme écart salarial.
df.fillna(0).withColumn("higher_than_next", f.col('salary') - f.col('lead'))\ .withColumn("lower_than_previous", f.col('lag') - f.col('salary'))\ .show()
+---------+-----+------+----+----+----------------+-------------------+ | depName|empNo|salary|lead| lag|higher_than_next|lower_than_previous| +---------+-----+------+----+----+----------------+-------------------+ | develop| 8| 6000|5200| 0| 800| -6000| | develop| 10| 5200|5200|6000| 0| 800| | develop| 11| 5200|4500|5200| 700| 0| | develop| 9| 4500|4200|5200| 300| 700| | develop| 7| 4200| 0|4500| 4200| 300| | sales| 1| 5000|4800| 0| 200| -5000| | sales| 3| 4800|4800|5000| 0| 200| | sales| 4| 4800| 0|4800| 4800| 0| |personnel| 2| 3900|3500| 0| 400| -3900| |personnel| 5| 3500| 0|3900| 3500| 400| +---------+-----+------+----+----+----------------+-------------------+
Références