How we use a shared Spark server to make our Spark infrastructure more efficient
Spark Connect is a relatively new component in the Spark ecosystem that allows thin clients to run Spark applications on a remote Spark cluster. This technology can offer some benefits to Spark applications that use the DataFrame API. Spark has long allowed running SQL queries on a remote Thrift JDBC server. However, the ability to remotely run client applications written in any supported language (Scala, Python) appeared only in Spark 3.4.
In this article, I will share our experience using Spark Connect (version 3.5). I will talk about the benefits we gained, technical details related to running Spark client applications, and some tips on how to make your Spark Connect setup more efficient and stable.
Spark is one of the key components of the analytics platform at Joom. We have a large number of internal users and over 1000 custom Spark applications. These applications run at different times of day, have different complexity, and require very different amounts of computing resources (ranging from a few cores for a couple of minutes to over 250 cores for several days). Previously, all of them were always executed as separate Spark applications (with their own driver and executors), which, in the case of small and medium-sized applications (we historically have many such applications), led to noticeable overhead. With the introduction of Spark Connect, it is now possible to set up a shared Spark Connect server and run many Spark client applications on it. Technically, the Spark Connect server is a Spark application with an embedded Spark Connect endpoint.
Here are the benefits we were able to get from this:
Resource savings
- When running via Spark Connect, client applications do not require their own Spark driver (which typically uses over 1.5 GB of memory). Instead, they use a thin client with a typical memory consumption of 200 MB.
- Executor utilization improves since any executor can run the tasks of multiple client applications. For example, suppose some Spark application, at some point in its execution, starts using significantly fewer cores and less memory than initially requested. There are many reasons why this can happen. In the case of a separate Spark application, the currently unused resources are often wasted since dynamic allocation often does not provide efficient scale-down. With the Spark Connect server, however, the freed-up cores and memory can immediately be used to run tasks of other client applications.
Reduced startup wait time
- For various reasons, we have to limit the number of simultaneously running separate Spark applications, and they may wait in the queue for quite a long time if all slots are currently occupied. This can negatively affect data readiness time and user experience. In the case of the Spark Connect server, we have so far been able to avoid such limitations, and all Spark Connect client applications start running immediately after launch.
- For ad-hoc executions, it is desirable to minimize the time to get results as much as possible and avoid keeping people waiting. In the case of separate Spark applications, launching a client application often requires provisioning additional EC2 nodes for its driver and executors, as well as initializing the driver and executors. All of this together can take more than 4 minutes. In the case of the Spark Connect server, at least its driver is always up and ready to accept requests, so it is only a matter of waiting for additional executors, and often executors are already available. This may significantly reduce the wait time for ad-hoc applications to be ready.
Our constraints
At the moment, we do not run long-running heavy applications on Spark Connect for the following reasons:
- They may cause failure or unstable behavior of the Spark Connect server (e.g., by overflowing disks on executor nodes). This can lead to large-scale problems for the entire platform.
- They often require unique memory settings and use specific optimization techniques (e.g., custom extraStrategies).
- We currently have a problem with giving the Spark Connect server a lot of executors to handle a very large simultaneous load (this is related to the behavior of the Spark Task Scheduler and is beyond the scope of this article).
Therefore, heavy applications still run as separate Spark applications.
We use Spark on Kubernetes/EKS and Airflow. Some code examples will be specific to this environment.
We have too many different, constantly changing Spark applications, and it would take too much time to manually determine for each one whether it should run on Spark Connect according to our criteria. Furthermore, the list of applications running on Spark Connect needs to be updated regularly. For example, suppose some application is light enough today, so we have decided to run it on Spark Connect. But tomorrow, its developers may add several large joins, making it quite heavy. Then, it will be preferable to run it as a separate Spark application. The reverse situation is also possible.
Eventually, we created a service to automatically determine how to launch each specific client application. This service analyzes the history of previous runs for each application, evaluating such metrics as Total Task Time, Shuffle Write, Disk Spill, and others (this data is collected using SparkListener). Custom parameters set for the applications by developers (e.g., memory settings of drivers and executors) are also considered. Based on this data, the service automatically determines for each application whether it should be run this time on the Spark Connect server or as a separate Spark application. Thus, all our applications should be ready to run in either of the two ways.
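To make the idea concrete, here is a minimal sketch of such a decision in Scala. It is not the code of our service: the names (RunStats, Limits, RunModeSelector), the thresholds, and the rule that an application without history runs separately are all assumptions made for illustration.

```scala
// Hypothetical sketch: names and thresholds are illustrative assumptions,
// not the actual service described in the article.
sealed trait RunMode
case object SparkConnect extends RunMode
case object SeparateApplication extends RunMode

// Aggregated metrics of one previous run (as a SparkListener might report them).
final case class RunStats(
  totalTaskTimeSec: Long,
  shuffleWriteBytes: Long,
  diskSpillBytes: Long
)

// Illustrative limits; real values would have to be tuned for the platform.
final case class Limits(
  maxTotalTaskTimeSec: Long = 4L * 3600,
  maxShuffleWriteBytes: Long = 50L * 1024 * 1024 * 1024,
  maxDiskSpillBytes: Long = 10L * 1024 * 1024 * 1024
)

object RunModeSelector {
  // An application goes to Spark Connect only if every known run was "light"
  // and its developers did not request custom memory settings.
  // Assumption: with no history at all, we stay on the safe side.
  def choose(
    history: Seq[RunStats],
    hasCustomMemorySettings: Boolean,
    limits: Limits = Limits()
  ): RunMode = {
    val isLight = history.nonEmpty && history.forall { s =>
      s.totalTaskTimeSec <= limits.maxTotalTaskTimeSec &&
      s.shuffleWriteBytes <= limits.maxShuffleWriteBytes &&
      s.diskSpillBytes <= limits.maxDiskSpillBytes
    }
    if (isLight && !hasCustomMemorySettings) SparkConnect else SeparateApplication
  }
}
```

Because the decision is recomputed before every launch, an application that becomes heavier (or lighter) over time changes its run mode without anyone maintaining a list by hand.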
In our environment, each client application is built independently of the others and has its own JAR file containing the application code, as well as specific dependencies (for example, ML applications often use third-party libraries like CatBoost). The problem is that the SparkSession API for Spark Connect is somewhat different from the SparkSession API used for separate Spark applications (Spark Connect clients use the spark-connect-client-jvm artifact). Therefore, we are supposed to know at the build time of each client application whether it will run via Spark Connect or not. But we do not know that. The following describes our approach to launching client applications, which eliminates the need to build and manage two versions of the JAR artifact for the same application.
For each Spark client application, we build only one JAR file containing the application code and specific dependencies. This JAR is used both when running on Spark Connect and when running as a separate Spark application. Therefore, these client JARs do not contain specific Spark dependencies. The appropriate Spark dependencies (spark-core/spark-sql or spark-connect-client-jvm) will be provided later in the Java classpath, depending on the run mode. In any case, all client applications use the same Scala code to initialize SparkSession, which operates depending on the run mode. All client application JARs are built for the regular Spark API. So, in the part of the code intended for Spark Connect clients, the SparkSession methods specific to the Spark Connect API (remote, addArtifact) are called via reflection:
import java.util.UUID

import org.apache.spark.sql.SparkSession

val sparkConnectUri: Option[String] = Option(System.getenv("SPARK_CONNECT_URI"))

val isSparkConnectMode: Boolean = sparkConnectUri.isDefined

def createSparkSession(): SparkSession = {
  if (isSparkConnectMode) {
    createRemoteSparkSession()
  } else {
    SparkSession.builder
      // Whatever you need to do to configure SparkSession for a separate
      // Spark application.
      .getOrCreate
  }
}

private def createRemoteSparkSession(): SparkSession = {
  val uri = sparkConnectUri.getOrElse(throw new Exception(
    "Required environment variable 'SPARK_CONNECT_URI' is not set."))

  val builder = SparkSession.builder
  // Reflection is used here because the regular SparkSession API does not
  // contain these methods. They are only available in the SparkSession API
  // version for Spark Connect.
  classOf[SparkSession.Builder]
    .getDeclaredMethod("remote", classOf[String])
    .invoke(builder, uri)

  // A set of identifiers for this application (to be used later).
  val scAppId = s"spark-connect-${UUID.randomUUID()}"
  val airflowTaskId = Option(System.getenv("AIRFLOW_TASK_ID"))
    .getOrElse("unknown_airflow_task_id")
  val session = builder
    .config("spark.joom.scAppId", scAppId)
    .config("spark.joom.airflowTaskId", airflowTaskId)
    .getOrCreate()

  // If the client application uses your Scala code (e.g., custom UDFs),
  // then you must add the jar artifact containing this code so that it
  // can be used on the Spark Connect server side.
  // The artifact is added unless ADD_ARTIFACT_TO_SC_SESSION is set to "false".
  val addArtifact = Option(System.getenv("ADD_ARTIFACT_TO_SC_SESSION"))
    .forall(_.toBoolean)

  if (addArtifact) {
    val mainApplicationFilePath =
      System.getenv("SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH")
    classOf[SparkSession]
      .getDeclaredMethod("addArtifact", classOf[String])
      .invoke(session, mainApplicationFilePath)
  }

  // Close the client session when the JVM shuts down.
  Runtime.getRuntime.addShutdownHook(new Thread() {
    override def run(): Unit = {
      session.close()
    }
  })

  session
}
In the case of Spark Connect mode, this client code can be run as a regular Java application anywhere. Since we use Kubernetes, this runs in a Docker container. All dependencies specific to Spark Connect are packed into a Docker image used to run client applications (a minimal example of this image can be found here). The image contains not only the spark-connect-client-jvm artifact but also other common dependencies used by almost all client applications (e.g., hadoop-aws, since we almost always interact with S3 storage on the client side).
FROM openjdk:11-jre-slim

WORKDIR /app

# Here, we copy the common artifacts required for any of our Spark Connect
# clients (primarily spark-connect-client-jvm, as well as spark-hive,
# hadoop-aws, scala-library, etc.).
COPY build/libs/* /app/

COPY src/main/docker/entrypoint.sh /app/
RUN chmod +x ./entrypoint.sh
ENTRYPOINT ["./entrypoint.sh"]
This common Docker image is used to run all our client applications when they run via Spark Connect. At the same time, it does not contain client JARs with the code of particular applications and their dependencies because there are many such applications that are constantly updated and may depend on any third-party libraries. Instead, when a particular client application is launched, the location of its JAR file is passed using an environment variable, and that JAR is downloaded during initialization in entrypoint.sh:
#!/bin/bash
set -eo pipefail

# This variable will also be used in the SparkSession builder within
# the application code.
export SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH="/tmp/$(uuidgen).jar"

# Download the JAR with the code and specific dependencies of the client
# application to be run. All such JAR files are stored in S3, and when
# creating a client Pod, the path to the required JAR is passed to it
# via environment variables.
java -cp "/app/*" com.joom.analytics.sc.client.S3Downloader "${MAIN_APPLICATION_FILE_S3_PATH}" "${SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH}"

# Launch the client application. Any MAIN_CLASS initializes a SparkSession
# at the beginning of its execution using the code provided above.
java -cp "${SPARK_CONNECT_MAIN_APPLICATION_FILE_PATH}:/app/*" "${MAIN_CLASS}" "$@"
Finally, when it comes time to launch the application, our custom SparkAirflowOperator automatically determines the execution mode (Spark Connect or separate) based on the statistics of previous runs of this application.
In the case of Spark Connect, we use KubernetesPodOperator to launch the client Pod of the application. KubernetesPodOperator takes as parameters the previously described Docker image, as well as the environment variables (MAIN_CLASS, JAR_PATH and others), which will be available for use within entrypoint.sh and the application code. There is no need to allocate many resources to the client Pod (its typical consumption in our environment is about 200 MB of memory and 0.15 vCPU).
In the case of a separate Spark application, we use our custom AirflowOperator, which runs Spark applications using spark-on-k8s-operator and the official Spark Docker image. Let's skip the details about our Spark AirflowOperator for now, as it is a large topic deserving a separate article.
Not all existing Spark applications can be successfully executed on Spark Connect since its SparkSession API is different from the SparkSession API used for separate Spark applications. For example, if your code uses sparkSession.sparkContext or sparkSession.sessionState, it will fail in the Spark Connect client because the Spark Connect version of SparkSession does not have these properties.
In our case, the most common cause of problems was using sparkSession.sessionState.catalog and sparkSession.sparkContext.hadoopConfiguration. In some cases, sparkSession.sessionState.catalog can be replaced with sparkSession.catalog, but not always. sparkSession.sparkContext.hadoopConfiguration may be needed if the code executed on the client side contains operations on your data storage, such as this:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

def delete(path: Path, recursive: Boolean = true)
          (implicit hadoopConfig: Configuration): Boolean = {
  val fs = path.getFileSystem(hadoopConfig)
  fs.delete(path, recursive)
}
Fortunately, it is possible to create a standalone SessionCatalog for use within the Spark Connect client. In this case, the class path of the Spark Connect client must also include org.apache.spark:spark-hive_2.12, as well as libraries for interacting with your storage (we use S3, so in our case it is org.apache.hadoop:hadoop-aws).
import org.apache.spark.SparkConf
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.hive.StandaloneHiveExternalCatalog
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogWithListener, SessionCatalog}

// This is just an example of what the required properties might look like.
// All of them should already be set for existing Spark applications in one
// way or another, and their complete list can be found in the UI of any
// running separate Spark application on the Environment tab.
val sessionCatalogConfig = Map(
  "spark.hadoop.hive.metastore.uris" -> "thrift://metastore.spark:9083",
  "spark.sql.catalogImplementation" -> "hive",
  "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.delta.catalog.DeltaCatalog"
)

val hadoopConfig = Map(
  "hive.metastore.uris" -> "thrift://metastore.spark:9083",
  "fs.s3.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
  "fs.s3a.aws.credentials.provider" -> "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
  "fs.s3a.endpoint" -> "s3.amazonaws.com"
  // and others…
)

def createStandaloneSessionCatalog(): (SessionCatalog, Configuration) = {
  val sparkConf = new SparkConf().setAll(sessionCatalogConfig)
  val hadoopConfiguration = new Configuration()
  hadoopConfig.foreach {
    case (key, value) => hadoopConfiguration.set(key, value)
  }

  val externalCatalog = new StandaloneHiveExternalCatalog(sparkConf, hadoopConfiguration)
  val sessionCatalog = new SessionCatalog(
    new ExternalCatalogWithListener(externalCatalog)
  )
  (sessionCatalog, hadoopConfiguration)
}
You also need to create a wrapper for HiveExternalCatalog that is accessible in your code (because the HiveExternalCatalog class is private to the org.apache.spark package):
package org.apache.spark.sql.hive

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkConf

class StandaloneHiveExternalCatalog(conf: SparkConf, hadoopConf: Configuration)
  extends HiveExternalCatalog(conf, hadoopConf)
Additionally, it is often possible to replace code that does not work on Spark Connect with an alternative, for example:
sparkSession.createDataFrame(sparkSession.sparkContext.parallelize(data), schema)
==>
sparkSession.createDataFrame(data.toList.asJava, schema)

sparkSession.sparkContext.getConf.get("some_property")
==>
sparkSession.conf.get("some_property")
Fallback to a separate Spark application
Unfortunately, it is not always easy to fix a particular Spark application to make it work as a Spark Connect client. For example, third-party Spark components used in the project pose a significant risk, as they are often written without considering compatibility with Spark Connect. Since, in our environment, any Spark application can be automatically launched on Spark Connect, we found it reasonable to implement a fallback to a separate Spark application in case of failure. Simplified, the logic is as follows:
- If some application fails on Spark Connect, we immediately try to rerun it as a separate Spark application. At the same time, we increment the counter of failures that occurred during execution on Spark Connect (each client application has its own counter).
- The next time this application is launched, we check the failure counter of this application:
  - If there are fewer than 3 failures, we assume that the last time, the application may have failed not because of incompatibility with Spark Connect but due to some other, possibly temporary, reason. So, we try to run it on Spark Connect again. If it completes successfully this time, the failure counter of this client application is reset to zero.
  - If there are already 3 failures, we assume that the application cannot work on Spark Connect and stop attempting to run it there for now. From then on, it will be launched only as a separate Spark application.
- If the application has 3 failures on Spark Connect, but the last one was more than 2 months ago, we try to run it on Spark Connect again (in case something has changed in it during that time, making it compatible with Spark Connect). If it succeeds this time, we reset the failure counter to zero again. If it is unsuccessful again, the next attempt will be in another 2 months.
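The rules above can be sketched as a small state machine. This is an illustration only, not our production code: FailureState, ConnectFallback, and the approximation of "2 months" as 60 days are assumptions, and persistence of the counter is left out.

```scala
import java.time.{Duration, Instant}

sealed trait Target
case object OnSparkConnect extends Target
case object AsSeparateApplication extends Target

// Hypothetical per-application record of failures that happened on Spark Connect.
final case class FailureState(failures: Int = 0, lastFailureAt: Option[Instant] = None)

object ConnectFallback {
  val MaxFailures = 3
  // Assumption: "2 months" is approximated as 60 days.
  val RetryAfter: Duration = Duration.ofDays(60)

  // Where to launch the application this time.
  def target(state: FailureState, now: Instant): Target = {
    val retryWindowPassed = state.lastFailureAt.forall { t =>
      Duration.between(t, now).compareTo(RetryAfter) > 0
    }
    if (state.failures < MaxFailures || retryWindowPassed) OnSparkConnect
    else AsSeparateApplication
  }

  // A successful run on Spark Connect resets the counter.
  def onSuccess(state: FailureState): FailureState = FailureState()

  // A failed run on Spark Connect increments the counter (capped at MaxFailures)
  // and restarts the retry window; the caller then reruns the application
  // as a separate Spark application.
  def onFailure(state: FailureState, now: Instant): FailureState =
    FailureState(math.min(state.failures + 1, MaxFailures), Some(now))
}
```

In this sketch, target is consulted only for applications that would otherwise be sent to Spark Connect; heavy applications bypass it entirely.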
This approach is somewhat simpler than maintaining code that identifies the reasons for failures from logs, and it works well in most cases. Attempts to run incompatible applications on Spark Connect usually do not have any significant negative impact because, in the vast majority of cases, if an application is incompatible with Spark Connect, it fails immediately after launch without wasting time and resources. However, it is important to mention that all our applications are idempotent.
As I already mentioned, we collect Spark statistics for each Spark application (most of our platform optimizations and alerts depend on it). This is easy when the application runs as a separate Spark application. In the case of Spark Connect, the stages and tasks of each client application need to be separated from the stages and tasks of all other client applications that run simultaneously within the shared Spark Connect server.
You can pass any identifiers to the Spark Connect server by setting custom properties for the client SparkSession:
val session = builder
  .config("spark.joom.scAppId", scAppId)
  .config("spark.joom.airflowTaskId", airflowTaskId)
  .getOrCreate()
Then, in the SparkListener on the Spark Connect server side, you can retrieve all the passed information and associate each stage/task with the particular client application.
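import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

class StatsReportingSparkListener extends SparkListener {

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    val stageId = stageSubmitted.stageInfo.stageId
    val stageAttemptNumber = stageSubmitted.stageInfo.attemptNumber()
    // The custom property set on the client SparkSession identifies the
    // client application this stage belongs to.
    val scAppId = stageSubmitted.properties.getProperty("spark.joom.scAppId")
    // …
  }
}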
Here, you can find the code for the StatsReportingSparkListener we use to collect statistics. You might also be interested in this free tool for finding performance issues in your Spark applications.
The Spark Connect server is a permanently running Spark application where a large number of clients can run their Jobs. Therefore, it can be worthwhile to customize its properties, which can make it more reliable and prevent waste of resources. Here are some settings that turned out to be useful in our case:
// Using dynamicAllocation is important for the Spark Connect server
// because the workload can be very unevenly distributed over time.
spark.dynamicAllocation.enabled: true  // default: false

// This pair of parameters is responsible for the timely removal of idle
// executors:
spark.dynamicAllocation.cachedExecutorIdleTimeout: 5m  // default: infinity
spark.dynamicAllocation.shuffleTracking.timeout: 5m  // default: infinity

// To create new executors only when the existing ones cannot handle
// the received tasks for a significant amount of time. This allows you
// to save resources when a small number of tasks arrive at some point
// in time, which do not require many executors for timely processing.
// With increased schedulerBacklogTimeout, unnecessary executors do not
// have the opportunity to appear by the time all incoming tasks are
// completed. The time to complete the tasks increases slightly with this,
// but in most cases, this increase is not significant.
spark.dynamicAllocation.schedulerBacklogTimeout: 30s  // default: 1s

// If, for some reason, you need to stop the execution of a client
// application (and free up resources), you can forcibly terminate the client.
// Currently, even explicitly closing the client SparkSession does not
// immediately end the execution of its corresponding Jobs on the server.
// They will continue to run for a duration equal to 'detachedTimeout'.
// Therefore, it may be reasonable to reduce it.
spark.connect.execute.manager.detachedTimeout: 2m  // default: 5m

// We have encountered a situation when killed tasks may hang for
// an unpredictable amount of time, leading to bad consequences for their
// executors. In this case, it is better to remove the executor on which
// this problem occurred.
spark.task.reaper.enabled: true  // default: false
spark.task.reaper.killTimeout: 300s  // default: -1

// The Spark Connect server can run for an extended period of time. During
// this time, executors may fail, including for reasons beyond our control
// (e.g., AWS Spot interruptions). This option is needed to prevent
// the entire server from failing in such cases.
spark.executor.maxNumFailures: 1000

// In our experience, BroadcastJoin can lead to very serious performance
// issues in some cases. So, we decided to disable broadcasting.
// Disabling this option usually does not result in a noticeable performance
// degradation for our typical applications anyway.
spark.sql.autoBroadcastJoinThreshold: -1  // default: 10MB

// For many of our client applications, we have to add an artifact to
// the client session (method sparkSession.addArtifact()).
// Using 'useFetchCache=true' results in double space consumption for
// the application JAR files on executors' disks, as they are also duplicated
// in a local cache folder. Sometimes, this even causes disk overflow with
// subsequent problems for the executor.
spark.files.useFetchCache: false  // default: true

// To ensure fair resource allocation when multiple applications are
// running concurrently.
spark.scheduler.mode: FAIR  // default: FIFO
For example, after we adjusted the idle timeout properties, the resource utilization changed as follows:
Preventive restart
In our environment, the Spark Connect server (version 3.5) may become unstable after a few days of continuous operation. Most often, we face client application jobs that randomly hang for an indefinite amount of time, but there may be other problems as well. Also, over time, the probability of a random failure of the entire Spark Connect server increases dramatically, and this can happen at the wrong moment.
As this component evolves, it will likely become more stable (or we will find out that we have done something wrong in our Spark Connect setup). But currently, the simplest solution has turned out to be a daily preventive restart of the Spark Connect server at a suitable moment (i.e., when no client applications are running on it). An example of what the restart code might look like can be found here.
In this article, I described our experience using Spark Connect to run a large number of diverse Spark applications.
To summarize the above:
- This component can help save resources and reduce the wait time for the execution of Spark client applications.
- It is better to be careful about which applications should be run on the shared Spark Connect server, as resource-intensive applications may cause problems for the entire system.
- You can create an infrastructure for launching client applications so that the decision on how to run any application (either as a separate Spark application or as a Spark Connect client) can be made automatically at the moment of launch.
- It is important to note that not all applications will be able to run on Spark Connect, but the number of such cases can be significantly reduced. If there is a possibility of running applications that have not been tested for compatibility with the Spark Connect version of the SparkSession API, it is worth implementing a fallback to separate Spark applications.
- It is worth paying attention to the Spark properties that can improve resource utilization and increase the overall stability of the Spark Connect server. It may also be reasonable to set up a periodic preventive restart of the Spark Connect server to reduce the probability of accidental failure and unwanted behavior.
Overall, we have had a positive experience using Spark Connect in our company. We will continue to watch the development of this technology with great interest, and there is a plan to expand its use.