A detailed explanation of YARN schedulers for Spark
1. Choosing a scheduler

There are three schedulers to choose from in YARN: the FIFO Scheduler, the Capacity Scheduler, and the Fair Scheduler.

The FIFO scheduler places applications in a single first-in, first-out queue in the order they are submitted. When allocating resources, it first serves the application at the head of the queue; once that application's requirements are satisfied, it serves the next one, and so on.

The FIFO scheduler is the simplest and easiest to understand, and needs no configuration, but it is not suitable for shared clusters: a large application may occupy all cluster resources and block every other application. On a shared cluster, the Capacity Scheduler or the Fair Scheduler is a better fit; both allow large and small jobs to obtain a share of system resources when submitted.

The following "yarn scheduler comparison diagram" shows the differences between these schedulers. As can be seen from the figure, in the FIFO scheduler, small tasks will be blocked by large tasks.

With the Capacity Scheduler, a dedicated queue can be set aside to run small jobs, but reserving a queue for small jobs pre-allocates part of the cluster's resources, so large jobs finish later than they would under the FIFO scheduler.

The Fair Scheduler needs no resources reserved in advance; it dynamically balances system resources across all running jobs. As shown below, when the first (large) job is submitted and is the only one running, it receives all cluster resources; when a second (small) job is submitted, the Fair Scheduler gives half of the resources to it, so the two jobs share the cluster fairly.

Note that in the Fair Scheduler shown in the figure below, there is some delay between the second job's submission and its acquisition of resources, because it must wait for the first job to release containers. Once the small job completes, it releases its resources and the large job again obtains all system resources. The net effect is that the Fair Scheduler achieves both high resource utilization and timely completion of small jobs.

YARN scheduler comparison diagram:

2. Configuring the capacity scheduler

2.1 Introduction to capacity scheduling

The Capacity Scheduler allows multiple organizations to share an entire cluster, with each organization receiving part of the cluster's computing capacity. By assigning a dedicated queue to each organization and giving each queue a specific share of cluster resources, the cluster can serve multiple organizations through multiple queues. Queues can in turn be subdivided so that members within an organization share the queue's resources. Within a queue, resources are scheduled with a first-in, first-out (FIFO) policy.

As the figure above suggests, a single job cannot use more than its queue's resources. If multiple jobs run in a queue and the queue has enough resources, those resources are allocated to the jobs. But what if the queue's resources are insufficient? The Capacity Scheduler may still allocate additional resources to the queue, beyond its configured capacity; this is the concept of "queue elasticity".

In normal operation, the Capacity Scheduler does not forcibly release containers; when a queue's resources are insufficient, it can only obtain container resources as other queues release them. We can, however, set a maximum resource usage for a queue to keep it from occupying too many idle resources and starving the other queues. This is the trade-off behind "elastic queues".

2.2 Configuring capacity scheduling

Suppose we have the following queue hierarchy:

root
├── prod
└── dev
    ├── eng
    └── science
Below is a simple Capacity Scheduler configuration file, named capacity-scheduler.xml. In this configuration, two child queues, prod and dev, are defined under the root queue, with capacities of 40% and 60% respectively. Note that a queue is configured through properties of the form yarn.scheduler.capacity.<queue-path>.<property>, where <queue-path> represents the queue's place in the hierarchy, such as root.prod, and <property> is usually capacity or maximum-capacity.
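A minimal sketch of that file, reconstructed to match the description above (the property names follow the standard Capacity Scheduler format):

<?xml version="1.0"?>
<configuration>
  <!-- child queues of root -->
  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>prod,dev</value>
  </property>
  <!-- child queues of dev -->
  <property>
    <name>yarn.scheduler.capacity.root.dev.queues</name>
    <value>eng,science</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.prod.capacity</name>
    <value>40</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.capacity</name>
    <value>60</value>
  </property>
  <!-- dev may elastically grow to at most 75% of the cluster -->
  <property>
    <name>yarn.scheduler.capacity.root.dev.maximum-capacity</name>
    <value>75</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.eng.capacity</name>
    <value>50</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.root.dev.science.capacity</name>
    <value>50</value>
  </property>
</configuration>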

We can see that the dev queue is further divided into two equal-capacity sub-queues, eng and science. dev's maximum-capacity is set to 75%, so even if the prod queue is completely idle, dev cannot occupy all cluster resources; in other words, prod always has at least 25% of the cluster available for immediate use. Note that eng and science do not set maximum-capacity, so jobs in either queue may use all of dev's resources (up to 75% of the cluster). Similarly, since prod does not set maximum-capacity, it may occupy all of the cluster's resources.

Beyond queues and their capacities, the Capacity Scheduler can also configure the maximum resources a single user or application may be allocated, how many applications may run at the same time, queue ACLs, and so on.
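For example, a few of these per-queue properties (an illustrative sketch; the values here are made up):

<property>
  <!-- allow a single user to use up to 2x the queue's configured capacity -->
  <name>yarn.scheduler.capacity.root.prod.user-limit-factor</name>
  <value>2</value>
</property>
<property>
  <!-- cap the number of running and pending applications in prod -->
  <name>yarn.scheduler.capacity.root.prod.maximum-applications</name>
  <value>100</value>
</property>
<property>
  <!-- ACL: which users may submit applications to prod -->
  <name>yarn.scheduler.capacity.root.prod.acl_submit_applications</name>
  <value>alice,bob</value>
</property>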

2.3 Queue Settings

Which queue an application uses depends on the application itself. In MapReduce, for example, we can specify the target queue through the mapreduce.job.queuename property. If the queue does not exist, an error is returned at submission time. If we do not specify any queue, all applications are placed in a queue named default.

Note: for the Capacity Scheduler, the queue name must be the last component of the queue hierarchy; full queue paths are not recognized. With the configuration above, for example, prod and eng are valid queue names, while root.dev.eng and dev.eng are not.
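For example, to send a MapReduce job to the prod queue defined above, the property can be set in the job configuration:

<property>
  <name>mapreduce.job.queuename</name>
  <value>prod</value>
</property>

For a Spark-on-YARN application, the equivalent is spark-submit's --queue option, e.g. spark-submit --master yarn --queue prod ... (the rest of the command is omitted here).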

3. Configuring the fair scheduler

3.1 Fair scheduling

The Fair Scheduler is designed to allocate fair shares of resources to all applications (what counts as fair can be configured). The "YARN scheduler comparison diagram" above shows fair scheduling between two applications in a single queue; fair scheduling also works across multiple queues. For example, suppose users A and B each have their own queue. When A starts a job while B has none, A receives all cluster resources; when B then starts a job, A's job keeps running, and after a while the two jobs each get half of the cluster. If B starts a second job while the others are still running, it shares B's queue with B's first job, so B's two jobs each use a quarter of the cluster while A's job still uses half. The result is that resources are shared equally between the two users. The process is shown in the figure below:

3.2 Enabling the fair scheduler

Which scheduler to use is configured through the yarn.resourcemanager.scheduler.class parameter in the yarn-site.xml configuration file; the Capacity Scheduler is used by default. To use the Fair Scheduler, set this parameter to the fully qualified class name org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.
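In yarn-site.xml this looks like:

<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>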

3.3 Configuration of queues

The Fair Scheduler's configuration file, fair-scheduler.xml, is read from the classpath; its location can be changed through the yarn.scheduler.fair.allocation.file property. Without this configuration file, the Fair Scheduler works as described in section 3.1: it automatically creates a queue for a user when that user submits a first application, names the queue after the user, and places all of the user's applications in it.

We can configure each queue in this file, and queues can be configured hierarchically as with the Capacity Scheduler. For example, a Fair Scheduler configuration corresponding to the capacity-scheduler.xml above:
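A sketch of such an allocation file, reconstructed to match the description that follows (the element names are the standard Fair Scheduler allocation-file format):

<?xml version="1.0"?>
<allocations>
  <!-- default scheduling policy for queues that do not override it -->
  <defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>

  <queue name="prod">
    <weight>40</weight>
    <!-- jobs inside prod run in FIFO order -->
    <schedulingPolicy>fifo</schedulingPolicy>
  </queue>

  <queue name="dev">
    <weight>60</weight>
    <queue name="eng" />
    <queue name="science" />
  </queue>

  <queuePlacementPolicy>
    <rule name="specified" create="false" />
    <rule name="primaryGroup" create="false" />
    <rule name="default" queue="dev.eng" />
  </queuePlacementPolicy>
</allocations>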

The queue hierarchy is expressed through nested <queue> elements. All queues are children of the root queue, even when they are not nested inside a root element. In this configuration, the dev queue is divided into two sub-queues, eng and science.

Each queue in the Fair Scheduler has a weight attribute (this weight is what defines fairness), which is the basis of fair scheduling. In this example, allocating the cluster's resources 40:60 between prod and dev is considered fair. The eng and science queues define no weights, so they share dev's resources evenly. Note that the weight is not a percentage: replacing 40 and 60 with 2 and 3 has exactly the same effect. Also note that queues automatically created for users when there is no configuration file still have a weight, with the value 1.

Each queue can still use a different scheduling policy internally. The default policy for all queues is configured through the top-level <defaultQueueSchedulingPolicy> element; if it is not configured, fair scheduling is used.

Although it is called the Fair Scheduler, it still supports FIFO scheduling at the queue level. A queue's policy can be overridden by its own <schedulingPolicy> element. In the example above, the prod queue is set to use FIFO scheduling, so tasks submitted to prod execute in FIFO order. Note that scheduling between prod and dev is still fair, as is scheduling between eng and science.

Although not shown in the configuration above, each queue can also be configured with minimum and maximum resource limits and a maximum number of running applications.

3.4 Queue Settings

The Fair Scheduler uses a rule-based system to determine which queue an application is placed in. In the example above, the <queuePlacementPolicy> element defines a list of rules, each of which is tried in turn until one matches. The first rule, specified, places an application in the queue it explicitly requested; if the application names no queue, or names a queue that does not exist, the rule does not match and the next rule is tried. The primaryGroup rule tries to place the application in a queue named after the user's Unix primary group; if no such queue exists, the next rule is tried instead of creating the queue. If none of the earlier rules matches, the default rule fires and the application is placed in the dev.eng queue.

Of course, we can also leave queuePlacementPolicy unconfigured, in which case the scheduler applies the following rules by default:
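That default behaviour corresponds to a policy like this (a reconstruction of the standard default):

<queuePlacementPolicy>
  <rule name="specified" />
  <rule name="user" />
</queuePlacementPolicy>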

These rules can be summarized in one sentence: unless a queue is explicitly and correctly specified, a queue named after the user is used, created if it does not exist.

There is also a simple placement policy that puts all applications into the same (default) queue, so that resources are shared equally among applications rather than among users. The configuration is defined as follows:
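A sketch of that policy:

<queuePlacementPolicy>
  <rule name="default" />
</queuePlacementPolicy>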

To achieve the same effect without a configuration file, we can set yarn.scheduler.fair.user-as-default-queue to false, so that applications are placed in the default queue rather than in per-user queues. In addition, setting yarn.scheduler.fair.allow-undeclared-pools to false prevents users from creating queues on the fly.
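For example, in yarn-site.xml:

<property>
  <name>yarn.scheduler.fair.user-as-default-queue</name>
  <value>false</value>
</property>
<property>
  <name>yarn.scheduler.fair.allow-undeclared-pools</name>
  <value>false</value>
</property>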

3.5 Preemption

When a job is submitted to an empty queue on a busy cluster, it does not start immediately; it blocks until the running jobs release system resources. To make the start time of a submitted job more predictable (a wait timeout can be set), the Fair Scheduler supports preemption.

Preemption allows the scheduler to kill containers belonging to queues that occupy more than their share of resources, so that those resources can be allocated to the queues entitled to them. Note that preemption reduces cluster efficiency, because killed containers must be re-executed.

Preemption is enabled by setting the global parameter yarn.scheduler.fair.preemption to true. In addition, two parameters control the preemption timeouts (neither is configured by default, and at least one must be set for containers to be preempted):

- minimum share preemption timeout

- fair share preemption timeout

If a queue does not receive its guaranteed minimum share within the time specified by the minimum share preemption timeout, the scheduler preempts containers. A default timeout for all queues is set through the top-level <defaultMinSharePreemptionTimeout> element in the allocation file; a per-queue timeout is set with a <minSharePreemptionTimeout> element inside that queue's <queue> element.

Similarly, if a queue receives less than half of its fair share (the ratio is configurable) for the duration of the fair share preemption timeout, the scheduler preempts containers. The default and per-queue timeouts are set with the top-level <defaultFairSharePreemptionTimeout> element and a per-queue <fairSharePreemptionTimeout> element, respectively. The ratio itself is set with <defaultFairSharePreemptionThreshold> (all queues) or <fairSharePreemptionThreshold> (one queue) and defaults to 0.5.
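Putting the preemption settings together, a sketch (the timeout values are illustrative; timeouts in the allocation file are in seconds):

In yarn-site.xml:

<property>
  <name>yarn.scheduler.fair.preemption</name>
  <value>true</value>
</property>

In fair-scheduler.xml:

<allocations>
  <!-- preempt if a queue stays below its minimum share for 60 seconds -->
  <defaultMinSharePreemptionTimeout>60</defaultMinSharePreemptionTimeout>
  <!-- preempt if a queue stays below half of its fair share for 300 seconds -->
  <defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>
  <!-- the "half" threshold; 0.5 is the default -->
  <defaultFairSharePreemptionThreshold>0.5</defaultFairSharePreemptionThreshold>
</allocations>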