...

Source file src/scheduler/scheduler_service/utils.go

Documentation: scheduler/scheduler_service

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  package scheduler_service
    20  
    21  import (
    22  	"scheduler/log"
    23  	"scheduler/service_discovery"
    24  	"scheduler/utils"
    25  	"sync"
    26  	"time"
    27  )
    28  
    29  // GetLeastLoadedMachineOfNRandom retrieves the least loaded machine from an array of ips, if all machines are full loaded,
    30  // the least queue is returned, and if there is no less loaded queue than us, an error is returned. This function returns
    31  // (ip, mean_probing_time, errors)
    32  func GetLeastLoadedMachineOfNRandom(n uint, currentLoad uint, checkQueues bool, cached bool) (string, float64, error) {
    33  	startProbingTime := time.Now()
    34  
    35  	// get n random machines from service_discovery
    36  	machines, err := service_discovery.GetNRandomMachines(n, cached)
    37  	if err != nil {
    38  		log.Log.Errorf("Cannot get random machines from service_discovery service: %s", err)
    39  		return "", 0.0, err
    40  	}
    41  
    42  	log.Log.Debugf("len(machines)=%d", len(machines))
    43  	loads := make([]uint, n) // list of loads
    44  	// queues := make([]float64, n) // percentage of queue fill
    45  	probeErr := make([]bool, n) // list of probe errors
    46  
    47  	wg := sync.WaitGroup{}
    48  	// get and compute the load of all the available machines in parallel
    49  	for i, ip := range machines {
    50  		wg.Add(1)
    51  
    52  		ip := ip
    53  		i := i
    54  		go func() {
    55  			machineLoad, _, err := GetLoad(ip)
    56  			if err != nil {
    57  				log.Log.Errorf("Cannot get load from machine %s", ip)
    58  				probeErr[i] = true
    59  				wg.Done()
    60  				return
    61  			}
    62  
    63  			// load := machineLoad.FunctionsRunningMax - machineLoad.FunctionsRunning
    64  			/*
    65  				freeQueue := float64(machineLoad.QueueFill) / float64(machineLoad.QueueLengthMax)
    66  				if freeQueue == math.NaN() {
    67  					log.Log.Debugf("Queue fill value is NaN from machine %s", ip)
    68  					probeErr[i] = true
    69  					wg.Done()
    70  					return
    71  				}
    72  			*/
    73  
    74  			loads[i] = uint(machineLoad)
    75  			// queues[i] = 0
    76  			probeErr[i] = false
    77  			wg.Done()
    78  		}()
    79  
    80  	}
    81  	wg.Wait()
    82  
    83  	log.Log.Debugf("loads=%s", loads)
    84  	log.Log.Debugf("probeErrs=%s", probeErr)
    85  
    86  	probingTime := time.Since(startProbingTime).Seconds()
    87  
    88  	// Check if we have enough correct loads
    89  	probeErrors := 0
    90  	for i := 0; i < int(n); i++ {
    91  		if probeErr[i] {
    92  			probeErrors += 1
    93  		}
    94  	}
    95  	if probeErrors == int(n) {
    96  		return "", probingTime, NoLessLoadedMachine{"all probe errors"}
    97  	}
    98  
    99  	// pick the less loaded
   100  	minLoad, _ := utils.MinOfArrayUint(loads)
   101  	// if no other machine has free slots, see which queue is less loaded
   102  	if minLoad >= currentLoad {
   103  		return "", probingTime, NoLessLoadedMachine{"minLoad >= currentLoad"}
   104  		/*
   105  			==> Queues are no more supported! ==>
   106  
   107  			// if we can lose jobs we have not to check queues
   108  			if !checkQueues {
   109  				return "", probingTime, NoLessLoadedMachine{}
   110  			}
   111  			// find the least loaded queue
   112  			leastLoadedQueueValue, leastLoadedQueueIndex := utils.MaxOfArrayFloat(queues)
   113  			ourFreeQueue := float64(queue.GetQueueFill()) / float64(config.Configuration.GetQueueLengthMax())
   114  			if leastLoadedQueueValue < ourFreeQueue {
   115  				return machines[leastLoadedQueueIndex], probingTime, nil
   116  			} else {
   117  				return "", probingTime, NoLessLoadedMachine{}
   118  			}
   119  		*/
   120  	} else {
   121  		// pick one random machine among the less loaded than us
   122  		valuableMachinesIds := utils.LoadsBelowSpecificLoad(loads, currentLoad)
   123  		return machines[valuableMachinesIds[utils.GetRandomInteger(len(valuableMachinesIds))]], probingTime, nil
   124  	}
   125  }
   126  

View as plain text