Building a Spark Connect Engine on Snowflake: An Engineering Deep Dive

Last month, we announced Snowpark Connect for Apache Spark™, now available in public preview. In this post we will go beyond the press release and examine the engineering behind this feature, discussing best practices and challenges we faced while developing it.

Snowpark Connect diagram

Snowpark Connect, the latest addition to Snowpark, expands the developer experience by enabling Spark code to run on Snowflake (without a hosted Spark engine!) as an alternative to Snowpark DataFrames. For developers who want to take advantage of platform-specific integrations such as Snowflake SQL, AI or pandas, Snowpark continues to offer a suite of easy-to-use tools.

What is Spark Connect?

Spark Connect is a client-server architecture introduced in Apache Spark™ 3.4 that decouples the client interface from the Spark execution engine. It allows remote clients to connect to Spark clusters over a gRPC-based protocol. This enables developers to interact with Spark from various languages and environments (e.g., web apps, IDEs, notebooks) without requiring a full Spark or JVM installation on the client side.

How does it work?

At its core, Spark Connect operates by sending logical plans from the client to the server. The server then executes these plans and returns the results. This is achieved through gRPC and protocol buffers (protobufs).

What do messages look like?

Spark Connect messages are defined using protobuf schemas. These schemas are defined in Spark source code and describe the structure of the logical plans, expressions and results that are exchanged between the client and the server. This standardized message format enables interoperability across different clients and server implementations. For example, take the following Spark code:

# My Spark session is connected to my Snowpark Connect endpoint

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
df.show()

Similar to the classic mode of Apache Spark, Spark Connect continues to build up the underlying query plan until execution is triggered (via persist, collect, show, etc.). When the show command above is executed, the following ExecutePlanRequest (source) is received by the gRPC server, which extends OSS Spark's SparkConnectServiceServicer (source):

session_id: "8069f6ab-fb2d-4796-8748-fb8998a05aa2"
user_context {
  user_id: "dpetersohn"
}
plan {
  root {
    common {
      plan_id: 2
    }
    show_string {
      input {
        common {
          plan_id: 1
        }
        to_df {
          input {
            common {
              plan_id: 0
            }
            local_relation {
              data: "\377\377\377\377\250\000\000\000\020
                     <binary data continues>
                     \377\377\377\377\000\000\000\000"
              schema: "{\"fields\":[{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"age\",\"nullable\":true,\"type\":\"long\"}],\"type\":\"struct\"}"
            }
          }
          column_names: "id"
          column_names: "age"
        }
      }
      num_rows: 20
      truncate: 20
    }
  }
}
client_type: "_SPARK_CONNECT_PYTHON spark/3.5.6 os/darwin python/3.11.5"
request_options {
  reattach_options {
    reattachable: true
  }
}
operation_id: "20ae08d2-6048-41cb-9c94-d99d34c7bcc8"

At the top level, we have session_id, user_context, plan, client_type, request_options and operation_id. The plan field contains the Spark query plan that was created in the client. Notice that the structure is nested, as a tree. 
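
To make the tree structure concrete, here is a minimal sketch (not the production traversal) of how you could walk the nested Relation messages in the request above using the generated protobuf classes. It assumes the request object shown above is in scope.

import pyspark.sql.connect.proto.relations_pb2 as relations_pb2


def print_plan(rel: relations_pb2.Relation, depth: int = 0) -> None:
    # Each Relation sets exactly one member of the "rel_type" oneof,
    # e.g., "show_string", "to_df" or "local_relation".
    rel_type = rel.WhichOneof("rel_type")
    print("  " * depth + f"{rel_type} (plan_id={rel.common.plan_id})")
    child = getattr(rel, rel_type)
    # Most unary relations wrap their child plan in an "input" field.
    if hasattr(child, "input") and child.HasField("input"):
        print_plan(child.input, depth + 1)


# print_plan(request.plan.root) would print:
# show_string (plan_id=2)
#   to_df (plan_id=1)
#     local_relation (plan_id=0)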

Building a “Hello World” example

Let's walk through how this works with a simple "Hello World" example for Spark Connect built on Snowpark. We will implement the gRPC server in Python using the Snowpark Python client, so the work ultimately runs on Snowflake's vectorized query engine.

Building a Python gRPC server to accept Spark Connect requests

Stating the obvious: The first thing we need if we are going to accept gRPC requests is a gRPC server. How we implement this server is reasonably straightforward: We need to create our own Servicer class that extends SparkConnectServiceServicer (source), then create and add the servicer to a gRPC server we create:

import grpc
from concurrent import futures
import pyspark.sql.connect.proto.base_pb2_grpc as proto_base_pb2_grpc


class MySparkConnectServicer(
    proto_base_pb2_grpc.SparkConnectServiceServicer
):
    # TODO: Override request handlers (e.g., ExecutePlan, AnalyzePlan, Config)
    pass


server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
proto_base_pb2_grpc.add_SparkConnectServiceServicer_to_server(
    MySparkConnectServicer(), server
)
server.add_insecure_port("[::]:15002")  # 15002 is the default Spark Connect port
server.start()
server.wait_for_termination()

You can run the above code yourself and connect to it on localhost from a different Python script/repl:

from pyspark.sql.connect.session import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

Since we haven’t implemented any request handlers, every request will result in a message that looks like this:

SparkConnectGrpcException: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNIMPLEMENTED
	details = "Method not implemented!"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Method not implemented!", grpc_status:12}"
>

With a successful localhost stub implementation, it’s time to start handling the requests properly. 

Hello World: Make Spark code behave as expected

One of the core tenets of the product development was to embrace Spark behaviors. The Snowpark client is consistent with Snowflake SQL, and so naturally any differences between Snowflake SQL and Spark behaviors would cause papercuts when trying to migrate. We decided to choose a relatively simple workload that would require some minor code changes to migrate from Spark to Snowpark. We landed on this:

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
df.show()
print(df.count())
df.select(df.id).show()
df.select("age").show()
df.withColumn(
    "plus1k", df["age"] + 1_000
).select("id", "plus1k").show()
parquet_df = spark.read.parquet("s3://my/bucket/myfile.parquet")
parquet_df.show()

This workload contains:

  • Creating a DataFrame in the client with lowercase column names

  • Printing the DataFrame contents

  • An aggregation (df.count)

  • A column projection with the dot operator (df.id) and with a string ("age")

  • Performing a simple operation on a projected column

  • Creating a new column with the result of the simple operation

  • Reading a Parquet file from S3

In the interest of keeping this blog post at a manageable length, we will focus on the first two operations: createDataFrame and show. You can use the remaining code as you build your own Hello World Spark Connect implementation.

How does createDataFrame work in Spark Connect?

You may recall that no queries are sent to the server until execution is triggered. The createDataFrame operation is therefore received at the same time as show, since show triggers execution. createDataFrame arrives in the form of a local_relation field on the Relation protobuf (source), which sits under the request.plan.root field of the ExecutePlanRequest we receive in ExecutePlan (technically, it is found in the input field of another Relation).

local_relation {
    data: "\377\377\377\377\250\000\000\000\020
            <binary data continues>
           \000\377\377\377\377\000\000\000\000"
    schema: "{\"fields\":[{\"metadata\":{},\"name\":\"id\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"age\",\"nullable\":true,\"type\":\"long\"}],\"type\":\"struct\"}"
}

We have two fields: data and schema. The data field appears to be a string of bytes, and if we look at the Spark source, we get a hint for how to go about deserializing the bytes:

message LocalRelation {
  // (Optional) Local collection data serialized into Arrow IPC streaming format which contains
  // the schema of the data.
  optional bytes data = 1;

  // (Optional) The schema of local data.
  // It should be either a DDL-formatted type string or a JSON string.
  //
  // The server side will update the column names and data types according to this schema.
  // If the 'data' is not provided, then this schema will be required.
  optional string schema = 2;
}

Great! Now we can go about converting the data into an Arrow table by using pyarrow.ipc:

import pyarrow as pa

# rel is the Relation protobuf object referenced above
buffer_reader = pa.BufferReader(rel.local_relation.data)
with pa.ipc.open_stream(buffer_reader) as reader:
    pyarrow_table = pa.Table.from_batches([batch for batch in reader])

Now that we've got our data in a pyarrow table, we can easily move it into Snowflake via Snowpark, but there's also a schema field that we need to account for. In our case, schema is a JSON string. We can use json.loads to convert it into a nested dictionary and then build our own Snowpark schema by mapping the individual types, given here by their string names (e.g., "long"), to their Snowpark equivalents. The full type-conversion logic is omitted because it's trivial in most cases (but not all!):

import json

snowpark_schema = convert_json_schema(json.loads(rel.local_relation.schema))
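
For reference, here is a minimal sketch (not the full implementation) of what convert_json_schema might look like. It handles only a handful of primitive type names; the real mapping covers many more types, including nested ones.

import snowflake.snowpark.types as T

# Partial mapping from Spark JSON type names to Snowpark types
_SPARK_TO_SNOWPARK = {
    "long": T.LongType(),
    "integer": T.IntegerType(),
    "double": T.DoubleType(),
    "string": T.StringType(),
    "boolean": T.BooleanType(),
}


def convert_json_schema(spark_schema: dict) -> T.StructType:
    # spark_schema is the dict produced by json.loads on the schema string above
    return T.StructType(
        [
            T.StructField(f["name"], _SPARK_TO_SNOWPARK[f["type"]], f["nullable"])
            for f in spark_schema["fields"]
        ]
    )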

With both the pyarrow table and the schema, we can easily create a Snowpark DataFrame:

snowpark_df = snowpark_session.create_dataframe(
    pyarrow_table.to_pandas(),
    snowpark_schema,
)

With the snowpark_df created, we are ready to move on to the df.show() part of the Hello World example.

How does show work in Spark Connect?

df.show() on the Spark client is passed through Spark Connect as a show_string relation object (source). On the server, the show_string relation looks like this:

show_string {
  input {
    ...  # This is the Relation containing the local_relation
  }
  num_rows: 20
  truncate: 20
} 

This is pretty straightforward, so let’s take a look at how the Spark Connect client expects the data to be returned. In the pyspark.sql.connect.DataFrame class, we see the following code in _show_string, which is called by show (source):

table, _ = DataFrame(
    plan.ShowString(
        child=self._plan,
        num_rows=n,
        truncate=_truncate,
        vertical=vertical,
    ),
    session=self._session,
)._to_table()
return table[0][0].as_py()

where _to_table returns a pyarrow.Table object. So essentially, we need to serialize the string we want to show into an Arrow IPC stream containing a 1x1 pyarrow.Table. That's a pretty elegant way to handle show: it reuses the Arrow-based transport mechanism already used for other operations (e.g., collect), and it allows the server to completely control the string formatting.

For the sake of simplicity, we can just use Snowpark’s internal _show_string method to generate the string we want to return (Note: this method is not intended for external use). Then, we can create the required 1x1 pyarrow.Table with the result:

spark_show_string = snowpark_df._show_string()
one_by_one = pa.Table.from_arrays(
    [pa.array([spark_show_string])], 
    names=["show_string"],
)
outstream = pa.BufferOutputStream()
with pa.ipc.new_stream(outstream, one_by_one.schema) as writer:
    for batch in one_by_one.to_batches():
        writer.write_batch(batch)
# Convert to bytes so we can send it over gRPC
show_string_bytes = outstream.getvalue().to_pybytes()

The result is now ready to be passed back from the server by returning an ExecutePlanResponse from the ExecutePlan method on our Servicer object:

import pyspark.sql.connect.proto.base_pb2 as proto_base

yield proto_base.ExecutePlanResponse(
    session_id=request.session_id,
    operation_id=MY_OP_ID,  # The operation id is controlled by the server
    arrow_batch=proto_base.ExecutePlanResponse.ArrowBatch(
        row_count=1,  # Our show string object only has 1 row
        data=show_string_bytes,
    ),
)

At this point our ExecutePlan method looks something like this:

import json

import pyarrow as pa
import pyspark.sql.connect.proto.base_pb2 as proto_base

def ExecutePlan(self, request: proto_base.ExecutePlanRequest, context):
    match request.plan.WhichOneof("op_type"):
        case "root":
            # root is the root Relation protobuf message
            match request.plan.root.WhichOneof("rel_type"):
                case "show_string":
                    # Our hacky hello world example :)
                    rel = request.plan.root.show_string.input.to_df.input
                    buffer_reader = pa.BufferReader(rel.local_relation.data)
                    with pa.ipc.open_stream(buffer_reader) as reader:
                        pyarrow_table = pa.Table.from_batches(
                            [batch for batch in reader]
                        )
                    # Build the Snowpark schema from the JSON schema in the request
                    snowpark_schema = convert_json_schema(
                        json.loads(rel.local_relation.schema)
                    )
                    snowpark_df = snowpark_session.create_dataframe(
                        pyarrow_table.to_pandas(),
                        snowpark_schema,
                    )
                    spark_show_string = snowpark_df._show_string()
                    one_by_one = pa.Table.from_arrays(
                        [pa.array([spark_show_string])], 
                        names=["show_string"],
                    )
                    outstream = pa.BufferOutputStream()
                    with pa.ipc.new_stream(
                        outstream, one_by_one.schema
                    ) as writer:
                        for batch in one_by_one.to_batches():
                            writer.write_batch(batch)
                    # Convert to bytes so we can send it over gRPC
                    show_string_bytes = outstream.getvalue().to_pybytes()
                    yield proto_base.ExecutePlanResponse(
                        session_id=request.session_id,
                        operation_id=MY_OP_ID,
                        arrow_batch=proto_base.ExecutePlanResponse.ArrowBatch(
                            row_count=1,
                            data=show_string_bytes,
                        ),
                    )
                case _:
                    raise NotImplementedError()
        case _:
            raise NotImplementedError()
    yield proto_base.ExecutePlanResponse(
        session_id=request.session_id,
        operation_id=MY_OP_ID,
        result_complete=proto_base.ExecutePlanResponse.ResultComplete(),
    )

This is obviously not the way you’d want to implement this in a production system, but for the sake of the “Hello World” example, this works. If you were to build this yourself, you’d need to be able to parse the whole Spark query plan tree and map the operations to the system you’re implementing it for. 

Snowpark Connect for Apache Spark architecture

Now that we’ve explored how a “Hello World” example would look, let’s talk in more detail about the architecture of Snowpark Connect for Apache Spark. Figure 2 shows an overview of the architecture.

Figure 2: Snowpark was originally built on the same premise of client-server separation as Spark Connect. In this architectural view, you can see how, paired with Spark Connect, we're able to transparently bring the ease of use, performance benefits and reliability of the Snowflake platform to Spark workloads.

As we mentioned, Spark Connect enables us to implement a server inside Snowflake, such that the user can simply connect to the Snowflake Spark Connect endpoint from their open source Spark installation. How this is done in practice is covered in the getting started docs.

Once Snowflake receives the Spark query plan through Spark Connect, it goes through a series of transformations to create an optimal Snowpark query plan.

SparkSQL Parser

One of the most interesting challenges we had to solve was transpiling SparkSQL into Snowflake SQL. We'll reserve the deep details of the SparkSQL parser for a future post, but in short, our implementation relies on the SparkSQL Catalyst parser and then converts Catalyst plans to Spark Connect protobuf. This ensured that only one implementation is needed for each operator, but it also presented some new challenges.

SparkSQL is more expressive than the Spark DataFrame API, meaning that there are multiple SQL operators that are not present in the protobuf definition. Fortunately, Spark developers anticipated the need for extending the Spark Connect protocol, so we use Spark Connect’s extension protobuf for the Relation object for any operators that are in SparkSQL but not in the protobuf definition. Since our parsing logic is all handled server-side, we don’t even need to ship a new client to the user!
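
To make this concrete, here is a hedged sketch of how an operator that exists only in SparkSQL could be carried in the Relation message's extension field (a google.protobuf.Any). The my_extensions_pb2 module and its SqlOnlyOperator message are hypothetical stand-ins for internal extension protos.

import pyspark.sql.connect.proto.relations_pb2 as relations_pb2
# import my_extensions_pb2  # hypothetical module compiled from our own .proto file

# Parser side: pack the custom operator into the Relation's Any extension field.
custom_op = my_extensions_pb2.SqlOnlyOperator(clause="...")  # hypothetical message
rel = relations_pb2.Relation()
rel.extension.Pack(custom_op)

# Interpreter side: the extension shows up as just another rel_type.
if rel.WhichOneof("rel_type") == "extension":
    unpacked = my_extensions_pb2.SqlOnlyOperator()
    rel.extension.Unpack(unpacked)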

Spark query plan interpreter

This component comprised the majority of the engineering effort. There are several subtle differences between the Snowpark client and Apache Spark. The Snowpark client is consistent with Snowflake, so even simple defaults like name capitalization needed to be considered in the design.
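
As a small illustration of the capitalization point (a sketch assuming a live snowpark_session), Snowflake folds unquoted identifiers to uppercase, while Spark preserves the case the user supplied, so the interpreter has to quote or otherwise track identifiers to keep Spark semantics:

# Unquoted identifiers are uppercased by Snowflake, so a Spark user's
# lowercase column names would come back as "ID" and "AGE".
df_default = snowpark_session.create_dataframe([[1, 21]], schema=["id", "age"])
print(df_default.columns)  # expected: ['ID', 'AGE']

# Quoting the identifiers preserves the original case, which is what a
# Spark user expects to see.
df_quoted = snowpark_session.create_dataframe([[1, 21]], schema=['"id"', '"age"'])
print(df_quoted.columns)  # expected: ['"id"', '"age"']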

Testing

One of the big initial questions was testing. We started with the open source Apache Spark test suite, reasoning that since the open source community uses these tests to check correctness, they would be a good starting point. Indeed, they were very helpful, but there was a lot more we needed to consider, since assumptions made by the open source developers do not always hold for the Snowpark engine.

Currently, Snowpark Connect for Apache Spark passes >80% of open source SparkSQL tests (HiveSQL and ANSI SQL) and >85% of open source PySpark DataFrame, Column, Function and other utility tests.

In addition to the open source Spark tests, we built a test suite with a combination of hand-written tests (designed to exercise the edge cases in behavior mismatches between Spark and Snowpark Connect) and AI-generated workloads (designed to mirror the types of operations that Spark users write on a daily basis). This testing strategy helped us catch a number of bugs and issues before public preview that we might not otherwise have caught, particularly in cases where default Spark behavior is undocumented.

Snowpark execution and vectorized query engine

Once we built the Spark query plan interpreter, it was simply a matter of letting Snowpark execution handle the optimized query plan we generate via Snowflake's vectorized query engine. The development of Snowpark Connect for Apache Spark inspired several updates and improvements throughout the stack, such as better support for seed-based random samples and performance improvements for complex query plans. This architecture effectively brings all of the benefits of Snowflake's ecosystem directly to Apache Spark users and vice versa.

Conclusion

Snowpark Connect for Apache Spark is now in public preview; try it today! Snowflake is hiring for many roles in Product and Engineering. Explore open opportunities and apply today to help build exciting new features like this.
