Shandong University Software Engineering Application and Practice: Spark (10) Code


2021SC@SDUSC


Table of Contents

Starting the TaskScheduler

1. Creating the LocalActor

2. Creating and Registering the ExecutorSource

3. Building and Registering the ExecutorActor

4. Creating Spark's Own ClassLoader


Starting the TaskScheduler

For the TaskScheduler to do any work, it must be started. The code is as follows:

taskScheduler.start()

When started, the TaskScheduler in fact simply delegates to the start method of its backend:

override def start() {
 backend.start()
}
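In local mode, backend is a LocalBackend, and its start method is where the LocalActor of the next section gets created. A minimal sketch of that method, reconstructed from the Spark 1.x sources (treat the actor name "LocalBackendActor" as an assumption):

// Sketch of LocalBackend.start (Spark 1.x), which creates the LocalActor below.
override def start() {
 localActor = SparkEnv.get.actorSystem.actorOf(
  Props(new LocalActor(scheduler, this, totalCores)),
  "LocalBackendActor") // actor name per the Spark 1.x sources (assumption)
}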

1. Creating the LocalActor

Creating the LocalActor mainly amounts to constructing the local Executor. The code is as follows:

// code 3-36
private[spark] class LocalActor(scheduler: TaskSchedulerImpl, executorBackend: LocalBackend,
 private val totalCores: Int) extends Actor with ActorLogReceive with Logging {

 import context.dispatcher // to use Akka's scheduler.scheduleOnce()

 private var freeCores = totalCores
 private val localExecutorId = SparkContext.DRIVER_IDENTIFIER
 private val localExecutorHostname = "localhost"

 val executor = new Executor(
  localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

 override def receiveWithLogging = {
  case ReviveOffers =>
   reviveOffers()

  case StatusUpdate(taskId, state, serializedData) =>
   scheduler.statusUpdate(taskId, state, serializedData)
   if (TaskState.isFinished(state)) {
    freeCores += scheduler.CPUS_PER_TASK
    reviveOffers()
   }

  case KillTask(taskId, interruptThread) =>
   executor.killTask(taskId, interruptThread)

  case StopExecutor =>
   executor.stop()
 }
}

Constructing the Executor (see code 3-37) mainly involves the following steps:

  1. Create and register the ExecutorSource.
  2. Obtain the SparkEnv. In non-local mode, a new SparkEnv must be created when the CoarseGrainedExecutorBackend on a Worker registers the Executor with the CoarseGrainedSchedulerBackend on the Driver. The port of the Executor's ActorSystem can be configured through the property spark.executor.port (default 0, meaning a random port).
  3. Create and register the ExecutorActor. The ExecutorActor is responsible for receiving messages sent to the Executor.
  4. Create the urlClassLoader. Why is this ClassLoader needed? In non-local mode, a Driver or a Worker may host multiple Executors, and each Executor sets up its own urlClassLoader to load classes from the jars uploaded with tasks, effectively isolating the class-loading environment of each task.
  5. Create the thread pool in which the Executor runs Tasks.
  6. Start the Executor's heartbeat thread, which sends heartbeats to the Driver (see the sketch after code 3-37).

In addition, this step configures the frame size for Akka messages (10485760 bytes), the byte limit on the total size of results (1073741824 bytes), the list of running tasks, and sets the serializer's default ClassLoader to the newly created ClassLoader.

// code 3-37
 val executorSource = new ExecutorSource(this, executorId)

 private val env = {
  if (!isLocal) {
   val port = conf.getInt("spark.executor.port", 0)
   val _env = SparkEnv.createExecutorEnv(
    conf, executorId, executorHostname, port, numCores, isLocal, actorSystem)
   SparkEnv.set(_env)
   _env.metricsSystem.registerSource(executorSource)
   _env.blockManager.initialize(conf.getAppId)
   _env
  } else {
   SparkEnv.get
  }
 }

 private val executorActor = env.actorSystem.actorOf(
  Props(new ExecutorActor(executorId)), "ExecutorActor")

 private val urlClassLoader = createClassLoader()
 private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 env.serializer.setDefaultClassLoader(urlClassLoader)

 private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
 private val maxResultSize = Utils.getMaxResultSize(conf)

 val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
 private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 startDriverHeartbeater()
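The startDriverHeartbeater call at the end of the listing is step 6 above: it spawns a daemon thread that periodically reports the metrics of running tasks to the HeartbeatReceiver on the Driver. A simplified sketch of that thread, based on the Spark 1.x Executor sources (retry handling and the metrics-collection loop are trimmed; the property name spark.executor.heartbeatInterval is taken from those sources):

// Simplified sketch of Executor.startDriverHeartbeater (Spark 1.x).
def startDriverHeartbeater() {
 val interval = conf.getInt("spark.executor.heartbeatInterval", 10000)
 val timeout = AkkaUtils.lookupTimeout(conf)
 // Actor reference to the HeartbeatReceiver running on the driver.
 val heartbeatReceiverRef =
  AkkaUtils.makeDriverRef("HeartbeatReceiver", conf, env.actorSystem)

 val t = new Thread() {
  override def run() {
   // Sleep a random interval first so heartbeats from different
   // executors don't arrive in sync.
   Thread.sleep(interval + (math.random * interval).asInstanceOf[Int])
   while (!isStopped) {
    // Metrics of currently running tasks (collection logic omitted).
    val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]()
    val message = Heartbeat(
     executorId, tasksMetrics.toArray, env.blockManager.blockManagerId)
    val response = AkkaUtils.askWithReply[HeartbeatResponse](
     message, heartbeatReceiverRef, timeout)
    // If the driver has lost track of this executor's BlockManager,
    // re-register it.
    if (response.reregisterBlockManager) {
     env.blockManager.reregister()
    }
    Thread.sleep(interval)
   }
  }
 }
 t.setDaemon(true)
 t.setName("Driver Heartbeater")
 t.start()
}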
 
 

2. Creating and Registering the ExecutorSource

The ExecutorSource feeds the metrics system. Gauges are registered through the metricRegistry's register method; the measurements include threadpool.activeTasks, threadpool.completeTasks, threadpool.currentPool_size, threadpool.maxPool_size, filesystem.hdfs.write_bytes, filesystem.hdfs.read_ops, filesystem.file.write_bytes, filesystem.hdfs.largeRead_ops, filesystem.hdfs.write_ops, and so on. The implementation of ExecutorSource is shown in code 3-38.

// code 3-38
private[spark] class ExecutorSource(val executor: Executor, executorId: String)
 extends Source {

 private def fileStats(scheme: String): Option[FileSystem.Statistics] =
  FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption

 private def registerFileSystemStat[T](
   scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = {
  metricRegistry.register(MetricRegistry.name("filesystem", scheme, name),
   new Gauge[T] {
    override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue)
   })
 }

 override val metricRegistry = new MetricRegistry()
 override val sourceName = "executor"

 metricRegistry.register(MetricRegistry.name("threadpool", "activeTasks"),
  new Gauge[Int] {
   override def getValue: Int = executor.threadPool.getActiveCount()
  })
 metricRegistry.register(MetricRegistry.name("threadpool", "completeTasks"),
  new Gauge[Long] {
   override def getValue: Long = executor.threadPool.getCompletedTaskCount()
  })
 metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"),
  new Gauge[Int] {
   override def getValue: Int = executor.threadPool.getPoolSize()
  })
 metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"),
  new Gauge[Int] {
   override def getValue: Int = executor.threadPool.getMaximumPoolSize()
  })

 // Gauges for file system stats of this executor
 for (scheme <- Array("hdfs", "file")) {
  registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L)
  registerFileSystemStat(scheme, "write_bytes", _.getBytesWritten(), 0L)
  registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0)
  registerFileSystemStat(scheme, "largeRead_ops", _.getLargeReadOps(), 0)
  registerFileSystemStat(scheme, "write_ops", _.getWriteOps(), 0)
 }
}

After the ExecutorSource has been created, the registerSource method of MetricsSystem is called to register it with the MetricsSystem. registerSource in turn uses MetricRegistry's register method to register the Source with the MetricRegistry, as shown in code 3-39.

// code 3-39
def registerSource(source: Source) {
 sources += source
 try {
  val regName = buildRegistryName(source)
  registry.register(regName, source.metricRegistry)
 } catch {
  case e: IllegalArgumentException => logInfo("Metric already registered", e)
 }
}
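The buildRegistryName call above determines the fully qualified metric name. A simplified sketch of its behavior, based on the Spark 1.x MetricsSystem (the warning/fallback logic for missing IDs is omitted): when spark.app.id and spark.executor.id are both set, they prefix the source name, so the activeTasks gauge of code 3-38 is exposed as <appId>.<executorId>.executor.threadpool.activeTasks.

// Simplified sketch of MetricsSystem.buildRegistryName (Spark 1.x);
// the fallback handling for missing IDs is omitted.
private[spark] def buildRegistryName(source: Source): String = {
 val appId = conf.getOption("spark.app.id")
 val executorId = conf.getOption("spark.executor.id")
 if (appId.isDefined && executorId.isDefined) {
  MetricRegistry.name(appId.get, executorId.get, source.sourceName)
 } else {
  MetricRegistry.name(source.sourceName)
 }
}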

3. Building and Registering the ExecutorActor

The ExecutorActor is very simple: when it receives the TriggerThreadDump message sent on behalf of the SparkUI, it replies with the stack traces of all threads. The code is as follows:

override def receiveWithLogging = {
 case TriggerThreadDump =>
  sender ! Utils.getThreadDump()
}
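On the driver side, the SparkUI's thread-dump page goes through SparkContext.getExecutorThreadDump, which resolves the target executor's "ExecutorActor" and sends it TriggerThreadDump, blocking for the reply. A hedged sketch of that path (lookupExecutorActor is a hypothetical helper standing in for the actual resolution through the BlockManagerMaster):

// Hedged sketch of the driver side of the thread-dump request (Spark 1.x).
def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = {
 if (executorId == SparkContext.DRIVER_IDENTIFIER) {
  // The driver dumps its own threads directly, with no messaging involved.
  Some(Utils.getThreadDump())
 } else {
  // Hypothetical helper: the real code locates the executor's actor
  // via the BlockManagerMaster.
  val executorActor = lookupExecutorActor(executorId)
  Some(AkkaUtils.askWithReply[Array[ThreadStackTrace]](
   TriggerThreadDump, executorActor, AkkaUtils.askTimeout(conf)))
 }
}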

4. Creating Spark's Own ClassLoader

First, currentLoader, the parent of the ClassLoader about to be created, is obtained; then an array of URLs is generated from currentJars. The property spark.files.userClassPathFirst specifies whether classes should be loaded from the user's classpath first; depending on its value, either a ChildExecutorURLClassLoader or an ExecutorURLClassLoader is created. The code is as follows:

// code 3-40
private def createClassLoader(): MutableURLClassLoader = {
 val currentLoader = Utils.getContextOrSparkClassLoader

 // Build URLs for the task jars that have already been fetched locally.
 val urls = currentJars.keySet.map { uri =>
  new File(uri.split("/").last).toURI.toURL
 }.toArray
 val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
 userClassPathFirst match {
  case true => new ChildExecutorURLClassLoader(urls, currentLoader)
  case false => new ExecutorURLClassLoader(urls, currentLoader)
 }
}

Of the two, ExecutorURLClassLoader extends URLClassLoader directly, while ChildExecutorURLClassLoader wraps an internal URLClassLoader that it delegates to; both implement the MutableURLClassLoader interface. The code is as follows:

// code 3-41
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
 extends MutableURLClassLoader {

 // The internal user class loader has no parent (null), so its lookups
 // never delegate upward.
 private object userClassLoader extends URLClassLoader(urls, null) {
  override def addURL(url: URL) {
   super.addURL(url)
  }
  override def findClass(name: String): Class[_] = {
   super.findClass(name)
  }
 }

 private val parentClassLoader = new ParentClassLoader(parent)

 override def findClass(name: String): Class[_] = {
  try {
   // Try the user's jars first ...
   userClassLoader.findClass(name)
  } catch {
   case e: ClassNotFoundException =>
    // ... and fall back to the parent only if they lack the class.
    parentClassLoader.loadClass(name)
  }
 }

 def addURL(url: URL) {
  userClassLoader.addURL(url)
 }

 def getURLs() = {
  userClassLoader.getURLs()
 }
}

private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
 extends URLClassLoader(urls, parent) with MutableURLClassLoader {

 override def addURL(url: URL) {
  super.addURL(url)
 }
}
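The practical difference between the two loaders: ExecutorURLClassLoader keeps URLClassLoader's standard parent-first delegation, while ChildExecutorURLClassLoader tries the user's jars first and falls back to the parent only on a miss, so user classes shadow the copies shipped with Spark. A minimal usage sketch (the jar path is hypothetical):

import java.io.File

// Hypothetical jar path, for illustration only.
val urls = Array(new File("/tmp/user-app.jar").toURI.toURL)
val parent = getClass.getClassLoader

// Parent-first: a class present in both Spark and the user jar
// resolves to Spark's copy.
val parentFirst = new ExecutorURLClassLoader(urls, parent)

// Child-first: the same class resolves to the user jar's copy, which is
// what spark.files.userClassPathFirst enables for task isolation.
val childFirst = new ChildExecutorURLClassLoader(urls, parent)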
