In this blog post I cover some of the common PySpark DataFrame functions, joins, and window functions.
All the commands were executed in the new job-authoring Jupyter notebooks, available in preview in AWS Glue Studio.


Import Libraries
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Row
from pyspark.context import SparkContext
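In Glue Studio notebooks a SparkSession is already exposed as spark. If you are running these snippets elsewhere, a minimal sketch for creating it from the Glue context (using only the imports above) might look like this:
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session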
Create Spark DataFrame from list of tuples
sourceData = [
("http://www.youtube.com","video","NY","en_US",90,34,9,10,2,1634256039878),
("http://www.msn.com","post","TYO","jp",86,56,20,8,1,1634256040074),
("http://www.google.ca","video","CA","en_US",81,30,23,10,None,1634256039306),
("http://www.youtube.com ","video","CA","en_US",99,40,24,3,7,1634256039306),
("http://www.google.ca","video","CA","en_CA",83,36,19,9,1,1634256040074),
("http://www.times.com ","post","WA","en_US",91,50,21,None,5,1634256039303),
("http://www.google.ca","post","NY","en",79,53,15, 1,2,1634256040074),
("http://www.youtube.com","video","CA","en_CA",80,25,18,5,15,1634256039303),
("http://www.msn.com","post","WA","en",91,50,None,20,5,1634256040074),
("http://www.times.com","post","TYO","jp",91,50,21,20,8,1634256039303),
(None,"post","TYO","jp",54,23,12,2,8,1634256040074),
(" ","post","TYO","jp",5,2,1,2,8,1634256040074)
]
schema = ["url","item_type","city","lang","view","like","dslike", "share", "comment", "time"]
df = spark.createDataFrame(data=sourceData, schema = schema)
Check schema of Spark DataFrame
df.printSchema()
root
|-- url: string (nullable = true)
|-- item_type: string (nullable = true)
|-- city: string (nullable = true)
|-- lang: string (nullable = true)
|-- view: long (nullable = true)
|-- like: long (nullable = true)
|-- dslike: long (nullable = true)
|-- share: long (nullable = true)
|-- comment: long (nullable = true)
|-- time: long (nullable = true)
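If you want precise control over column types and nullability, you could pass an explicit StructType schema instead of the list of column names; a sketch using the T alias imported above:
explicit_schema = T.StructType([
    T.StructField("url", T.StringType(), True),
    T.StructField("item_type", T.StringType(), True),
    T.StructField("city", T.StringType(), True),
    T.StructField("lang", T.StringType(), True),
    T.StructField("view", T.LongType(), True),
    T.StructField("like", T.LongType(), True),
    T.StructField("dslike", T.LongType(), True),
    T.StructField("share", T.LongType(), True),
    T.StructField("comment", T.LongType(), True),
    T.StructField("time", T.LongType(), True),
])
df = spark.createDataFrame(data=sourceData, schema=explicit_schema)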
Print Spark DataFrame rows
df.show(truncate=False)
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url |item_type|city|lang |view|like|dslike|share|comment|time |
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtube.com |video |NY |en_US|90 |34 |9 |10 |2 |1634256039878|
|http://www.msn.com |post |TYO |jp |86 |56 |20 |8 |1 |1634256040074|
|http://www.google.ca |video |CA |en_US|81 |30 |23 |10 |null |1634256039306|
|http://www.youtube.com |video |CA |en_US|99 |40 |24 |3 |7 |1634256039306|
|http://www.google.ca |video |CA |en_CA|83 |36 |19 |9 |1 |1634256040074|
|http://www.times.com |post |WA |en_US|91 |50 |21 |null |5 |1634256039303|
|http://www.google.ca |post |NY |en |79 |53 |15 |1 |2 |1634256040074|
|http://www.youtube.com |video |CA |en_CA|80 |25 |18 |5 |15 |1634256039303|
|http://www.msn.com |post |WA |en |91 |50 |null |20 |5 |1634256040074|
|http://www.times.com |post |TYO |jp |91 |50 |21 |20 |8 |1634256039303|
|null |post |TYO |jp |54 |23 |12 |2 |8 |1634256040074|
| |post |TYO |jp |5 |2 |1 |2 |8 |1634256040074|
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
Count number of records
df.count()
12
Get distinct record count
# DataFrame distinct() returns a new DataFrame after eliminating
# duplicate rows (distinct on all columns).
df.distinct().count()
12
Distinct count on selected column
# Get distinct count on selected columns using countDistinct().
# This function returns the number of distinct elements in a group.
df.select(F.countDistinct("url").alias("count_distinct_url")).show()
+------------------+
|count_distinct_url|
+------------------+
| 7|
+------------------+
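On large datasets an exact distinct count can be expensive; approx_count_distinct() trades exactness for speed, with rsd setting the maximum relative standard deviation allowed:
df.select(F.approx_count_distinct("url", rsd=0.05).alias("approx_distinct_url")).show()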
Group the DataFrame using specified column
df.groupBy("url").count().show(truncate=False)
+-----------------------+-----+
|url |count|
+-----------------------+-----+
|http://www.google.ca |3 |
|http://www.times.com |1 |
|http://www.msn.com |2 |
|null |1 |
|http://www.youtube.com |2 |
|http://www.times.com |1 |
| |1 |
|http://www.youtube.com |1 |
+-----------------------+-----+
GroupBy using PySpark SQL
df.createOrReplaceTempView("url_events")
spark.sql("select url, count(*) from url_events group by url").show(truncate=False)
+-----------------------+--------+
|url |count(1)|
+-----------------------+--------+
|http://www.google.ca |3 |
|http://www.times.com |1 |
|http://www.msn.com |2 |
|null |1 |
|http://www.youtube.com |2 |
|http://www.times.com |1 |
| |1 |
|http://www.youtube.com |1 |
+-----------------------+--------+
Remove records where url is null or empty
# Set url to null when it is null, NaN, or blank; when() without an
# otherwise() clause yields null for the non-matching rows.
df = df.withColumn(
    "url",
    F.when(
        ~(F.col("url").isNull() | F.isnan(F.col("url")) | (F.trim(F.col("url")) == "")),
        F.col("url"),
    ),
)
df.filter(df.url.isNotNull()).show(truncate=False)
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url |item_type|city|lang |view|like|dslike|share|comment|time |
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtube.com |video |NY |en_US|90 |34 |9 |10 |2 |1634256039878|
|http://www.msn.com |post |TYO |jp |86 |56 |20 |8 |1 |1634256040074|
|http://www.google.ca |video |CA |en_US|81 |30 |23 |10 |null |1634256039306|
|http://www.youtube.com |video |CA |en_US|99 |40 |24 |3 |7 |1634256039306|
|http://www.google.ca |video |CA |en_CA|83 |36 |19 |9 |1 |1634256040074|
|http://www.times.com |post |WA |en_US|91 |50 |21 |null |5 |1634256039303|
|http://www.google.ca |post |NY |en |79 |53 |15 |1 |2 |1634256040074|
|http://www.youtube.com |video |CA |en_CA|80 |25 |18 |5 |15 |1634256039303|
|http://www.msn.com |post |WA |en |91 |50 |null |20 |5 |1634256040074|
|http://www.times.com |post |TYO |jp |91 |50 |21 |20 |8 |1634256039303|
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
df = df.filter(df.url.isNotNull())
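The two steps above could also be collapsed into a single filter; a minimal sketch that drops the NaN check (rarely meaningful for string URLs), with df_clean just an illustrative name:
df_clean = df.filter(F.col("url").isNotNull() & (F.trim(F.col("url")) != ""))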
trim – Trim the spaces from both ends of the specified string column
df = df.withColumn("url", F.trim(df.url))
df.groupBy("url").count().show(truncate=False)
+----------------------+-----+
|url |count|
+----------------------+-----+
|http://www.google.ca |3 |
|http://www.times.com |2 |
|http://www.msn.com |2 |
|http://www.youtube.com|3 |
+----------------------+-----+
startswith – Find rows where a column starts with the specified string
df.filter(df.lang.startswith('en')).show()
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
| url|item_type|city| lang|view|like|dslike|share|comment| time|
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtub...| video| NY|en_US| 90| 34| 9| 10| 2|1634256039878|
|http://www.google.ca| video| CA|en_US| 81| 30| 23| 10| null|1634256039306|
|http://www.youtub...| video| CA|en_US| 99| 40| 24| 3| 7|1634256039306|
|http://www.google.ca| video| CA|en_CA| 83| 36| 19| 9| 1|1634256040074|
|http://www.times.com| post| WA|en_US| 91| 50| 21| null| 5|1634256039303|
|http://www.google.ca| post| NY| en| 79| 53| 15| 1| 2|1634256040074|
|http://www.youtub...| video| CA|en_CA| 80| 25| 18| 5| 15|1634256039303|
| http://www.msn.com| post| WA| en| 91| 50| null| 20| 5|1634256040074|
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
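The related Column predicates follow the same pattern; for example:
df.filter(df.url.endswith(".com")).show(truncate=False)    # suffix match
df.filter(df.url.contains("google")).show(truncate=False)  # substring match
df.filter(df.lang.like("en%")).show(truncate=False)        # SQL LIKE pattern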
filter – Filter rows using the given condition
df.filter((df.city == "NY")).show(truncate=False)
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url |item_type|city|lang |view|like|dslike|share|comment|time |
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtube.com|video |NY |en_US|90 |34 |9 |10 |2 |1634256039878|
|http://www.google.ca |post |NY |en |79 |53 |15 |1 |2 |1634256040074|
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
# filter records where city is NY or CA
df.filter((df.city.isin(["NY", "CA"]))).show(truncate=False)
#df.filter((df.city == "NY") | (df.city == "CA")).show(truncate=False)
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url |item_type|city|lang |view|like|dslike|share|comment|time |
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtube.com|video |NY |en_US|90 |34 |9 |10 |2 |1634256039878|
|http://www.google.ca |video |CA |en_US|81 |30 |23 |10 |null |1634256039306|
|http://www.youtube.com|video |CA |en_US|99 |40 |24 |3 |7 |1634256039306|
|http://www.google.ca |video |CA |en_CA|83 |36 |19 |9 |1 |1634256040074|
|http://www.google.ca |post |NY |en |79 |53 |15 |1 |2 |1634256040074|
|http://www.youtube.com|video |CA |en_CA|80 |25 |18 |5 |15 |1634256039303|
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
# filter records where city is neither NY nor CA
df.filter(~(df.city.isin(["NY", "CA"]))).show(truncate=False)
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url |item_type|city|lang |view|like|dslike|share|comment|time |
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.msn.com |post |TYO |jp |86 |56 |20 |8 |1 |1634256040074|
|http://www.times.com|post |WA |en_US|91 |50 |21 |null |5 |1634256039303|
|http://www.msn.com |post |WA |en |91 |50 |null |20 |5 |1634256040074|
|http://www.times.com|post |TYO |jp |91 |50 |21 |20 |8 |1634256039303|
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
groupBy – Groups the DataFrame by the specified columns so we can run aggregations on them
df.groupBy("url", "city").agg(F.count("*")).show(truncate=False)
+----------------------+----+--------+
|url |city|count(1)|
+----------------------+----+--------+
|http://www.times.com |WA |1 |
|http://www.google.ca |NY |1 |
|http://www.times.com |TYO |1 |
|http://www.youtube.com|NY |1 |
|http://www.google.ca |CA |2 |
|http://www.youtube.com|CA |2 |
|http://www.msn.com |WA |1 |
|http://www.msn.com |TYO |1 |
+----------------------+----+--------+
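Several aggregates can be computed in a single pass and given readable names; a sketch with illustrative aliases:
df.groupBy("url").agg(
    F.count("*").alias("events"),
    F.sum("view").alias("total_views"),
    F.avg("like").alias("avg_likes")
).show(truncate=False)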
pivot – Pivot a column of the DataFrame and perform a specific aggregation
df.groupBy("url").pivot("item_type").agg(F.count("*")).na.fill(0).show(truncate=False)
#df.groupBy("url").pivot("item_type").agg(F.count("*")).fillna(0).show(truncate=False)
+----------------------+----+-----+
|url |post|video|
+----------------------+----+-----+
|http://www.google.ca |1 |2 |
|http://www.times.com |2 |0 |
|http://www.msn.com |2 |0 |
|http://www.youtube.com|0 |3 |
+----------------------+----+-----+
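If the pivot values are known in advance, passing them explicitly avoids an extra pass over the data to discover them:
df.groupBy("url").pivot("item_type", ["post", "video"]).agg(F.count("*")).na.fill(0).show(truncate=False)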
withColumn – Returns a new DataFrame by adding a new column or replacing the existing column that has the same name.
df_item_type = df.groupBy("url").pivot("item_type").agg(F.count("*")).na.fill(0)
df_item_type.withColumn("total", (df_item_type.post + df_item_type.video)).show(truncate=False)
+----------------------+----+-----+-----+
|url |post|video|total|
+----------------------+----+-----+-----+
|http://www.google.ca |1 |2 |3 |
|http://www.times.com |2 |0 |2 |
|http://www.msn.com |2 |0 |2 |
|http://www.youtube.com|0 |3 |3 |
+----------------------+----+-----+-----+
withColumnRenamed – Returns a new DataFrame by renaming an existing column.
df = df.withColumnRenamed("dslike","dislike")
lit – Create a column of literal value
df.select("url","item_type").withColumn("item_type_no", F.when(F.col("item_type") == "video" ,F.lit("1")).otherwise(F.lit("2"))).show(truncate=False)
+----------------------+---------+------------+
|url |item_type|item_type_no|
+----------------------+---------+------------+
|http://www.youtube.com|video |1 |
|http://www.msn.com |post |2 |
|http://www.google.ca |video |1 |
|http://www.youtube.com|video |1 |
|http://www.google.ca |video |1 |
|http://www.times.com |post |2 |
|http://www.google.ca |post |2 |
|http://www.youtube.com|video |1 |
|http://www.msn.com |post |2 |
|http://www.times.com |post |2 |
+----------------------+---------+------------+
df = df.select("url","item_type").withColumn("item_type_no", F.when(F.col("item_type") == "video" ,F.lit("1")).otherwise(F.lit("2")))
df.printSchema()
root
|-- url: string (nullable = true)
|-- item_type: string (nullable = true)
|-- item_type_no: string (nullable = false)
cast – Convert column datatype
df = df.withColumn("item_type_no", df["item_type_no"].cast(T.IntegerType()))
df.printSchema()
root
|-- url: string (nullable = true)
|-- item_type: string (nullable = true)
|-- item_type_no: integer (nullable = true)
Just like joins in SQL, you can use PySpark to combine DataFrames. To demonstrate the joins, I will use the EMP and DEPT tables, copies of the classic EMP and DEPT tables found in the Oracle SCOTT schema.
DEPT dataset
deptData = [
(10,'ACCOUNTING','NEW YORK'),
(20,'RESEARCH','DALLAS'),
(30,'SALES','CHICAGO'),
(40,'OPERATIONS','BOSTON')
]
dept_schema = ["deptno","dname","loc"]
df_dept = spark.createDataFrame(data=deptData, schema = dept_schema)
df_dept.show()
+------+----------+--------+
|deptno| dname| loc|
+------+----------+--------+
| 10|ACCOUNTING|NEW YORK|
| 20| RESEARCH| DALLAS|
| 30| SALES| CHICAGO|
| 40|OPERATIONS| BOSTON|
+------+----------+--------+
EMP DataFrame
empData = [
(7369,'SMITH','CLERK',7902,'17-12-1980',800,None,20),
(7499,'ALLEN','SALESMAN',7698,'20-2-1981',1600,300,30),
(7521,'WARD','SALESMAN',7698,'22-2-1981',1250,500,30),
(7566,'JONES','MANAGER',7839,'2-4-1981',2975,None,20),
(7654,'MARTIN','SALESMAN',7698,'28-9-1981',1250,1400,30),
(7698,'BLAKE','MANAGER',7839,'1-5-1981',2850,None,30),
(7782,'CLARK','MANAGER',7839,'9-6-1981',2450,None,10),
(7788,'SCOTT','ANALYST',7566,'13-JUL-87',3000,None,20),
(7839,'KING','PRESIDENT',None,'17-11-1981',5000,None,10),
(7844,'TURNER','SALESMAN',7698,'8-9-1981',1500,0,30),
(7876,'ADAMS','CLERK',7788,'13-JUL-87',1100,None,20),
(7900,'JAMES','CLERK',7698,'3-12-1981',950,None,30),
(7902,'FORD','ANALYST',7566,'3-12-1981',3000,None,20),
(7934,'MILLER','CLERK',7782,'23-1-1982',1300,None,10),
(9999,'ANDY','DBA',None,'02-1-1981',4300,None,None)
]
emp_schema = ["empno","ename","job","mgr","hiredate","sal","comm", "deptno"]
df_emp = spark.createDataFrame(data=empData, schema = emp_schema)
df_emp.show()
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH| CLERK|7902|17-12-1980| 800|null| 20|
| 7499| ALLEN| SALESMAN|7698| 20-2-1981|1600| 300| 30|
| 7521| WARD| SALESMAN|7698| 22-2-1981|1250| 500| 30|
| 7566| JONES| MANAGER|7839| 2-4-1981|2975|null| 20|
| 7654|MARTIN| SALESMAN|7698| 28-9-1981|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839| 1-5-1981|2850|null| 30|
| 7782| CLARK| MANAGER|7839| 9-6-1981|2450|null| 10|
| 7788| SCOTT| ANALYST|7566| 13-JUL-87|3000|null| 20|
| 7839| KING|PRESIDENT|null|17-11-1981|5000|null| 10|
| 7844|TURNER| SALESMAN|7698| 8-9-1981|1500| 0| 30|
| 7876| ADAMS| CLERK|7788| 13-JUL-87|1100|null| 20|
| 7900| JAMES| CLERK|7698| 3-12-1981| 950|null| 30|
| 7902| FORD| ANALYST|7566| 3-12-1981|3000|null| 20|
| 7934|MILLER| CLERK|7782| 23-1-1982|1300|null| 10|
| 9999| ANDY| DBA|null| 02-1-1981|4300|null| null|
+-----+------+---------+----+----------+----+----+------+
sort – Returns a new DataFrame sorted by the specified column(s)
df_emp.sort("deptno").show()
# df_emp.sort(F.desc("deptno")).show()
+-----+------+---------+----+----------+----+----+------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7934|MILLER| CLERK|7782| 23-1-1982|1300|null| 10|
| 7782| CLARK| MANAGER|7839| 9-6-1981|2450|null| 10|
| 7839| KING|PRESIDENT|null|17-11-1981|5000|null| 10|
| 7369| SMITH| CLERK|7902|17-12-1980| 800|null| 20|
| 7876| ADAMS| CLERK|7788| 13-JUL-87|1100|null| 20|
| 7902| FORD| ANALYST|7566| 3-12-1981|3000|null| 20|
| 7566| JONES| MANAGER|7839| 2-4-1981|2975|null| 20|
| 7788| SCOTT| ANALYST|7566| 13-JUL-87|3000|null| 20|
| 7844|TURNER| SALESMAN|7698| 8-9-1981|1500| 0| 30|
| 7499| ALLEN| SALESMAN|7698| 20-2-1981|1600| 300| 30|
| 7900| JAMES| CLERK|7698| 3-12-1981| 950|null| 30|
| 7521| WARD| SALESMAN|7698| 22-2-1981|1250| 500| 30|
| 7654|MARTIN| SALESMAN|7698| 28-9-1981|1250|1400| 30|
| 7698| BLAKE| MANAGER|7839| 1-5-1981|2850|null| 30|
+-----+------+---------+----+----------+----+----+------+
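Sort keys can also be combined, e.g. department ascending with salary descending within it:
df_emp.sort("deptno", F.desc("sal")).show()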
Inner Join
"""
SELECT d.department_id,
d.department_name,
e.employee_name
FROM departments d
JOIN employees e ON d.department_id = e.department_id
WHERE d.department_id >= 30
ORDER BY d.department_name;
"""
df_dept.filter(df_dept.deptno >= 30).join(df_emp, ['deptno'],how='inner').select(df_dept.deptno, df_dept.dname, df_emp.ename).show()
+------+-----+------+
|deptno|dname| ename|
+------+-----+------+
| 30|SALES| ALLEN|
| 30|SALES| WARD|
| 30|SALES|MARTIN|
| 30|SALES| BLAKE|
| 30|SALES|TURNER|
| 30|SALES| JAMES|
+------+-----+------+
LEFT [OUTER] JOIN
"""
SELECT d.department_id,
d.location,
d.department_name,
e.employee_name
FROM departments d
LEFT OUTER JOIN employees e ON d.department_id = e.department_id AND e.salary >= 2000
WHERE d.department_id >= 30
"""
df_dept.filter(df_dept.deptno >= 30).join(df_emp.filter(df_emp.sal >= 2000), ['deptno'],how='left_outer').select(df_dept.deptno, df_dept.loc, df_dept.dname, df_emp.ename).show()
+------+-------+----------+-----+
|deptno| loc| dname|ename|
+------+-------+----------+-----+
| 30|CHICAGO| SALES|BLAKE|
| 40| BOSTON|OPERATIONS| null|
+------+-------+----------+-----+
RIGHT [OUTER] JOIN
"""
SELECT d.department_id,
d.location,
d.department_name,
e.employee_name
FROM employees e
RIGHT OUTER JOIN departments d ON e.department_id = d.department_id
WHERE d.department_id >= 30
"""
df_emp.join(df_dept.filter(df_dept.deptno >= 30), ['deptno'], how='rightouter').select(df_dept.deptno, df_dept.loc, df_dept.dname, df_emp.ename).show()
+------+-------+----------+------+
|deptno| loc| dname| ename|
+------+-------+----------+------+
| 30|CHICAGO| SALES| ALLEN|
| 30|CHICAGO| SALES| WARD|
| 30|CHICAGO| SALES|MARTIN|
| 30|CHICAGO| SALES| BLAKE|
| 30|CHICAGO| SALES|TURNER|
| 30|CHICAGO| SALES| JAMES|
| 40| BOSTON|OPERATIONS| null|
+------+-------+----------+------+
FULL [OUTER] JOIN
"""
SELECT *
FROM employees e
FULL OUTER JOIN departments d ON e.department_id = d.department_id
"""
df_emp.join(df_dept, ['deptno'], how='full').show()
+------+-----+------+---------+----+----------+----+----+----------+--------+
|deptno|empno| ename| job| mgr| hiredate| sal|comm| dname| loc|
+------+-----+------+---------+----+----------+----+----+----------+--------+
| null| 9999| ANDY| DBA|null| 02-1-1981|4300|null| null| null|
| 10| 7782| CLARK| MANAGER|7839| 9-6-1981|2450|null|ACCOUNTING|NEW YORK|
| 10| 7839| KING|PRESIDENT|null|17-11-1981|5000|null|ACCOUNTING|NEW YORK|
| 10| 7934|MILLER| CLERK|7782| 23-1-1982|1300|null|ACCOUNTING|NEW YORK|
| 30| 7499| ALLEN| SALESMAN|7698| 20-2-1981|1600| 300| SALES| CHICAGO|
| 30| 7521| WARD| SALESMAN|7698| 22-2-1981|1250| 500| SALES| CHICAGO|
| 30| 7654|MARTIN| SALESMAN|7698| 28-9-1981|1250|1400| SALES| CHICAGO|
| 30| 7698| BLAKE| MANAGER|7839| 1-5-1981|2850|null| SALES| CHICAGO|
| 30| 7844|TURNER| SALESMAN|7698| 8-9-1981|1500| 0| SALES| CHICAGO|
| 30| 7900| JAMES| CLERK|7698| 3-12-1981| 950|null| SALES| CHICAGO|
| 20| 7369| SMITH| CLERK|7902|17-12-1980| 800|null| RESEARCH| DALLAS|
| 20| 7566| JONES| MANAGER|7839| 2-4-1981|2975|null| RESEARCH| DALLAS|
| 20| 7788| SCOTT| ANALYST|7566| 13-JUL-87|3000|null| RESEARCH| DALLAS|
| 20| 7876| ADAMS| CLERK|7788| 13-JUL-87|1100|null| RESEARCH| DALLAS|
| 20| 7902| FORD| ANALYST|7566| 3-12-1981|3000|null| RESEARCH| DALLAS|
| 40| null| null| null|null| null|null|null|OPERATIONS| BOSTON|
+------+-----+------+---------+----+----------+----+----+----------+--------+
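Two join types with no direct SQL keyword are worth knowing as well. As a sketch: left_semi keeps employees whose deptno exists in DEPT (left-side columns only), while left_anti keeps the employees with no match.
df_emp.join(df_dept, ['deptno'], how='left_semi').show()
df_emp.join(df_dept, ['deptno'], how='left_anti').show()  # e.g. ANDY, whose deptno is null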
udf – Create a user-defined function
def salaryScale(sal):
    if sal >= 4000:
        return 'high'
    elif sal > 2500 and sal < 4000:
        return 'medium'
    else:
        return 'low'

# Convert to a UDF by passing in the function and its return type
salaryScaleUDF = F.udf(salaryScale, T.StringType())
df_emp.withColumn("SalaryScale", salaryScaleUDF("sal")).show()
+-----+------+---------+----+----------+----+----+------+-----------+
|empno| ename| job| mgr| hiredate| sal|comm|deptno|SalaryScale|
+-----+------+---------+----+----------+----+----+------+-----------+
| 7369| SMITH| CLERK|7902|17-12-1980| 800|null| 20| low|
| 7499| ALLEN| SALESMAN|7698| 20-2-1981|1600| 300| 30| low|
| 7521| WARD| SALESMAN|7698| 22-2-1981|1250| 500| 30| low|
| 7566| JONES| MANAGER|7839| 2-4-1981|2975|null| 20| medium|
| 7654|MARTIN| SALESMAN|7698| 28-9-1981|1250|1400| 30| low|
| 7698| BLAKE| MANAGER|7839| 1-5-1981|2850|null| 30| medium|
| 7782| CLARK| MANAGER|7839| 9-6-1981|2450|null| 10| low|
| 7788| SCOTT| ANALYST|7566| 13-JUL-87|3000|null| 20| medium|
| 7839| KING|PRESIDENT|null|17-11-1981|5000|null| 10| high|
| 7844|TURNER| SALESMAN|7698| 8-9-1981|1500| 0| 30| low|
| 7876| ADAMS| CLERK|7788| 13-JUL-87|1100|null| 20| low|
| 7900| JAMES| CLERK|7698| 3-12-1981| 950|null| 30| low|
| 7902| FORD| ANALYST|7566| 3-12-1981|3000|null| 20| medium|
| 7934|MILLER| CLERK|7782| 23-1-1982|1300|null| 10| low|
| 9999| ANDY| DBA|null| 02-1-1981|4300|null| null| high|
+-----+------+---------+----+----------+----+----+------+-----------+
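The same bucketing can be expressed with the built-in when()/otherwise(), which avoids the Python serialization overhead of a UDF; a sketch of the equivalent logic:
df_emp.withColumn(
    "SalaryScale",
    F.when(F.col("sal") >= 4000, "high")
     .when((F.col("sal") > 2500) & (F.col("sal") < 4000), "medium")
     .otherwise("low")
).show()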
Window – Defines a window specification for use with window functions over a DataFrame.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec = Window.partitionBy('deptno').orderBy(F.desc('sal'))
rank – Window function
Returns the rank of rows within a window partition
df_emp.withColumn("rank", F.rank().over(windowSpec)).select("empno", "ename", "job", "mgr", "sal", "deptno", "rank").show()
+-----+------+---------+----+----+------+----+
|empno| ename| job| mgr| sal|deptno|rank|
+-----+------+---------+----+----+------+----+
| 9999| ANDY| DBA|null|4300| null| 1|
| 7839| KING|PRESIDENT|null|5000| 10| 1|
| 7782| CLARK| MANAGER|7839|2450| 10| 2|
| 7934|MILLER| CLERK|7782|1300| 10| 3|
| 7698| BLAKE| MANAGER|7839|2850| 30| 1|
| 7499| ALLEN| SALESMAN|7698|1600| 30| 2|
| 7844|TURNER| SALESMAN|7698|1500| 30| 3|
| 7521| WARD| SALESMAN|7698|1250| 30| 4|
| 7654|MARTIN| SALESMAN|7698|1250| 30| 4|
| 7900| JAMES| CLERK|7698| 950| 30| 6|
| 7788| SCOTT| ANALYST|7566|3000| 20| 1|
| 7902| FORD| ANALYST|7566|3000| 20| 1|
| 7566| JONES| MANAGER|7839|2975| 20| 3|
| 7876| ADAMS| CLERK|7788|1100| 20| 4|
| 7369| SMITH| CLERK|7902| 800| 20| 5|
+-----+------+---------+----+----+------+----+
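row_number (imported above) assigns a unique sequential number within each partition, breaking ties arbitrarily; for example, to pick the top earner per department:
df_emp.withColumn("rn", row_number().over(windowSpec)) \
    .filter("rn = 1") \
    .select("deptno", "ename", "sal").show()  # the SCOTT/FORD tie is broken arbitrarily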
dense_rank – Window function
The difference between rank and dense_rank is that dense_rank does not leave gaps in ranking sequence when there are ties.
df_emp.withColumn("dense_rank", F.dense_rank().over(windowSpec)).select("empno", "ename", "job", "mgr", "sal", "deptno", "dense_rank").show()
+-----+------+---------+----+----+------+----------+
|empno| ename| job| mgr| sal|deptno|dense_rank|
+-----+------+---------+----+----+------+----------+
| 9999| ANDY| DBA|null|4300| null| 1|
| 7839| KING|PRESIDENT|null|5000| 10| 1|
| 7782| CLARK| MANAGER|7839|2450| 10| 2|
| 7934|MILLER| CLERK|7782|1300| 10| 3|
| 7698| BLAKE| MANAGER|7839|2850| 30| 1|
| 7499| ALLEN| SALESMAN|7698|1600| 30| 2|
| 7844|TURNER| SALESMAN|7698|1500| 30| 3|
| 7521| WARD| SALESMAN|7698|1250| 30| 4|
| 7654|MARTIN| SALESMAN|7698|1250| 30| 4|
| 7900| JAMES| CLERK|7698| 950| 30| 5|
| 7788| SCOTT| ANALYST|7566|3000| 20| 1|
| 7902| FORD| ANALYST|7566|3000| 20| 1|
| 7566| JONES| MANAGER|7839|2975| 20| 2|
| 7876| ADAMS| CLERK|7788|1100| 20| 3|
| 7369| SMITH| CLERK|7902| 800| 20| 4|
+-----+------+---------+----+----+------+----------+
ntile – Window function
Returns the ntile group id (from 1 to n inclusive) in an ordered window partition
df_emp.withColumn("ntile", F.ntile(3).over(windowSpec)).select("empno", "ename", "job", "mgr", "sal", "deptno", "ntile").show()
+-----+------+---------+----+----+------+-----+
|empno| ename| job| mgr| sal|deptno|ntile|
+-----+------+---------+----+----+------+-----+
| 9999| ANDY| DBA|null|4300| null| 1|
| 7839| KING|PRESIDENT|null|5000| 10| 1|
| 7782| CLARK| MANAGER|7839|2450| 10| 2|
| 7934|MILLER| CLERK|7782|1300| 10| 3|
| 7698| BLAKE| MANAGER|7839|2850| 30| 1|
| 7499| ALLEN| SALESMAN|7698|1600| 30| 1|
| 7844|TURNER| SALESMAN|7698|1500| 30| 2|
| 7521| WARD| SALESMAN|7698|1250| 30| 2|
| 7654|MARTIN| SALESMAN|7698|1250| 30| 3|
| 7900| JAMES| CLERK|7698| 950| 30| 3|
| 7788| SCOTT| ANALYST|7566|3000| 20| 1|
| 7902| FORD| ANALYST|7566|3000| 20| 1|
| 7566| JONES| MANAGER|7839|2975| 20| 2|
| 7876| ADAMS| CLERK|7788|1100| 20| 2|
| 7369| SMITH| CLERK|7902| 800| 20| 3|
+-----+------+---------+----+----+------+-----+
lag – Window function
Returns the value that is offset rows before the current row, and the default if there are fewer than offset rows before the current row.
df_emp.withColumn("sal_prev",F.lag("sal",1, 0).over(windowSpec)) \
.select("empno", "ename", "job", "sal", "sal_prev")\
.show()
+-----+------+---------+----+--------+
|empno| ename| job| sal|sal_prev|
+-----+------+---------+----+--------+
| 9999| ANDY| DBA|4300| 0|
| 7839| KING|PRESIDENT|5000| 0|
| 7782| CLARK| MANAGER|2450| 5000|
| 7934|MILLER| CLERK|1300| 2450|
| 7698| BLAKE| MANAGER|2850| 0|
| 7499| ALLEN| SALESMAN|1600| 2850|
| 7844|TURNER| SALESMAN|1500| 1600|
| 7521| WARD| SALESMAN|1250| 1500|
| 7654|MARTIN| SALESMAN|1250| 1250|
| 7900| JAMES| CLERK| 950| 1250|
| 7788| SCOTT| ANALYST|3000| 0|
| 7902| FORD| ANALYST|3000| 3000|
| 7566| JONES| MANAGER|2975| 3000|
| 7876| ADAMS| CLERK|1100| 2975|
| 7369| SMITH| CLERK| 800| 1100|
+-----+------+---------+----+--------+
lead – Window function
Returns the value that is offset rows after the current row, and the default if there are fewer than offset rows after the current row.
df_emp.withColumn("sal_next",F.lead("sal",1, 0).over(windowSpec)) \
.select("empno", "ename", "job", "sal", "sal_next")\
.show()
+-----+------+---------+----+--------+
|empno| ename| job| sal|sal_next|
+-----+------+---------+----+--------+
| 9999| ANDY| DBA|4300| 0|
| 7839| KING|PRESIDENT|5000| 2450|
| 7782| CLARK| MANAGER|2450| 1300|
| 7934|MILLER| CLERK|1300| 0|
| 7698| BLAKE| MANAGER|2850| 1600|
| 7499| ALLEN| SALESMAN|1600| 1500|
| 7844|TURNER| SALESMAN|1500| 1250|
| 7521| WARD| SALESMAN|1250| 1250|
| 7654|MARTIN| SALESMAN|1250| 950|
| 7900| JAMES| CLERK| 950| 0|
| 7788| SCOTT| ANALYST|3000| 3000|
| 7902| FORD| ANALYST|3000| 2975|
| 7566| JONES| MANAGER|2975| 1100|
| 7876| ADAMS| CLERK|1100| 800|
| 7369| SMITH| CLERK| 800| 0|
+-----+------+---------+----+--------+
first – Window function
Returns the first value in the window frame; the second argument (True here) makes it skip nulls.
df_emp.withColumn("sal_first",F.first("sal",True).over(windowSpec)) \
.select("empno", "ename", "job", "sal", "sal_first")\
.show()
+-----+------+---------+----+---------+
|empno| ename| job| sal|sal_first|
+-----+------+---------+----+---------+
| 9999| ANDY| DBA|4300| 4300|
| 7839| KING|PRESIDENT|5000| 5000|
| 7782| CLARK| MANAGER|2450| 5000|
| 7934|MILLER| CLERK|1300| 5000|
| 7698| BLAKE| MANAGER|2850| 2850|
| 7499| ALLEN| SALESMAN|1600| 2850|
| 7844|TURNER| SALESMAN|1500| 2850|
| 7521| WARD| SALESMAN|1250| 2850|
| 7654|MARTIN| SALESMAN|1250| 2850|
| 7900| JAMES| CLERK| 950| 2850|
| 7788| SCOTT| ANALYST|3000| 3000|
| 7902| FORD| ANALYST|3000| 3000|
| 7566| JONES| MANAGER|2975| 3000|
| 7876| ADAMS| CLERK|1100| 3000|
| 7369| SMITH| CLERK| 800| 3000|
+-----+------+---------+----+---------+
max, avg – Window function
# Without an orderBy, the window frame spans the entire partition,
# so first/last here reflect the row order within each partition.
partition = Window.partitionBy("deptno")
df_emp.withColumn("max_sal", F.max(F.col("sal")).over(partition))\
.withColumn("avg_sal", F.round(F.avg(F.col("sal")).over(partition),1))\
.withColumn("first_sal", F.first(F.col("sal")).over(partition))\
.withColumn("last_sal", F.last(F.col("sal")).over(partition)) \
.select("empno","ename", "job", "sal", "deptno", "max_sal", "avg_sal", "first_sal", "last_sal")\
.show()
+-----+------+---------+----+------+-------+-------+---------+--------+
|empno| ename| job| sal|deptno|max_sal|avg_sal|first_sal|last_sal|
+-----+------+---------+----+------+-------+-------+---------+--------+
| 9999| ANDY| DBA|4300| null| 4300| 4300.0| 4300| 4300|
| 7782| CLARK| MANAGER|2450| 10| 5000| 2916.7| 2450| 1300|
| 7839| KING|PRESIDENT|5000| 10| 5000| 2916.7| 2450| 1300|
| 7934|MILLER| CLERK|1300| 10| 5000| 2916.7| 2450| 1300|
| 7499| ALLEN| SALESMAN|1600| 30| 2850| 1566.7| 1600| 950|
| 7521| WARD| SALESMAN|1250| 30| 2850| 1566.7| 1600| 950|
| 7654|MARTIN| SALESMAN|1250| 30| 2850| 1566.7| 1600| 950|
| 7698| BLAKE| MANAGER|2850| 30| 2850| 1566.7| 1600| 950|
| 7844|TURNER| SALESMAN|1500| 30| 2850| 1566.7| 1600| 950|
| 7900| JAMES| CLERK| 950| 30| 2850| 1566.7| 1600| 950|
| 7369| SMITH| CLERK| 800| 20| 3000| 2175.0| 800| 3000|
| 7566| JONES| MANAGER|2975| 20| 3000| 2175.0| 800| 3000|
| 7788| SCOTT| ANALYST|3000| 20| 3000| 2175.0| 800| 3000|
| 7876| ADAMS| CLERK|1100| 20| 3000| 2175.0| 800| 3000|
| 7902| FORD| ANALYST|3000| 20| 3000| 2175.0| 800| 3000|
+-----+------+---------+----+------+-------+-------+---------+--------+
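The window frame can also be set explicitly with rowsBetween; for example, a running salary total per department (a sketch, with running an illustrative name):
running = Window.partitionBy("deptno").orderBy(F.desc("sal")) \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_emp.withColumn("running_sal", F.sum("sal").over(running)) \
    .select("empno", "ename", "sal", "deptno", "running_sal").show()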
getNumPartitions
Returns the number of partitions in the underlying RDD
df_emp.rdd.getNumPartitions()
4
Row count from each partition
from pyspark.sql.functions import spark_partition_id
df_emp.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()
+-----------+-----+
|partitionId|count|
+-----------+-----+
| 1| 3|
| 3| 6|
| 2| 3|
| 0| 3|
+-----------+-----+
# Another way
df_emp.rdd.glom().map(len).collect()
[3, 3, 3, 6]
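To change the partition count, repartition() performs a full shuffle, while coalesce() only merges existing partitions (it can reduce, not increase, the count):
df_emp.repartition(8).rdd.getNumPartitions()  # 8
df_emp.coalesce(2).rdd.getNumPartitions()     # 2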
Hope you find this helpful!