...

Source file src/scheduler/scheduler/learning_scheduler.go

Documentation: scheduler/scheduler

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  package scheduler
    20  
    21  import (
    22  	"fmt"
    23  	"scheduler/log"
    24  	"scheduler/memdb"
    25  	"scheduler/queue"
    26  	"scheduler/scheduler_service"
    27  	"scheduler/service_discovery"
    28  	"scheduler/service_learning"
    29  	"scheduler/types"
    30  	"scheduler/utils"
    31  	"strconv"
    32  	"time"
    33  )
    34  
    35  const LearningSchedulerName = "LearningScheduler"
    36  
    37  const LearningSchedulerHeaderKeyState = "X-P2pfaas-Scheduler-Learning-State"
    38  const LearningSchedulerHeaderKeyAction = "X-P2pfaas-Scheduler-Learning-Action"
    39  const LearningSchedulerHeaderKeyEps = "X-P2pfaas-Scheduler-Learning-Eps"
    40  const LearningSchedulerHeaderKeyEid = "X-P2pfaas-Scheduler-Learning-Eid"
    41  
    42  const LearningSchedulerHeaderKeyFps = "X-P2pfaas-Scheduler-Learning-Fps"
    43  const LearningSchedulerHeaderKeyTaskType = "X-P2pfaas-Scheduler-Learning-Task-Type"
    44  
    45  // LearningScheduler is a scheduler with makes scheduling decisions based on the Learner service which implements RL models
    46  type LearningScheduler struct {
    47  	// NumberOfTaskTypes is the number of task types which can arrive to the node
    48  	NumberOfTaskTypes uint64
    49  }
    50  
    51  func (s LearningScheduler) GetFullName() string {
    52  	return fmt.Sprintf("%s(%d)", LearningSchedulerName, s.NumberOfTaskTypes)
    53  }
    54  
    55  func (s LearningScheduler) GetScheduler() *types.SchedulerDescriptor {
    56  	return &types.SchedulerDescriptor{
    57  		Name: LearningSchedulerName,
    58  		Parameters: []string{
    59  			fmt.Sprintf("%d", s.NumberOfTaskTypes),
    60  		},
    61  	}
    62  }
    63  
    64  // Schedule a service request. This call is blocking until the job has been executed locally or externally.
    65  func (s LearningScheduler) Schedule(req *types.ServiceRequest) (*JobResult, error) {
    66  	var err error
    67  	var actionRes *service_learning.EntryActOutput
    68  	var targetMachineIp string
    69  	var jobResult *JobResult
    70  
    71  	log.Log.Debugf("Scheduling job %s", req.ServiceName)
    72  	now := time.Now()
    73  	timingsStart := types.TimingsStart{ArrivedAt: &now}
    74  
    75  	// if request is external, execute it locally
    76  	if req.External {
    77  		return executeJobLocally(req, &timingsStart, s.GetFullName())
    78  	}
    79  
    80  	taskType := float64(0)
    81  	// parse the current task type
    82  	taskTypeStr := (*req.Headers)[LearningSchedulerHeaderKeyTaskType]
    83  	if taskTypeStr != "" {
    84  		taskType, err = strconv.ParseFloat(taskTypeStr, 64)
    85  		if err != nil {
    86  			log.Log.Warningf("Cannot parse fps in headers: %s", taskTypeStr)
    87  			taskType = 0
    88  		}
    89  	}
    90  	log.Log.Debugf("Using taskType=%f", taskType)
    91  
    92  	// update the service request task type
    93  	req.ServiceType = int64(taskType)
    94  
    95  	// prepare the state to be sent to the learner
    96  	mapRunningFunctionsOfType := memdb.GetTotalRunningFunctionsOfType()
    97  	mapQueueLengthOfType := queue.GetLengthOfTypes()
    98  	statesMaps := []map[int64]int64{mapRunningFunctionsOfType, mapQueueLengthOfType}
    99  
   100  	state := []float64{taskType}
   101  	totalLoad := int64(0)
   102  
   103  	/*
   104  		// state summation of queues
   105  		for i := 0; i < int(s.NumberOfTaskTypes); i++ {
   106  			totalJobs := int64(0)
   107  			for _, stateMap := range statesMaps {
   108  				loadOfState, exists := stateMap[int64(i)]
   109  				if !exists {
   110  					continue
   111  				}
   112  				totalJobs = totalJobs + loadOfState
   113  			}
   114  			state = append(state, float64(totalJobs))
   115  		}
   116  	*/
   117  
   118  	// state detailed queues
   119  	for _, stateMap := range statesMaps {
   120  		for i := 0; i < int(s.NumberOfTaskTypes); i++ {
   121  			loadOfState, exists := stateMap[int64(i)]
   122  			if !exists {
   123  				state = append(state, float64(0))
   124  				continue
   125  			}
   126  			totalLoad += loadOfState
   127  			state = append(state, float64(loadOfState))
   128  		}
   129  	}
   130  
   131  	actEntry := service_learning.EntryAct{
   132  		State: state,
   133  	}
   134  
   135  	// make decision
   136  	actionRes, err = service_learning.SocketAct(&actEntry)
   137  	if err != nil {
   138  		return nil, CannotRetrieveAction{err}
   139  	}
   140  
   141  	// actuate the action
   142  	actionInt := int64(actionRes.Action)
   143  	eps := actionRes.Eps
   144  
   145  	if actionInt == 0 { // reject
   146  		result := JobResult{TimingsStart: &timingsStart, Scheduler: s.GetFullName()}
   147  		s.addHeadersToResult(&result, req.Id, state, actionRes.Action, eps)
   148  
   149  		timingsStart.ScheduledAt = utils.GetTimeNow()
   150  		return &result, JobDeliberatelyRejected{}
   151  	}
   152  
   153  	if actionInt == 1 { // execute locally
   154  		timingsStart.ScheduledAt = utils.GetTimeNow()
   155  
   156  		jobResult, err = executeJobLocally(req, &timingsStart, s.GetFullName())
   157  		s.addHeadersToResult(jobResult, req.Id, state, actionRes.Action, eps)
   158  
   159  		return jobResult, err
   160  	}
   161  
   162  	if actionInt == 2 { // probe-and-forward
   163  		// save time
   164  		startedProbingTime := time.Now()
   165  		timingsStart.ProbingStartedAt = &startedProbingTime
   166  		// get N Random machines and ask them for mapRunningFunctionsOfType and pick the least loaded
   167  		leastLoaded, _, err := scheduler_service.GetLeastLoadedMachineOfNRandom(1, uint(totalLoad), true, true)
   168  		// save time
   169  		endProbingTime := time.Now()
   170  		timingsStart.ProbingEndedAt = &endProbingTime
   171  
   172  		if err != nil {
   173  			log.Log.Debugf("Error in retrieving machines %s", err.Error())
   174  			// no machine less loaded than us, we are obliged to run the job in this machine or discard the job
   175  			// if we cannot handle it
   176  			jobResult, err = executeJobLocally(req, &timingsStart, s.GetFullName())
   177  			s.addHeadersToResult(jobResult, req.Id, state, actionRes.Action, eps)
   178  
   179  			return jobResult, err
   180  		}
   181  
   182  		jobResult, err = executeJobExternally(req, leastLoaded, &timingsStart, s.GetFullName())
   183  		s.addHeadersToResult(jobResult, req.Id, state, actionRes.Action, eps)
   184  
   185  		return jobResult, err
   186  	}
   187  
   188  	// otherwise forward
   189  	targetMachineI := int64(actionRes.Action - 3)
   190  	targetMachineIp, err = service_discovery.GetMachineIpAtIndex(targetMachineI, true)
   191  	if err != nil {
   192  		log.Log.Errorf("Cannot schedule job to machine i=%d of %d: %s", targetMachineI, service_discovery.GetCachedMachineNumber(), err)
   193  		return nil, CannotRetrieveRecipientNode{err}
   194  	}
   195  	log.Log.Debugf("Forwarding to machine %s", targetMachineIp)
   196  
   197  	jobResult, err = executeJobExternally(req, targetMachineIp, &timingsStart, s.GetFullName())
   198  	s.addHeadersToResult(jobResult, req.Id, state, actionRes.Action, eps)
   199  
   200  	return jobResult, err
   201  }
   202  
   203  func (s LearningScheduler) addHeadersToResult(result *JobResult, reqId uint64, state []float64, action float64, eps float64) {
   204  	resultHeaders := map[string]string{}
   205  
   206  	resultHeaders[LearningSchedulerHeaderKeyEid] = fmt.Sprintf("%d", reqId)
   207  	resultHeaders[LearningSchedulerHeaderKeyState] = utils.ArrayFloatToStringCommas(state)
   208  	resultHeaders[LearningSchedulerHeaderKeyAction] = fmt.Sprintf("%f", action)
   209  	resultHeaders[LearningSchedulerHeaderKeyEps] = fmt.Sprintf("%f", eps)
   210  
   211  	if result != nil {
   212  		result.ResponseHeaders = &resultHeaders
   213  	} else {
   214  		log.Log.Errorf("result is nil, cannot add headers")
   215  	}
   216  }
   217  

View as plain text