...

Source file src/scheduler/scheduler/scheduler.go

Documentation: scheduler/scheduler

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  // Package scheduler implements the core scheduler of the system.
    20  package scheduler
    21  
    22  import (
    23  	"encoding/json"
    24  	"io/ioutil"
    25  	"scheduler/config"
    26  	"scheduler/log"
    27  	"scheduler/memdb"
    28  	"scheduler/service_learning"
    29  	"scheduler/types"
    30  	"strconv"
    31  	"time"
    32  )
    33  
    34  /*
    35   * Interfaces
    36   */
    37  
    38  // scheduler defines how a scheduler is made
    39  type scheduler interface {
    40  	// GetFullName used for returning the name of the scheduler with the parameters
    41  	GetFullName() string
    42  	// GetScheduler returns the representation of the scheduler
    43  	GetScheduler() *types.SchedulerDescriptor
    44  	// Schedule a job. This function must be blocking and must return only when the job has been completed, or we cannot
    45  	// schedule it. When this function returns we assume that the client will receive a reply
    46  	Schedule(req *types.ServiceRequest) (*JobResult, error)
    47  }
    48  
    49  /*
    50   * Code
    51   */
    52  
    53  var schedulerCurrent scheduler
    54  var schedulerNoScheduler scheduler
    55  var schedulerForward scheduler
    56  var schedulerReject scheduler
    57  
    58  func Start() {
    59  
    60  }
    61  
    62  func init() {
    63  	log.Log.Infof("Starting initialization of scheduler module")
    64  
    65  	useDefault := false
    66  	// try to read the configuration file
    67  	file, err := ioutil.ReadFile(config.GetConfigSchedulerFilePath())
    68  	if err != nil {
    69  		log.Log.Warningf("Could not read the scheduler configuration file at %s, using default", config.GetConfigSchedulerFilePath())
    70  		useDefault = true
    71  	}
    72  
    73  	if file != nil {
    74  		log.Log.Debugf("Read file is %s", file)
    75  	}
    76  
    77  	var proposedScheduler = types.SchedulerDescriptor{}
    78  	err = json.Unmarshal(file, &proposedScheduler)
    79  	if err != nil {
    80  		log.Log.Warningf("Could not decode scheduler config file, using default")
    81  		useDefault = true
    82  	} else {
    83  		err = SetScheduler(&proposedScheduler)
    84  		if err != nil {
    85  			useDefault = true
    86  		}
    87  	}
    88  
    89  	if useDefault {
    90  		schedulerCurrent = getDefaultScheduler()
    91  	} else {
    92  		log.Log.Debugf("Used configuration file")
    93  	}
    94  
    95  	// init the no-scheduler
    96  	schedulerNoScheduler = NoSchedulingScheduler{true}
    97  	schedulerForward = ForwardScheduler{1}
    98  	schedulerReject = RejectScheduler{}
    99  
   100  	log.Log.Infof("Init with '%s' scheduler", schedulerCurrent.GetFullName())
   101  }
   102  
   103  /*
   104   * Actions
   105   */
   106  
   107  // Schedule schedules a service request with the current set scheduler
   108  func Schedule(req *types.ServiceRequest) (*JobResult, error) {
   109  	return schedulerCurrent.Schedule(req)
   110  }
   111  
   112  // ScheduleBypassAlgorithm schedules a service request with the NoScheduler algorithm which always execute locally the function
   113  func ScheduleBypassAlgorithm(req *types.ServiceRequest) (*JobResult, error) {
   114  	return schedulerNoScheduler.Schedule(req)
   115  }
   116  
   117  // ScheduleForward schedules a service request with the ForwardScheduler algorithm which always forward the request to a random node
   118  func ScheduleForward(req *types.ServiceRequest) (*JobResult, error) {
   119  	return schedulerForward.Schedule(req)
   120  }
   121  
   122  // ScheduleReject schedules a service request with the RejectScheduler algorithm which always reject the request
   123  func ScheduleReject(req *types.ServiceRequest) (*JobResult, error) {
   124  	return schedulerReject.Schedule(req)
   125  }
   126  
   127  /*
   128   * types.SchedulerDescriptor info related
   129   */
   130  
   131  // GetName returns the friendly name of current set scheduler
   132  func GetName() string {
   133  	return schedulerCurrent.GetFullName()
   134  }
   135  
   136  // GetScheduler returns the types.SchedulerDescriptor object of current set scheduler
   137  func GetScheduler() *types.SchedulerDescriptor {
   138  	return schedulerCurrent.GetScheduler()
   139  }
   140  
   141  // SetScheduler replaces the current scheduler with the passed one
   142  func SetScheduler(sched *types.SchedulerDescriptor) error {
   143  	log.Log.Debugf("Starting setting of scheduler %s", sched)
   144  
   145  	if memdb.GetTotalRunningFunctions() != 0 {
   146  		return CannotChangeScheduler{}
   147  	}
   148  
   149  	switch sched.Name {
   150  
   151  	case NoSchedulingSchedulerName:
   152  		if len(sched.Parameters) < 1 {
   153  			return BadSchedulerParameters{}
   154  		}
   155  		l, err := strconv.ParseBool(sched.Parameters[0])
   156  		if err != nil {
   157  			return BadSchedulerParameters{}
   158  		}
   159  		schedulerCurrent = &NoSchedulingScheduler{Loss: l}
   160  		break
   161  
   162  	case ForwardSchedulerName:
   163  		if len(sched.Parameters) < 1 {
   164  			return BadSchedulerParameters{}
   165  		}
   166  		m, err := strconv.ParseUint(sched.Parameters[0], 10, 32)
   167  		if err != nil {
   168  			return BadSchedulerParameters{}
   169  		}
   170  		schedulerCurrent = &ForwardScheduler{
   171  			MaxHops: uint(m),
   172  		}
   173  		break
   174  
   175  	case PowerOfNSchedulerName:
   176  		if len(sched.Parameters) < 4 {
   177  			return BadSchedulerParameters{}
   178  		}
   179  		f, err1 := strconv.ParseUint(sched.Parameters[0], 10, 32)
   180  		t, err2 := strconv.ParseUint(sched.Parameters[1], 10, 32)
   181  		l, err3 := strconv.ParseBool(sched.Parameters[2])
   182  		m, err4 := strconv.ParseUint(sched.Parameters[3], 10, 32)
   183  		if err1 != nil || err2 != nil || err3 != nil || err4 != nil {
   184  			return BadSchedulerParameters{}
   185  		}
   186  		schedulerCurrent = &PowerOfNScheduler{
   187  			F:       uint(f),
   188  			T:       uint(t),
   189  			Loss:    l,
   190  			MaxHops: uint(m),
   191  		}
   192  		break
   193  
   194  	case PowerOfNSchedulerTauName:
   195  		if len(sched.Parameters) < 5 {
   196  			return BadSchedulerParameters{}
   197  		}
   198  		f, err1 := strconv.ParseUint(sched.Parameters[0], 10, 32)
   199  		T, err2 := strconv.ParseUint(sched.Parameters[1], 10, 32)
   200  		l, err3 := strconv.ParseBool(sched.Parameters[2])
   201  		m, err4 := strconv.ParseUint(sched.Parameters[3], 10, 32)
   202  		t, err5 := time.ParseDuration(sched.Parameters[4]) // duration: 10s, 200ms, etc.
   203  		if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
   204  			return BadSchedulerParameters{}
   205  		}
   206  		schedulerCurrent = &PowerOfNSchedulerTau{
   207  			F:       uint(f),
   208  			T:       uint(T),
   209  			Loss:    l,
   210  			MaxHops: uint(m),
   211  			Tau:     t,
   212  		}
   213  		break
   214  
   215  	case RoundRobinWithMasterSchedulerName:
   216  		if len(sched.Parameters) < 3 {
   217  			return BadSchedulerParameters{}
   218  		}
   219  		m, err1 := strconv.ParseBool(sched.Parameters[0])
   220  		i := sched.Parameters[1]
   221  		l, err2 := strconv.ParseBool(sched.Parameters[2])
   222  		if err1 != nil || err2 != nil {
   223  			return BadSchedulerParameters{}
   224  		}
   225  		schedulerCurrent = &RoundRobinWithMasterScheduler{
   226  			Master:       m,
   227  			MasterIP:     i,
   228  			Loss:         l,
   229  			currentIndex: 0,
   230  		}
   231  
   232  	case LearningSchedulerName:
   233  		if len(sched.Parameters) < 1 {
   234  			return BadSchedulerParameters{}
   235  		}
   236  
   237  		jobTypes, err := strconv.ParseUint(sched.Parameters[0], 10, 64)
   238  		if err != nil {
   239  			return BadSchedulerParameters{}
   240  		}
   241  
   242  		schedulerCurrent = &LearningScheduler{jobTypes}
   243  
   244  	default:
   245  		return BadSchedulerParameters{}
   246  
   247  	}
   248  
   249  	// start the learning module if learning scheduler
   250  	if _, ok := schedulerCurrent.(*LearningScheduler); ok {
   251  		service_learning.Start()
   252  	} else {
   253  		service_learning.Stop()
   254  	}
   255  
   256  	return nil
   257  }
   258  
   259  func getDefaultScheduler() scheduler {
   260  	/*
   261  		return NoSchedulingScheduler{
   262  			Loss: true,
   263  		}
   264  	*/
   265  	return &PowerOfNScheduler{
   266  		F:       1,
   267  		T:       2,
   268  		Loss:    true,
   269  		MaxHops: 1,
   270  	}
   271  }
   272  

View as plain text