python csv transformation

To quickly insert data via ADF, I needed to ensure the CSV file I was provided contained PartitionKey and RowKey columns (both required by Azure Table Storage).

To quickly create these columns, derived from existing data, I decided to run a small Python script in Databricks, as this was a fast way to process data in these quantities.
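
As a quick illustration of the derivation (the value below is made up), the idea is to strip the spaces from an existing column, use the result as the RowKey, and take its first three characters as the PartitionKey:

# Hypothetical sample value; COL1 in the real file plays this role.
value = "AB 12345"
row_key = value.replace(" ", "")   # "AB12345" -> RowKey
partition_key = row_key[:3]        # "AB1"     -> PartitionKey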

Pre-requisites:

  • A Databricks instance
  • A cluster not running in serverless mode (we need to set spark.conf values)
  • A SAS token for the storage account with READ/WRITE/LIST permissions (one way to generate one is sketched after this list)
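
If you don't already have a SAS token, one way to create a container-scoped token is with the azure-storage-blob Python SDK. This is a minimal sketch, not part of the original setup; the account name, container name, key, and expiry are all placeholders:

from datetime import datetime, timedelta, timezone
from azure.storage.blob import generate_container_sas, ContainerSasPermissions

# All values below are placeholders; substitute your own account details.
sas_token = generate_container_sas(
    account_name="mystorageaccount",
    container_name="mycontainer",
    account_key="ACCOUNT KEY HERE",
    permission=ContainerSasPermissions(read=True, write=True, list=True),
    expiry=datetime.now(timezone.utc) + timedelta(hours=1),
)
print(sas_token)
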
read csv from blob, transform and upload
from pyspark.sql.functions import regexp_replace, substring

storage_account_name = ""
container_name = ""
file_path = "....csv"

# Authenticate to the storage account with a fixed SAS token; these
# spark.conf values are why the cluster cannot be serverless.
spark.conf.set(f"fs.azure.account.auth.type.{storage_account_name}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{storage_account_name}.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{storage_account_name}.dfs.core.windows.net", "SAS TOKEN HERE")

# Read the source CSV with a header row, keeping every column as a string.
df = spark.read.format("csv") \
    .option("delimiter", ",") \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{file_path}")

# Derive the keys from COL1: RowKey is COL1 with spaces removed, and
# PartitionKey is its first three characters (Spark's substring is 1-based).
df = df.withColumn("PartitionKey", substring(regexp_replace("COL1", " ", ""), 1, 3)) \
    .withColumn("RowKey", regexp_replace("COL1", " ", ""))

# Write the result back as a single CSV (coalesce(1) gives one part file).
# Save into a subfolder: "overwrite" replaces the target path, so writing to
# the container root would clobber its existing contents.
output_path = "output"  # placeholder folder name
df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{output_path}")

# Preview the first 20 transformed rows.
display(df.limit(20))
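
Note that Spark writes a directory containing a part-*.csv file rather than a single named file. If ADF needs a stable file name, here is a minimal sketch using Databricks' dbutils (output_path is the placeholder folder from the write step above, and transformed.csv is an illustrative name):

# Find the single part file Spark wrote and copy it to a stable name.
base = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{output_path}"
part_file = [f.path for f in dbutils.fs.ls(base) if f.name.startswith("part-")][0]
dbutils.fs.cp(part_file, f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/transformed.csv")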