Snowflake Connector for Spark on Zeppelin
A tutorial on adding the Snowflake connector to Zeppelin notebooks
You can find the Snowflake connector documents in the link below from the official Snowflake documentation.
To import the Snowflake connector, you need to add two JARs to Spark:
- the Snowflake JDBC driver
- the Snowflake Spark Connector
To add both of these, we can use one of the two Spark options below in Zeppelin:
--packages
--jars
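Outside of Zeppelin, the same dependencies can be pulled in with the --packages flag when launching a Spark shell. This is a sketch assuming a Spark 2.4 build on Scala 2.12; adjust the artifact suffix and versions to your cluster:

```shell
# Resolves both the JDBC driver and the Spark connector from Maven Central
spark-shell --packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.12:2.4.14
```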
To add the packages, all we need to do is set the spark.jars.packages property in Zeppelin. Open the interpreter settings, go to the Spark interpreter's configuration, and add the line below to the properties:
spark.jars.packages
net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.12:2.4.14
The image below shows how the configuration is added.
One of the common issues I faced was using the correct Scala version. On EMR I was running Scala 2.11, while the package above targets Scala 2.12 (the _2.12 suffix in spark-snowflake_2.12; not to be confused with the connector version 2.4.14). Make sure the artifact's Scala version matches your cluster's Scala version to avoid errors.
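To find out which Scala version your Spark interpreter was built with (and therefore which spark-snowflake artifact to pick), you can run a quick paragraph in Zeppelin:

```scala
// Prints the Scala version of the running interpreter, e.g. "2.11.12" —
// then choose spark-snowflake_2.11 or spark-snowflake_2.12 accordingly.
println(scala.util.Properties.versionNumberString)
```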
Another approach is to manually download the JARs and add them to the path. You can download the JARs from the links below:
Snowflake Spark connector:
Snowflake JDBC driver:
After manually downloading the JARs, we can add their paths to the spark.jars property:
spark.jars
After adding the paths to the Zeppelin configuration, we should be all set to run the Snowflake Spark connector.
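For example, assuming the JARs were copied to /usr/lib/spark/jars/ (a hypothetical path; use wherever you placed them), the interpreter property would look like:

```
spark.jars  /usr/lib/spark/jars/snowflake-jdbc-3.8.0.jar,/usr/lib/spark/jars/spark-snowflake_2.12-2.4.14.jar
```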
Sample Spark Scala code to create a connection:
import org.apache.spark.sql._
import org.apache.spark.sql.DataFrame
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

val user = "abcd"
val password = "******"
val clientKey = "********"
val clientSecret = "*********"

val options = Map(
  "sfUrl" -> "*****.snowflakecomputing.com",
  "sfUser" -> user,
  "sfPassword" -> password,
  "sfDatabase" -> "database",
  "sfSchema" -> "dw",
  "sfRole" -> "role",
  "sfWarehouse" -> "warehouse",
  // AWS credentials for the S3 staging area used by the connector
  "awsAccessKey" -> clientKey,
  "awsSecretKey" -> clientSecret,
  "tempdir" -> "s3a://*******/")
Reading a table:
val df: DataFrame = spark.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(options)
  .option("dbtable", "tableName")
  .load()
Writing to a table
df.write.format("snowflake")
.options(options)
.option("continue_on_error", "on")
.option("dbtable", "WritingTable")
.mode(SaveMode.Append).save()
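Beyond reads and writes, the connector's Utils object can also run a SQL statement directly against Snowflake using the same options map. A sketch, with an illustrative table name:

```scala
import net.snowflake.spark.snowflake.Utils

// Executes the statement in Snowflake (useful for DDL such as CREATE TABLE
// or GRANT) using the connection options defined above.
Utils.runQuery(options, "CREATE TABLE IF NOT EXISTS WritingTable (id INT)")
```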