A Guide to PySpark DataFrame Functionality

In this blog post I cover some of the most common PySpark DataFrame functions, joins, and window functions.

All the commands were executed in the new job-authoring Jupyter notebook, available in preview in AWS Glue Studio.

Import Libraries

from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext

import pyspark.sql.functions as F
import pyspark.sql.types as T

from pyspark.sql import Row
from pyspark.context import SparkContext
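
In a Glue Studio notebook, a SparkSession named spark is already available, which is why the snippets below use spark directly. If you run this code outside of Glue Studio, a session can be created along these lines (a minimal sketch of the usual Glue boilerplate):

# Create the Glue/Spark contexts when they are not provided by the notebook
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session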

Create a Spark DataFrame from a list of tuples

sourceData = [
    ("http://www.youtube.com","video","NY","en_US",90,34,9,10,2,1634256039878),
    ("http://www.msn.com","post","TYO","jp",86,56,20,8,1,1634256040074),
    ("http://www.google.ca","video","CA","en_US",81,30,23,10,None,1634256039306),
    ("http://www.youtube.com ","video","CA","en_US",99,40,24,3,7,1634256039306),
    ("http://www.google.ca","video","CA","en_CA",83,36,19,9,1,1634256040074),
    ("http://www.times.com ","post","WA","en_US",91,50,21,None,5,1634256039303),
    ("http://www.google.ca","post","NY","en",79,53,15, 1,2,1634256040074),
    ("http://www.youtube.com","video","CA","en_CA",80,25,18,5,15,1634256039303),
    ("http://www.msn.com","post","WA","en",91,50,None,20,5,1634256040074),
    ("http://www.times.com","post","TYO","jp",91,50,21,20,8,1634256039303),
    (None,"post","TYO","jp",54,23,12,2,8,1634256040074),
    (" ","post","TYO","jp",5,2,1,2,8,1634256040074)
]

schema = ["url","item_type","city","lang","view","like","dslike", "share", "comment", "time"]

df = spark.createDataFrame(data=sourceData, schema=schema)

Check the schema of the Spark DataFrame

df.printSchema()

root
 |-- url: string (nullable = true)
 |-- item_type: string (nullable = true)
 |-- city: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- view: long (nullable = true)
 |-- like: long (nullable = true)
 |-- dslike: long (nullable = true)
 |-- share: long (nullable = true)
 |-- comment: long (nullable = true)
 |-- time: long (nullable = true)

Display the DataFrame

df.show(truncate=False)

+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url                    |item_type|city|lang |view|like|dslike|share|comment|time         |
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtube.com |video    |NY  |en_US|90  |34  |9     |10   |2      |1634256039878|
|http://www.msn.com     |post     |TYO |jp   |86  |56  |20    |8    |1      |1634256040074|
|http://www.google.ca   |video    |CA  |en_US|81  |30  |23    |10   |null   |1634256039306|
|http://www.youtube.com |video    |CA  |en_US|99  |40  |24    |3    |7      |1634256039306|
|http://www.google.ca   |video    |CA  |en_CA|83  |36  |19    |9    |1      |1634256040074|
|http://www.times.com   |post     |WA  |en_US|91  |50  |21    |null |5      |1634256039303|
|http://www.google.ca   |post     |NY  |en   |79  |53  |15    |1    |2      |1634256040074|
|http://www.youtube.com |video    |CA  |en_CA|80  |25  |18    |5    |15     |1634256039303|
|http://www.msn.com     |post     |WA  |en   |91  |50  |null  |20   |5      |1634256040074|
|http://www.times.com   |post     |TYO |jp   |91  |50  |21    |20   |8      |1634256039303|
|null                   |post     |TYO |jp   |54  |23  |12    |2    |8      |1634256040074|
|                       |post     |TYO |jp   |5   |2   |1     |2    |8      |1634256040074|
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+

Count the number of records

df.count()

12

Get distinct record count

# DataFrame distinct() returns a new DataFrame after eliminating
# duplicate rows (distinct on all columns).

df.distinct().count()

12
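
distinct() deduplicates on all columns. If you only need uniqueness on a subset of columns, dropDuplicates() accepts a column list (a quick sketch on the same DataFrame):

# Keep one (arbitrary) row per url; the other column values come from whichever row is kept
df.dropDuplicates(["url"]).count()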

Distinct count on selected column

# Get a distinct count on selected columns using countDistinct().
# This function returns the number of distinct elements in a group; null values are not counted.

df.select(F.countDistinct("url").alias("count_distinct_url")).show()

+------------------+
|count_distinct_url|
+------------------+
|                 7|
+------------------+

Group the DataFrame using a specified column

df.groupBy("url").count().show(truncate=False)

+-----------------------+-----+
|url                    |count|
+-----------------------+-----+
|http://www.google.ca   |3    |
|http://www.times.com   |1    |
|http://www.msn.com     |2    |
|null                   |1    |
|http://www.youtube.com |2    |
|http://www.times.com   |1    |
|                       |1    |
|http://www.youtube.com |1    |
+-----------------------+-----+

Note that http://www.times.com and http://www.youtube.com each appear twice above: some of the source rows contain a trailing space in the url, which we will trim shortly.

GroupBy using Spark SQL

df.createOrReplaceTempView("url_events")

spark.sql("select url, count(*) from url_events group by url").show(truncate=False)

+-----------------------+--------+
|url                    |count(1)|
+-----------------------+--------+
|http://www.google.ca   |3       |
|http://www.times.com   |1       |
|http://www.msn.com     |2       |
|null                   |1       |
|http://www.youtube.com |2       |
|http://www.times.com   |1       |
|                       |1       |
|http://www.youtube.com |1       |
+-----------------------+--------+
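
To get a friendlier column header than count(1), alias the aggregate in the query (a small sketch):

spark.sql("select url, count(*) as url_count from url_events group by url").show(truncate=False)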

Remove records where url is null or empty

# F.isnan applies to numeric columns, so for a string column we only need the
# null and empty/whitespace checks; non-matching urls are replaced with null.
df = df.withColumn('url', F.when(~(F.col('url').isNull() | (F.trim(F.col('url')) == "")), F.col('url')))

df.filter(df.url.isNotNull()).show(truncate=False)

+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url                    |item_type|city|lang |view|like|dslike|share|comment|time         |
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtube.com |video    |NY  |en_US|90  |34  |9     |10   |2      |1634256039878|
|http://www.msn.com     |post     |TYO |jp   |86  |56  |20    |8    |1      |1634256040074|
|http://www.google.ca   |video    |CA  |en_US|81  |30  |23    |10   |null   |1634256039306|
|http://www.youtube.com |video    |CA  |en_US|99  |40  |24    |3    |7      |1634256039306|
|http://www.google.ca   |video    |CA  |en_CA|83  |36  |19    |9    |1      |1634256040074|
|http://www.times.com   |post     |WA  |en_US|91  |50  |21    |null |5      |1634256039303|
|http://www.google.ca   |post     |NY  |en   |79  |53  |15    |1    |2      |1634256040074|
|http://www.youtube.com |video    |CA  |en_CA|80  |25  |18    |5    |15     |1634256039303|
|http://www.msn.com     |post     |WA  |en   |91  |50  |null  |20   |5      |1634256040074|
|http://www.times.com   |post     |TYO |jp   |91  |50  |21    |20   |8      |1634256039303|
+-----------------------+---------+----+-----+----+----+------+-----+-------+-------------+

df = df.filter(df.url.isNotNull())
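
For reference, the same cleanup can be written as a single filter instead of nulling out the bad values first (a sketch, equivalent to the two steps above):

# Keep only rows whose url is non-null and non-blank after trimming
df.filter(F.col("url").isNotNull() & (F.trim(F.col("url")) != "")).count()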

trim – Trim the spaces from both ends of the specified string column

df = df.withColumn("url", F.trim(df.url))

df.groupBy("url").count().show(truncate=False)

+----------------------+-----+
|url                   |count|
+----------------------+-----+
|http://www.google.ca  |3    |
|http://www.times.com  |2    |
|http://www.msn.com    |2    |
|http://www.youtube.com|3    |
+----------------------+-----+

startswith – Find rows whose column value starts with the specified string

df.filter(df.lang.startswith('en')).show()

+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|                 url|item_type|city| lang|view|like|dslike|share|comment|         time|
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtub...|    video|  NY|en_US|  90|  34|     9|   10|      2|1634256039878|
|http://www.google.ca|    video|  CA|en_US|  81|  30|    23|   10|   null|1634256039306|
|http://www.youtub...|    video|  CA|en_US|  99|  40|    24|    3|      7|1634256039306|
|http://www.google.ca|    video|  CA|en_CA|  83|  36|    19|    9|      1|1634256040074|
|http://www.times.com|     post|  WA|en_US|  91|  50|    21| null|      5|1634256039303|
|http://www.google.ca|     post|  NY|   en|  79|  53|    15|    1|      2|1634256040074|
|http://www.youtub...|    video|  CA|en_CA|  80|  25|    18|    5|     15|1634256039303|
|  http://www.msn.com|     post|  WA|   en|  91|  50|  null|   20|      5|1634256040074|
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
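
Column objects expose related string predicates such as endswith and contains, which work the same way (a quick sketch):

# Rows whose url ends with ".com", and rows whose url contains "google"
df.filter(df.url.endswith(".com")).show(truncate=False)
df.filter(df.url.contains("google")).show(truncate=False)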

filter – Filter rows using a given condition

df.filter((df.city == "NY")).show(truncate=False)

+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url                   |item_type|city|lang |view|like|dslike|share|comment|time         |
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtube.com|video    |NY  |en_US|90  |34  |9     |10   |2      |1634256039878|
|http://www.google.ca  |post     |NY  |en   |79  |53  |15    |1    |2      |1634256040074|
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+

# filter records where city is NY or CA

df.filter(df.city.isin(["NY", "CA"])).show(truncate=False)
#df.filter((df.city == "NY") | (df.city == "CA")).show(truncate=False)

+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url                   |item_type|city|lang |view|like|dslike|share|comment|time         |
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.youtube.com|video    |NY  |en_US|90  |34  |9     |10   |2      |1634256039878|
|http://www.google.ca  |video    |CA  |en_US|81  |30  |23    |10   |null   |1634256039306|
|http://www.youtube.com|video    |CA  |en_US|99  |40  |24    |3    |7      |1634256039306|
|http://www.google.ca  |video    |CA  |en_CA|83  |36  |19    |9    |1      |1634256040074|
|http://www.google.ca  |post     |NY  |en   |79  |53  |15    |1    |2      |1634256040074|
|http://www.youtube.com|video    |CA  |en_CA|80  |25  |18    |5    |15     |1634256039303|
+----------------------+---------+----+-----+----+----+------+-----+-------+-------------+

# filter records where city is neither NY nor CA

df.filter(~(df.city.isin(["NY", "CA"]))).show(truncate=False)

+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|url                 |item_type|city|lang |view|like|dslike|share|comment|time         |
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+
|http://www.msn.com  |post     |TYO |jp   |86  |56  |20    |8    |1      |1634256040074|
|http://www.times.com|post     |WA  |en_US|91  |50  |21    |null |5      |1634256039303|
|http://www.msn.com  |post     |WA  |en   |91  |50  |null  |20   |5      |1634256040074|
|http://www.times.com|post     |TYO |jp   |91  |50  |21    |20   |8      |1634256039303|
+--------------------+---------+----+-----+----+----+------+-----+-------+-------------+

groupBy – Groups the DataFrame by the specified columns so we can run aggregations on them

df.groupBy("url", "city").agg(F.count("*")).show(truncate=False)

+----------------------+----+--------+
|url                   |city|count(1)|
+----------------------+----+--------+
|http://www.times.com  |WA  |1       |
|http://www.google.ca  |NY  |1       |
|http://www.times.com  |TYO |1       |
|http://www.youtube.com|NY  |1       |
|http://www.google.ca  |CA  |2       |
|http://www.youtube.com|CA  |2       |
|http://www.msn.com    |WA  |1       |
|http://www.msn.com    |TYO |1       |
+----------------------+----+--------+
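
agg() can also compute several aggregates at once, and alias() gives the result columns readable names (a sketch):

df.groupBy("city").agg(
    F.sum("view").alias("total_views"),
    F.round(F.avg("like"), 1).alias("avg_likes")
).show(truncate=False)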

pivot – Pivot a column of the DataFrame and perform a specific aggregation

df.groupBy("url").pivot("item_type").agg(F.count("*")).na.fill(0).show(truncate=False)
#df.groupBy("url").pivot("item_type").agg(F.count("*")).fillna(0).show(truncate=False)

+----------------------+----+-----+
|url                   |post|video|
+----------------------+----+-----+
|http://www.google.ca  |1   |2    |
|http://www.times.com  |2   |0    |
|http://www.msn.com    |2   |0    |
|http://www.youtube.com|0   |3    |
+----------------------+----+-----+
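
When the pivot values are known up front, passing them explicitly saves Spark an extra pass over the data to discover them (a sketch):

df.groupBy("url").pivot("item_type", ["post", "video"]).agg(F.count("*")).na.fill(0).show(truncate=False)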

withColumn – Returns a new DataFrame by adding a new column or replacing the existing column that has the same name.

df_item_type.withColumn("total", (df_item_type.post + df_item_type.video)).show(truncate=False)

+----------------------+----+-----+-----+
|url                   |post|video|total|
+----------------------+----+-----+-----+
|http://www.google.ca  |1   |2    |3    |
|http://www.times.com  |2   |0    |2    |
|http://www.msn.com    |2   |0    |2    |
|http://www.youtube.com|0   |3    |3    |
+----------------------+----+-----+-----+

withColumnRenamed – Returns a new DataFrame by renaming an existing column. 

df = df.withColumnRenamed("dslike","dislike")
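
To rename several columns, the calls can simply be chained (a quick sketch; the result is not assigned back here):

df.withColumnRenamed("view", "views").withColumnRenamed("like", "likes").printSchema()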

lit – Create a column of literal value

df.select("url","item_type").withColumn("item_type_no", F.when(F.col("item_type") == "video" ,F.lit("1")).otherwise(F.lit("2"))).show(truncate=False)

+----------------------+---------+------------+
|url                   |item_type|item_type_no|
+----------------------+---------+------------+
|http://www.youtube.com|video    |1           |
|http://www.msn.com    |post     |2           |
|http://www.google.ca  |video    |1           |
|http://www.youtube.com|video    |1           |
|http://www.google.ca  |video    |1           |
|http://www.times.com  |post     |2           |
|http://www.google.ca  |post     |2           |
|http://www.youtube.com|video    |1           |
|http://www.msn.com    |post     |2           |
|http://www.times.com  |post     |2           |
+----------------------+---------+------------+

df = df.select("url","item_type").withColumn("item_type_no", F.when(F.col("item_type") == "video", F.lit("1")).otherwise(F.lit("2")))

df.printSchema()

root
 |-- url: string (nullable = true)
 |-- item_type: string (nullable = true)
 |-- item_type_no: string (nullable = false)

cast – Convert a column's datatype

df = df.withColumn("item_type_no", df["item_type_no"].cast(T.IntegerType()))

df.printSchema()

root
 |-- url: string (nullable = true)
 |-- item_type: string (nullable = true)
 |-- item_type_no: integer (nullable = true)
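
cast() also accepts the type's string name, which avoids importing pyspark.sql.types for simple cases (a sketch):

# Equivalent cast using a string type name
df.withColumn("item_type_no", df["item_type_no"].cast("int")).printSchema()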

Joins

Just like joins in SQL, you can use PySpark to combine DataFrames. To demonstrate the joins, I will use the EMP and DEPT tables, which are copies of the EMP and DEPT tables found in the SCOTT schema of the Oracle sample database.

DEPT dataset

deptData = [
    (10,'ACCOUNTING','NEW YORK'),
    (20,'RESEARCH','DALLAS'),
    (30,'SALES','CHICAGO'),
    (40,'OPERATIONS','BOSTON')
]

dept_schema = ["deptno","dname","loc"]
df_dept = spark.createDataFrame(data=deptData, schema=dept_schema)

df_dept.show()

+------+----------+--------+
|deptno|     dname|     loc|
+------+----------+--------+
|    10|ACCOUNTING|NEW YORK|
|    20|  RESEARCH|  DALLAS|
|    30|     SALES| CHICAGO|
|    40|OPERATIONS|  BOSTON|
+------+----------+--------+

EMP dataset

empData = [
    (7369,'SMITH','CLERK',7902,'17-12-1980',800,None,20),
    (7499,'ALLEN','SALESMAN',7698,'20-2-1981',1600,300,30),
    (7521,'WARD','SALESMAN',7698,'22-2-1981',1250,500,30),
    (7566,'JONES','MANAGER',7839,'2-4-1981',2975,None,20),
    (7654,'MARTIN','SALESMAN',7698,'28-9-1981',1250,1400,30),
    (7698,'BLAKE','MANAGER',7839,'1-5-1981',2850,None,30),
    (7782,'CLARK','MANAGER',7839,'9-6-1981',2450,None,10),
    (7788,'SCOTT','ANALYST',7566,'13-JUL-87',3000,None,20),
    (7839,'KING','PRESIDENT',None,'17-11-1981',5000,None,10),
    (7844,'TURNER','SALESMAN',7698,'8-9-1981',1500,0,30),
    (7876,'ADAMS','CLERK',7788,'13-JUL-87',1100,None,20),
    (7900,'JAMES','CLERK',7698,'3-12-1981',950,None,30),
    (7902,'FORD','ANALYST',7566,'3-12-1981',3000,None,20),
    (7934,'MILLER','CLERK',7782,'23-1-1982',1300,None,10),
    (9999,'ANDY','DBA',None,'02-1-1981',4300,None,None)
]

emp_schema = ["empno","ename","job","mgr","hiredate","sal","comm", "deptno"]
df_emp = spark.createDataFrame(data=empData, schema=emp_schema)

df_emp.show()

+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|17-12-1980| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698| 20-2-1981|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698| 22-2-1981|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|  2-4-1981|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698| 28-9-1981|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1-5-1981|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|  9-6-1981|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566| 13-JUL-87|3000|null|    20|
| 7839|  KING|PRESIDENT|null|17-11-1981|5000|null|    10|
| 7844|TURNER| SALESMAN|7698|  8-9-1981|1500|   0|    30|
| 7876| ADAMS|    CLERK|7788| 13-JUL-87|1100|null|    20|
| 7900| JAMES|    CLERK|7698| 3-12-1981| 950|null|    30|
| 7902|  FORD|  ANALYST|7566| 3-12-1981|3000|null|    20|
| 7934|MILLER|    CLERK|7782| 23-1-1982|1300|null|    10|
| 9999|  ANDY|      DBA|null| 02-1-1981|4300|null|  null|
+-----+------+---------+----+----------+----+----+------+

sort – Returns a new DataFrame sorted by the specified column

df_emp.sort("deptno").show()
# df_emp.sort(F.desc("deptno")).show()

+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7934|MILLER|    CLERK|7782| 23-1-1982|1300|null|    10|
| 7782| CLARK|  MANAGER|7839|  9-6-1981|2450|null|    10|
| 7839|  KING|PRESIDENT|null|17-11-1981|5000|null|    10|
| 7369| SMITH|    CLERK|7902|17-12-1980| 800|null|    20|
| 7876| ADAMS|    CLERK|7788| 13-JUL-87|1100|null|    20|
| 7902|  FORD|  ANALYST|7566| 3-12-1981|3000|null|    20|
| 7566| JONES|  MANAGER|7839|  2-4-1981|2975|null|    20|
| 7788| SCOTT|  ANALYST|7566| 13-JUL-87|3000|null|    20|
| 7844|TURNER| SALESMAN|7698|  8-9-1981|1500|   0|    30|
| 7499| ALLEN| SALESMAN|7698| 20-2-1981|1600| 300|    30|
| 7900| JAMES|    CLERK|7698| 3-12-1981| 950|null|    30|
| 7521|  WARD| SALESMAN|7698| 22-2-1981|1250| 500|    30|
| 7654|MARTIN| SALESMAN|7698| 28-9-1981|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|  1-5-1981|2850|null|    30|
+-----+------+---------+----+----------+----+----+------+
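
sort (and its alias orderBy) accepts multiple columns, each with its own direction (a quick sketch):

# Departments ascending, salary descending within each department
df_emp.sort(df_emp.deptno.asc(), df_emp.sal.desc()).show()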

Inner Join

"""
SELECT d.department_id,
       d.department_name,
       e.employee_name
FROM   departments d
       JOIN employees e ON d.department_id = e.department_id
WHERE  d.department_id >= 30
ORDER BY d.department_name;
"""

df_dept.filter(df_dept.deptno >= 30).join(df_emp, ['deptno'],how='inner').select(df_dept.deptno, df_dept.dname, df_emp.ename).show()

+------+-----+------+
|deptno|dname| ename|
+------+-----+------+
|    30|SALES| ALLEN|
|    30|SALES|  WARD|
|    30|SALES|MARTIN|
|    30|SALES| BLAKE|
|    30|SALES|TURNER|
|    30|SALES| JAMES|
+------+-----+------+
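
When the join keys have different names on the two sides, pass an explicit join condition instead of a column list (a sketch; here the names happen to match, so the result is the same):

df_dept.filter(df_dept.deptno >= 30).join(df_emp, df_dept.deptno == df_emp.deptno, how='inner').select(df_dept.deptno, df_dept.dname, df_emp.ename).show()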

LEFT [OUTER] JOIN

"""
SELECT d.department_id,
       d.location,
       d.department_name,
       e.employee_name     
FROM   departments d
       LEFT OUTER JOIN employees e ON d.department_id = e.department_id AND e.salary >= 2000
WHERE  d.department_id >= 30
"""
df_dept.filter(df_dept.deptno >= 30).join(df_emp.filter(df_emp.sal >= 2000), ['deptno'],how='left_outer').select(df_dept.deptno, df_dept.loc, df_dept.dname, df_emp.ename).show()

+------+-------+----------+-----+
|deptno|    loc|     dname|ename|
+------+-------+----------+-----+
|    30|CHICAGO|     SALES|BLAKE|
|    40| BOSTON|OPERATIONS| null|
+------+-------+----------+-----+

RIGHT [OUTER] JOIN

"""
SELECT d.department_id,
       d.location,
       d.department_name,
       e.employee_name     
FROM   employees e
       RIGHT OUTER JOIN departments d ON e.department_id = d.department_id
WHERE  d.department_id >= 30
"""
df_emp.join(df_dept.filter(df_dept.deptno >= 30), ['deptno'], how='right_outer').select(df_dept.deptno, df_dept.loc, df_dept.dname, df_emp.ename).show()

+------+-------+----------+------+
|deptno|    loc|     dname| ename|
+------+-------+----------+------+
|    30|CHICAGO|     SALES| ALLEN|
|    30|CHICAGO|     SALES|  WARD|
|    30|CHICAGO|     SALES|MARTIN|
|    30|CHICAGO|     SALES| BLAKE|
|    30|CHICAGO|     SALES|TURNER|
|    30|CHICAGO|     SALES| JAMES|
|    40| BOSTON|OPERATIONS|  null|
+------+-------+----------+------+

FULL [OUTER] JOIN

"""
SELECT *  
FROM   employees e
       FULL OUTER JOIN departments d ON e.department_id = d.department_id
"""
df_emp.join(df_dept, ['deptno'], how='full').show()

+------+-----+------+---------+----+----------+----+----+----------+--------+
|deptno|empno| ename|      job| mgr|  hiredate| sal|comm|     dname|     loc|
+------+-----+------+---------+----+----------+----+----+----------+--------+
|  null| 9999|  ANDY|      DBA|null| 02-1-1981|4300|null|      null|    null|
|    10| 7782| CLARK|  MANAGER|7839|  9-6-1981|2450|null|ACCOUNTING|NEW YORK|
|    10| 7839|  KING|PRESIDENT|null|17-11-1981|5000|null|ACCOUNTING|NEW YORK|
|    10| 7934|MILLER|    CLERK|7782| 23-1-1982|1300|null|ACCOUNTING|NEW YORK|
|    30| 7499| ALLEN| SALESMAN|7698| 20-2-1981|1600| 300|     SALES| CHICAGO|
|    30| 7521|  WARD| SALESMAN|7698| 22-2-1981|1250| 500|     SALES| CHICAGO|
|    30| 7654|MARTIN| SALESMAN|7698| 28-9-1981|1250|1400|     SALES| CHICAGO|
|    30| 7698| BLAKE|  MANAGER|7839|  1-5-1981|2850|null|     SALES| CHICAGO|
|    30| 7844|TURNER| SALESMAN|7698|  8-9-1981|1500|   0|     SALES| CHICAGO|
|    30| 7900| JAMES|    CLERK|7698| 3-12-1981| 950|null|     SALES| CHICAGO|
|    20| 7369| SMITH|    CLERK|7902|17-12-1980| 800|null|  RESEARCH|  DALLAS|
|    20| 7566| JONES|  MANAGER|7839|  2-4-1981|2975|null|  RESEARCH|  DALLAS|
|    20| 7788| SCOTT|  ANALYST|7566| 13-JUL-87|3000|null|  RESEARCH|  DALLAS|
|    20| 7876| ADAMS|    CLERK|7788| 13-JUL-87|1100|null|  RESEARCH|  DALLAS|
|    20| 7902|  FORD|  ANALYST|7566| 3-12-1981|3000|null|  RESEARCH|  DALLAS|
|    40| null|  null|     null|null|      null|null|null|OPERATIONS|  BOSTON|
+------+-----+------+---------+----+----------+----+----+----------+--------+
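
Spark also supports semi and anti joins, which filter one side by the other and return only the left side's columns (a quick sketch):

# Departments that have at least one employee
df_dept.join(df_emp, ['deptno'], how='left_semi').show()

# Departments with no employees
df_dept.join(df_emp, ['deptno'], how='left_anti').show()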

udf – Create a user-defined function

def salaryScale(sal):
    if sal >= 4000:
        return 'high'
    elif sal > 2500:  # the upper bound is already covered by the first branch
        return 'medium'
    else:
        return 'low'

# Convert to a UDF by passing in the Python function and its return type
salaryScaleUDF = F.udf(salaryScale, T.StringType())

df_emp.withColumn("SalaryScale", salaryScaleUDF("sal")).show()

+-----+------+---------+----+----------+----+----+------+-----------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|SalaryScale|
+-----+------+---------+----+----------+----+----+------+-----------+
| 7369| SMITH|    CLERK|7902|17-12-1980| 800|null|    20|        low|
| 7499| ALLEN| SALESMAN|7698| 20-2-1981|1600| 300|    30|        low|
| 7521|  WARD| SALESMAN|7698| 22-2-1981|1250| 500|    30|        low|
| 7566| JONES|  MANAGER|7839|  2-4-1981|2975|null|    20|     medium|
| 7654|MARTIN| SALESMAN|7698| 28-9-1981|1250|1400|    30|        low|
| 7698| BLAKE|  MANAGER|7839|  1-5-1981|2850|null|    30|     medium|
| 7782| CLARK|  MANAGER|7839|  9-6-1981|2450|null|    10|        low|
| 7788| SCOTT|  ANALYST|7566| 13-JUL-87|3000|null|    20|     medium|
| 7839|  KING|PRESIDENT|null|17-11-1981|5000|null|    10|       high|
| 7844|TURNER| SALESMAN|7698|  8-9-1981|1500|   0|    30|        low|
| 7876| ADAMS|    CLERK|7788| 13-JUL-87|1100|null|    20|        low|
| 7900| JAMES|    CLERK|7698| 3-12-1981| 950|null|    30|        low|
| 7902|  FORD|  ANALYST|7566| 3-12-1981|3000|null|    20|     medium|
| 7934|MILLER|    CLERK|7782| 23-1-1982|1300|null|    10|        low|
| 9999|  ANDY|      DBA|null| 02-1-1981|4300|null|  null|       high|
+-----+------+---------+----+----------+----+----+------+-----------+
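
For simple branching like this, a built-in when/otherwise chain produces the same result and avoids the Python serialization overhead that UDFs carry (a sketch):

df_emp.withColumn(
    "SalaryScale",
    F.when(F.col("sal") >= 4000, "high")
     .when(F.col("sal") > 2500, "medium")
     .otherwise("low")
).show()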

Window – Window functions operate over a group of rows (a window) defined by a window specification, returning a value for each input row.

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

windowSpec = Window.partitionBy('deptno').orderBy(F.desc('sal'))
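
Since row_number was imported above, here is how it behaves over the same window. Unlike rank, row_number assigns a unique sequential number within each partition even when there are ties (a quick sketch; output omitted):

df_emp.withColumn("row_number", row_number().over(windowSpec)).select("empno", "ename", "sal", "deptno", "row_number").show()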

rank – Window function

Returns the rank of rows within a window partition

df_emp.withColumn("rank", F.rank().over(windowSpec)).select("empno", "ename", "job", "mgr", "sal", "deptno", "rank").show()

+-----+------+---------+----+----+------+----+
|empno| ename|      job| mgr| sal|deptno|rank|
+-----+------+---------+----+----+------+----+
| 9999|  ANDY|      DBA|null|4300|  null|   1|
| 7839|  KING|PRESIDENT|null|5000|    10|   1|
| 7782| CLARK|  MANAGER|7839|2450|    10|   2|
| 7934|MILLER|    CLERK|7782|1300|    10|   3|
| 7698| BLAKE|  MANAGER|7839|2850|    30|   1|
| 7499| ALLEN| SALESMAN|7698|1600|    30|   2|
| 7844|TURNER| SALESMAN|7698|1500|    30|   3|
| 7521|  WARD| SALESMAN|7698|1250|    30|   4|
| 7654|MARTIN| SALESMAN|7698|1250|    30|   4|
| 7900| JAMES|    CLERK|7698| 950|    30|   6|
| 7788| SCOTT|  ANALYST|7566|3000|    20|   1|
| 7902|  FORD|  ANALYST|7566|3000|    20|   1|
| 7566| JONES|  MANAGER|7839|2975|    20|   3|
| 7876| ADAMS|    CLERK|7788|1100|    20|   4|
| 7369| SMITH|    CLERK|7902| 800|    20|   5|
+-----+------+---------+----+----+------+----+

dense_rank – Window function

The difference between rank and dense_rank is that dense_rank leaves no gaps in the ranking sequence when there are ties.

df_emp.withColumn("dense_rank", F.dense_rank().over(windowSpec)).select("empno", "ename", "job", "mgr", "sal", "deptno", "dense_rank").show()

+-----+------+---------+----+----+------+----------+
|empno| ename|      job| mgr| sal|deptno|dense_rank|
+-----+------+---------+----+----+------+----------+
| 9999|  ANDY|      DBA|null|4300|  null|         1|
| 7839|  KING|PRESIDENT|null|5000|    10|         1|
| 7782| CLARK|  MANAGER|7839|2450|    10|         2|
| 7934|MILLER|    CLERK|7782|1300|    10|         3|
| 7698| BLAKE|  MANAGER|7839|2850|    30|         1|
| 7499| ALLEN| SALESMAN|7698|1600|    30|         2|
| 7844|TURNER| SALESMAN|7698|1500|    30|         3|
| 7521|  WARD| SALESMAN|7698|1250|    30|         4|
| 7654|MARTIN| SALESMAN|7698|1250|    30|         4|
| 7900| JAMES|    CLERK|7698| 950|    30|         5|
| 7788| SCOTT|  ANALYST|7566|3000|    20|         1|
| 7902|  FORD|  ANALYST|7566|3000|    20|         1|
| 7566| JONES|  MANAGER|7839|2975|    20|         2|
| 7876| ADAMS|    CLERK|7788|1100|    20|         3|
| 7369| SMITH|    CLERK|7902| 800|    20|         4|
+-----+------+---------+----+----+------+----------+

ntile – Window function

Returns the ntile group id (from 1 to n inclusive) in an ordered window partition

df_emp.withColumn("ntile", F.ntile(3).over(windowSpec)).select("empno", "ename", "job", "mgr", "sal", "deptno", "ntile").show()

+-----+------+---------+----+----+------+-----+
|empno| ename|      job| mgr| sal|deptno|ntile|
+-----+------+---------+----+----+------+-----+
| 9999|  ANDY|      DBA|null|4300|  null|    1|
| 7839|  KING|PRESIDENT|null|5000|    10|    1|
| 7782| CLARK|  MANAGER|7839|2450|    10|    2|
| 7934|MILLER|    CLERK|7782|1300|    10|    3|
| 7698| BLAKE|  MANAGER|7839|2850|    30|    1|
| 7499| ALLEN| SALESMAN|7698|1600|    30|    1|
| 7844|TURNER| SALESMAN|7698|1500|    30|    2|
| 7521|  WARD| SALESMAN|7698|1250|    30|    2|
| 7654|MARTIN| SALESMAN|7698|1250|    30|    3|
| 7900| JAMES|    CLERK|7698| 950|    30|    3|
| 7788| SCOTT|  ANALYST|7566|3000|    20|    1|
| 7902|  FORD|  ANALYST|7566|3000|    20|    1|
| 7566| JONES|  MANAGER|7839|2975|    20|    2|
| 7876| ADAMS|    CLERK|7788|1100|    20|    2|
| 7369| SMITH|    CLERK|7902| 800|    20|    3|
+-----+------+---------+----+----+------+-----+

lag – Window function

Returns the value that is offset rows before the current row, and the default if there are fewer than offset rows before the current row.

df_emp.withColumn("sal_prev",F.lag("sal",1, 0).over(windowSpec)) \
.select("empno", "ename", "job", "sal", "sal_prev")\
.show()

+-----+------+---------+----+--------+
|empno| ename|      job| sal|sal_prev|
+-----+------+---------+----+--------+
| 9999|  ANDY|      DBA|4300|       0|
| 7839|  KING|PRESIDENT|5000|       0|
| 7782| CLARK|  MANAGER|2450|    5000|
| 7934|MILLER|    CLERK|1300|    2450|
| 7698| BLAKE|  MANAGER|2850|       0|
| 7499| ALLEN| SALESMAN|1600|    2850|
| 7844|TURNER| SALESMAN|1500|    1600|
| 7521|  WARD| SALESMAN|1250|    1500|
| 7654|MARTIN| SALESMAN|1250|    1250|
| 7900| JAMES|    CLERK| 950|    1250|
| 7788| SCOTT|  ANALYST|3000|       0|
| 7902|  FORD|  ANALYST|3000|    3000|
| 7566| JONES|  MANAGER|2975|    3000|
| 7876| ADAMS|    CLERK|1100|    2975|
| 7369| SMITH|    CLERK| 800|    1100|
+-----+------+---------+----+--------+

lead – Window function

Returns the value that is offset rows after the current row, and the default if there are fewer than offset rows after the current row.

df_emp.withColumn("sal_next",F.lead("sal",1, 0).over(windowSpec)) \
.select("empno", "ename", "job", "sal", "sal_next")\
.show()

+-----+------+---------+----+--------+
|empno| ename|      job| sal|sal_next|
+-----+------+---------+----+--------+
| 9999|  ANDY|      DBA|4300|       0|
| 7839|  KING|PRESIDENT|5000|    2450|
| 7782| CLARK|  MANAGER|2450|    1300|
| 7934|MILLER|    CLERK|1300|       0|
| 7698| BLAKE|  MANAGER|2850|    1600|
| 7499| ALLEN| SALESMAN|1600|    1500|
| 7844|TURNER| SALESMAN|1500|    1250|
| 7521|  WARD| SALESMAN|1250|    1250|
| 7654|MARTIN| SALESMAN|1250|     950|
| 7900| JAMES|    CLERK| 950|       0|
| 7788| SCOTT|  ANALYST|3000|    3000|
| 7902|  FORD|  ANALYST|3000|    2975|
| 7566| JONES|  MANAGER|2975|    1100|
| 7876| ADAMS|    CLERK|1100|     800|
| 7369| SMITH|    CLERK| 800|       0|
+-----+------+---------+----+--------+

first – Window function

Returns the first value in a group. The second argument to F.first (True here) makes it ignore nulls.

df_emp.withColumn("sal_first",F.first("sal",True).over(windowSpec)) \
.select("empno", "ename", "job", "sal", "sal_first")\
.show()

+-----+------+---------+----+---------+
|empno| ename|      job| sal|sal_first|
+-----+------+---------+----+---------+
| 9999|  ANDY|      DBA|4300|     4300|
| 7839|  KING|PRESIDENT|5000|     5000|
| 7782| CLARK|  MANAGER|2450|     5000|
| 7934|MILLER|    CLERK|1300|     5000|
| 7698| BLAKE|  MANAGER|2850|     2850|
| 7499| ALLEN| SALESMAN|1600|     2850|
| 7844|TURNER| SALESMAN|1500|     2850|
| 7521|  WARD| SALESMAN|1250|     2850|
| 7654|MARTIN| SALESMAN|1250|     2850|
| 7900| JAMES|    CLERK| 950|     2850|
| 7788| SCOTT|  ANALYST|3000|     3000|
| 7902|  FORD|  ANALYST|3000|     3000|
| 7566| JONES|  MANAGER|2975|     3000|
| 7876| ADAMS|    CLERK|1100|     3000|
| 7369| SMITH|    CLERK| 800|     3000|
+-----+------+---------+----+---------+

max, avg, first, last – Window functions

Since the window spec below has no orderBy, the frame is the entire partition: max and avg are computed over the whole department, and first/last return the first and last rows of each partition.

partition = Window.partitionBy("deptno")
df_emp.withColumn("max_sal", F.max(F.col("sal")).over(partition))\
        .withColumn("avg_sal", F.round(F.avg(F.col("sal")).over(partition),1))\
        .withColumn("first_sal", F.first(F.col("sal")).over(partition))\
        .withColumn("last_sal", F.last(F.col("sal")).over(partition)) \
        .select("empno","ename", "job", "sal", "deptno", "max_sal", "avg_sal", "first_sal", "last_sal")\
        .show()

+-----+------+---------+----+------+-------+-------+---------+--------+
|empno| ename|      job| sal|deptno|max_sal|avg_sal|first_sal|last_sal|
+-----+------+---------+----+------+-------+-------+---------+--------+
| 9999|  ANDY|      DBA|4300|  null|   4300| 4300.0|     4300|    4300|
| 7782| CLARK|  MANAGER|2450|    10|   5000| 2916.7|     2450|    1300|
| 7839|  KING|PRESIDENT|5000|    10|   5000| 2916.7|     2450|    1300|
| 7934|MILLER|    CLERK|1300|    10|   5000| 2916.7|     2450|    1300|
| 7499| ALLEN| SALESMAN|1600|    30|   2850| 1566.7|     1600|     950|
| 7521|  WARD| SALESMAN|1250|    30|   2850| 1566.7|     1600|     950|
| 7654|MARTIN| SALESMAN|1250|    30|   2850| 1566.7|     1600|     950|
| 7698| BLAKE|  MANAGER|2850|    30|   2850| 1566.7|     1600|     950|
| 7844|TURNER| SALESMAN|1500|    30|   2850| 1566.7|     1600|     950|
| 7900| JAMES|    CLERK| 950|    30|   2850| 1566.7|     1600|     950|
| 7369| SMITH|    CLERK| 800|    20|   3000| 2175.0|      800|    3000|
| 7566| JONES|  MANAGER|2975|    20|   3000| 2175.0|      800|    3000|
| 7788| SCOTT|  ANALYST|3000|    20|   3000| 2175.0|      800|    3000|
| 7876| ADAMS|    CLERK|1100|    20|   3000| 2175.0|      800|    3000|
| 7902|  FORD|  ANALYST|3000|    20|   3000| 2175.0|      800|    3000|
+-----+------+---------+----+------+-------+-------+---------+--------+

getNumPartitions

Returns the number of partitions of the underlying RDD

df_emp.rdd.getNumPartitions()

4
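
The partition count can be changed with repartition (full shuffle, any target count) or coalesce (a narrow transformation that can only reduce the count). A quick sketch:

df_emp.repartition(8).rdd.getNumPartitions()   # 8
df_emp.coalesce(2).rdd.getNumPartitions()      # 2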

Row count from each partition

from pyspark.sql.functions import spark_partition_id
df_emp.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

+-----------+-----+
|partitionId|count|
+-----------+-----+
|          1|    3|
|          3|    6|
|          2|    3|
|          0|    3|
+-----------+-----+

# Another way: glom() collects each partition into a list, so mapping len gives per-partition row counts

df_emp.rdd.glom().map(len).collect()

[3, 3, 3, 6]

Hope you find this helpful!
