山东大学软件工程应用与实践: Spark(十) 代码
2021SC@SDUSC
目录
TaskScheduler的启动
1.创建LocalActor
2.ExecutorSource的创建与注册
3.ExecutorActor的构建与注册
4.Spark自身ClassLoader的创建
TaskScheduler的启动
要想TaskScheduler发挥作用,必须要启动它,代码如下
taskScheduler.start()
TaskScheduler在启动的时候,实际调用了backend的start方法,
override def start() { backend.start() }
1.创建LocalActor
创建LocalActor的过程主要是构建本地的Executor,代码如下
// code 3-37 private[spark] class LocalActor (scheduler: TaskSchedulerImpl, executorBackend: LocalBackend, private val totalCores: Int) extends Actor ith ActorLogReceive ith Logging { import context.dispatcher // to use Akka's scheduler.scheduleonce() private var freeCores = totalCores private val localExecutorid = SparkContext.DRIVER_IDENTIFIER private val localExecutorHostname = "localhost" val executor = ne Executor( localExecutorld, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true) override def receiveWithLogging = { case ReviveOffers => reviveOffers () case StatusUpdate(taskid, state, serializedData) => scheduler.statusUpdate(taskid, state, serializedData) if (TaskState.isFinished (state)) { freeCores += scheduler.CPUS_PER_TASK revi veOffers () } case KillTask(taskId, interruptThread) => executor.killTask(taskld, interruptThread) case SExecutor => executor.s() } }
Executor的构建, 见代码清单 3-37, 主要包括以下步骤
- 创建并注册ExecutorSource。
- 获取SparkEnv。如果是非local模式,Worker上的CoarseGrainedExecutorBackend向Driver上的CoarseGrainedExecutorBackend注册Executor时,则需要新建SparkEnv。可以修改属性spark.executor.port(默认为0,表示随机生成)来配置Executor中的ActorSystem的端口号。
- 创建并注册ExecutorActor。ExecutorActor负责接收发送给Executor的消息。
- urlClassLoader的创建。为什么需要创建这个ClassLoader?在非local模式中,Driver或者Worker上都会有多个Executor,每个Executor都设置自身的urlClassLoader,用于加载任务上传的jar包中的类,有效对任务的类加载环境进行隔离。
- 创建Executor执行Task的线程池。
- 启动Executor的心跳线程。此线程用于向Driver发送心跳。
此外,还包括Akka发送消息的帧大小(10485760字节)、结果总大小的字节限制(1073741824字节)、正在运行的task列表、设置serializer的默认ClassLoader为创建的ClassLoader等。
//code 3-37 val executorSource = ne ExecutorSource(this, executorId) private val env = { if (!isLocal) { val port = conf.getInt("spark.executor.port", 0) val env = SparkEnv.createExecutorEnv( conf, executorId, executorHostname, port, numCores, isLocal, actorSystem) SparkEnv.set(_env) _env.metricsSystem.registerSource(executorSource) _env.blockManager.initialize(conf.getAppId) _env } else { SparkEnv.get } } private val executorActor = env.actorSystem.actorOf( Props(ne ExecutorActor(executorId)), "ExecutorActor") private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeed(urlClassLoader) env.serializer.setDefaultClassLoader(urlClassLoader) private val akkaframeSize = AkkaUtils.maxframeSizeBytes(conf) private val maxResultSize = Utils.getMaxResultSize(conf) val threadPool = Utils.neDaemonCachedThreadPool("Executor task launch orker") private val runningTasks = ne ConcurrentHashMap[Long, TaskRunner] startDriverHeartbeater()
2.ExecutorSource的创建与注册
ExecutorSource用于测量系统。通过metricRegistry的register方法注册计量,这些计量信息包括threadpool.activeTasks、threadpool.completeTasks、threadpool.currentPool_size、threadpool.maxPool_size、filesystem.hdfs.write_bytes、filesystem.hdfs.read_ops、filesystem.file.write_bytes、filesystem.hdfs.largeRead_ops、filesystem.hdfs.write_ops等,ExecutorSource的实现见代码3-38。
// code 3-38 private[spark] class ExecutorSource (val executor: Executor, executor Id: String) extends Source ( private def fileStats(scheme: String) : Option[FileSystern.Statistics] = FileSystem.getAllStatistics().filter (s => s.getScherne.equals (scheme)).headOption private def registerFileSystemStat [T] ( scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = { metricRegistry.register (MetricRegistry. name ("filesystem", scheme, name), ne Gauge[T] { override def getValue: T = fileStats(scheme).map(f).getOrElse(defaultValue) }} } override val metricRegistry = ne MetricRegistry() override val sourceName = "executor" metricRegistry.register(MetricRegistry.name("threadPool", "activeTasks"), ne Gauge[Int] { override def getValue: Int = executor.threadPool.getActiveCount() }) metricRegistry.register(MetricRegistry.name("threadpool", "pleteTasks"), ne Gauge[Long] { override def getValue: Long = executor.threadPool.getCompletedTaskCount() }) metricRegistry.register(MetricRegistry.name("threadpool", "currentPool_size"), ne Gauge[Int] { override def getValue:Int = executor.threadPool.getPoolSize() }) metricRegistry.register(MetricRegistry.name("threadpool", "maxPool_size"), ne Gauge[Int] { override def getValue: Int = executor.threadPool.getMaximumPoolSize() }) //Gauge for filr system stats of this executor for (scheme <- Array("hdfs", "file")) { registerFileSystemStat(scheme, "read_bytes", _.getBytesRead(), 0L) registerFileSystemStat(scheme, "rite_bytes", _.getBytesWritten(), 0L) registerFileSystemStat(scheme, "read_ops", _.getReadOps(), 0) registerFileSystemStat(scheme, "largeRead—ops", _.getLargeReadOps(), 0) registerFileSystemStat(scheme, "rite_ops", _.getWriteOps(), 0) } }
创建完ExecutorSource后,调用MetricsSystem的registerSource方法将ExecutorSource注册到MetricsSystem。registerSource方法使用MetricRegistry的register方法,将Source注册到MetricRegistry,见代码3-39。
// code 3-39 def registerSource(source: Source) { sources += source try { val regName = buildRegistryName(source) registry.register(regName, source.metricRegistry) } catch { case e: IllegalArgumentException => logInfo("Metric already registered", e) } }
3.ExecutorActor的构建与注册
ExecutorActor很简单,当接收到SparkUI发来的消息时,将所有线程的栈消息发送回去,代码如下
override def receiveWithLogging = { case TriggerThreadDump => sender ! Utils.getThreadDump() }
4.Spark自身ClassLoader的创建
获取要创建的ClassLoader的父加载器currentLoader,然后根据currentJars生成URL数组,spark.files.userClassPathFirst属性指定加载类时是否先从用户的classpath下载,创建ExecutorURLClassLoader或者ChildExecutorURLClassLoader,代码如下
// code 3-40 private def createClassLoader(): MutableURLClassLoader = { val currentLoader = Utils.getContextOrSparkClassLoader val urls = currentJars.keySet.map { uri => ne File(uri.split("/").last).toURI.toURL }.toArray val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false) userClassPathFirst match{ case true => ne ChildExecutorURLClassLoader(urls, currentLoader) case false => ne ExecutorURLClassLoader(urls, currentLoader) } }
ExecutorURLClassLoader或者ChildExecutorURLClassLoader实际上都继承了URLClassLoader,代码如下
// code 3-41 private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader) extends MutableURLClassLoader { private object userClassLoader extends URLClassLoader(urls, null){ override def addURL(url: URL) { super.addURL(url) } override def findClass(name: String) : Class[_] = { super.findClass(name) } } private val parentClassLoader = ne ParentClassLoader(parent) override def findClass (name: String) : Class[_] = { try { userClassLoader.findClass(name) } catch { case e: ClassNotFoundExeception => { parentClassLoader.loadClass(name) } } } def addURL (url: URL) { userClassLoader.addURL(url) } def getURLs() = { userClassLoader.getURLs() } } private[spark] class ExecutortJRLClassLoader(urls: Array[URL], parent: ClassLoader) extends URLClassLoader(urls, parent) ith MutableURLClassLoader ( override def addURL (url: URL) { super. addURL (url) } }
空调维修
- 海信电视维修站 海信电视维修站点
- 格兰仕空调售后电话 格兰仕空调维修售后服务电
- 家电售后服务 家电售后服务流程
- 华扬太阳能维修 华扬太阳能维修收费标准表
- 三菱电机空调维修 三菱电机空调维修费用高吗
- 美的燃气灶维修 美的燃气灶维修收费标准明细
- 科龙空调售后服务 科龙空调售后服务网点
- 华帝热水器维修 华帝热水器维修常见故障
- 康泉热水器维修 康泉热水器维修故障
- 华凌冰箱维修电话 华凌冰箱维修点电话
- 海尔维修站 海尔维修站点地址在哪里
- 北京海信空调维修 北京海信空调售后服务
- 科龙空调维修 科龙空调维修故障
- 皇明太阳能售后 皇明太阳能售后维修点
- 海信冰箱售后服务 海信冰箱售后服务热线电话
- 海尔热水器服务热线