|

How to Build an End-to-End Data Engineering and Machine Learning Pipeline with Apache Spark and PySpark

In this tutorial, we discover how to harness Apache Spark’s strategies utilizing PySpark immediately in Google Colab. We start by establishing a neighborhood Spark session, then progressively transfer by means of transformations, SQL queries, joins, and window features. We additionally construct and consider a easy machine-learning mannequin to predict person subscription sorts and lastly display how to save and reload Parquet information. Also, we expertise how Spark’s distributed data-processing capabilities will be leveraged for analytics and ML workflows even in a single-node Colab surroundings. Check out the FULL CODES here.

!pip set up -q pyspark==3.5.1
from pyspark.sql import SparkSession, features as F, Window
from pyspark.sql.sorts import IntegerType, StringType, StructType, StructField, FloatType
from pyspark.ml.function import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.analysis import MulticlassClassificationEvaluator


spark = (SparkSession.builder.appName("ColabSparkSuperiorTutorial")
        .grasp("native[*]")
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate())
print("Spark model:", spark.model)


knowledge = [
   (1, "Alice", "IN", "2025-10-01", 56000.0, "premium"),
   (2, "Bob", "US", "2025-10-03", 43000.0, "standard"),
   (3, "Carlos", "IN", "2025-09-27", 72000.0, "premium"),
   (4, "Diana", "UK", "2025-09-30", 39000.0, "standard"),
   (5, "Esha", "IN", "2025-10-02", 85000.0, "premium"),
   (6, "Farid", "AE", "2025-10-02", 31000.0, "basic"),
   (7, "Gita", "IN", "2025-09-29", 46000.0, "standard"),
   (8, "Hassan", "PK", "2025-10-01", 52000.0, "premium"),
]
schema = StructType([
   StructField("id", IntegerType(), False),
   StructField("name", StringType(), True),
   StructField("country", StringType(), True),
   StructField("signup_date", StringType(), True),
   StructField("income", FloatType(), True),
   StructField("plan", StringType(), True),
])
df = spark.createDataBody(knowledge, schema)
df.present()

We start by establishing PySpark, initializing the Spark session, and getting ready our dataset. We create a structured DataBody containing person info, together with nation, revenue, and plan sort. This types the inspiration for all transformations and analyses that observe. Check out the FULL CODES here.

df2 = (df.withColumn("signup_ts", F.to_timestamp("signup_date"))
        .withColumn("yr", F.yr("signup_ts"))
        .withColumn("month", F.month("signup_ts"))
        .withColumn("is_india", (F.col("nation") == "IN").solid("int")))
df2.present()


df2.createOrReplaceTempView("customers")
spark.sql("""
SELECT nation, COUNT(*) AS cnt, AVG(revenue) AS avg_income
FROM customers
GROUP BY nation
ORDER BY cnt DESC
""").present()


w = Window.partitionBy("nation").orderBy(F.col("revenue").desc())
df_ranked = df2.withColumn("income_rank_in_country", F.rank().over(w))
df_ranked.present()


def plan_priority(plan):
   if plan == "premium": return 3
   if plan == "customary": return 2
   if plan == "fundamental": return 1
   return 0
plan_priority_udf = F.udf(plan_priority, IntegerType())
df_udf = df_ranked.withColumn("plan_priority", plan_priority_udf(F.col("plan")))
df_udf.present()

We now carry out varied knowledge transformations, add new columns, and register the DataBody as a SQL desk. We discover Spark SQL for aggregation and apply window features to rank customers by revenue. We additionally introduce a user-defined operate (UDF) to assign precedence ranges to subscription plans. Check out the FULL CODES here.

country_data = [
   ("IN", "Asia", 1.42), ("US", "North America", 0.33),
   ("UK", "Europe", 0.07), ("AE", "Asia", 0.01), ("PK", "Asia", 0.24),
]
country_schema = StructType([
   StructField("country", StringType(), True),
   StructField("region", StringType(), True),
   StructField("population_bn", FloatType(), True),
])
country_df = spark.createDataBody(country_data, country_schema)


joined = df_udf.alias("u").be a part of(country_df.alias("c"), on="nation", how="left")
joined.present()


region_stats = (joined.groupBy("area", "plan")
               .agg(F.depend("*").alias("customers"),
                    F.spherical(F.avg("revenue"), 2).alias("avg_income"))
               .orderBy("area", "plan"))
region_stats.present()

We enrich our person dataset by becoming a member of it with country-level metadata that features area and inhabitants. We then compute analytical summaries comparable to common revenue and person counts by area and plan sort. This step demonstrates how Spark simplifies the seamless mixture and aggregation of enormous datasets. Check out the FULL CODES here.

ml_df = joined.withColumn("label", (F.col("plan") == "premium").solid("int")).na.drop()
country_indexer = StringIndexer(inputCol="nation", outputCol="country_idx", deal withInlegitimate="maintain")
country_fitted = country_indexer.match(ml_df)
ml_df2 = country_fitted.remodel(ml_df)


assembler = VectorAssembler(inputCols=["income", "country_idx", "plan_priority"], outputCol="options")
ml_final = assembler.remodel(ml_df2)
train_df, test_df = ml_final.randomSplit([0.7, 0.3], seed=42)


lr = LogisticRegression(featuresCol="options", labelCol="label", maxIter=20)
lr_model = lr.match(train_df)
preds = lr_model.remodel(test_df)
preds.choose("identify", "nation", "revenue", "plan", "label", "prediction", "chance").present(truncate=False)


evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
acc = evaluator.consider(preds)
print("Classification accuracy:", acc)

We transfer into machine studying by getting ready knowledge for mannequin coaching and function engineering. We index categorical columns, assemble options, and practice a logistic regression mannequin to predict premium customers. We then consider its accuracy, showcasing how Spark MLlib integrates simply into the information workflow. Check out the FULL CODES here.

output_path = "/content material/spark_users_parquet"
joined.write.mode("overwrite").parquet(output_path)
parquet_df = spark.learn.parquet(output_path)
print("Parquet reloaded:")
parquet_df.present()


latest = spark.sql("""
SELECT identify, nation, revenue, signup_ts
FROM customers
WHERE signup_ts >= '2025-10-01'
ORDER BY signup_ts DESC
""")
latest.present()


latest.clarify()
spark.cease()

We conclude by writing the processed knowledge to Parquet format and studying it again into Spark for verification. We run a SQL question to extract latest signups and examine the question plan for optimization insights. Finally, we gracefully cease the Spark session to full our workflow.

In conclusion, we achieve a sensible understanding of how PySpark unifies knowledge engineering and machine studying duties inside a single scalable framework. We witness how easy DataBody transformations evolve into SQL analytics, function engineering, and predictive modeling, all whereas staying inside Google Colab. By experimenting with these ideas, we strengthen our capability to prototype and deploy Spark-based knowledge options effectively in each native and distributed setups.


Check out the FULL CODES here. Feel free to take a look at our GitHub Page for Tutorials, Codes and Notebooks. Also, be at liberty to observe us on Twitter and don’t overlook to be a part of our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.

The publish How to Build an End-to-End Data Engineering and Machine Learning Pipeline with Apache Spark and PySpark appeared first on MarkTechPost.

Similar Posts