...

Source file src/scheduler/memdb/memdb.go

Documentation: scheduler/memdb

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  // Package memdb implements a fast way for in-memory variables.
    20  package memdb
    21  
    22  import (
    23  	"scheduler/config"
    24  	"scheduler/log"
    25  	"scheduler/metrics"
    26  	"sync"
    27  )
    28  
    29  type Function struct {
    30  	Name             string
    31  	RunningInstances uint
    32  }
    33  
    34  type ErrorFunctionNotFound struct{}
    35  
    36  func (ErrorFunctionNotFound) Error() string {
    37  	return "Function not found"
    38  }
    39  
    40  /*
    41   * Code
    42   */
    43  
    44  var functions []*Function
    45  var totalRunningFunctions uint = 0
    46  
    47  var totalRunningFunctionsOfTypes = make(map[int64]int64) // the number of running tasks according to the type
    48  
    49  var requestNumber uint64 = 0
    50  var requestNumberFromPeers uint64 = 0
    51  
    52  var mutexRunningFunctions sync.Mutex
    53  var mutexRequestNumber sync.Mutex
    54  var mutexRequestNumberFromPeers sync.Mutex
    55  
    56  func GetRunningInstances(functionName string) (uint, error) {
    57  	mutexRunningFunctions.Lock()
    58  
    59  	fn := getFunction(functionName, true)
    60  	if fn == nil {
    61  		mutexRunningFunctions.Unlock()
    62  		return 0, ErrorFunctionNotFound{}
    63  	}
    64  
    65  	mutexRunningFunctions.Unlock()
    66  
    67  	return fn.RunningInstances, nil
    68  }
    69  
    70  func SetFunctionRunning(functionName string, functionType int64) error {
    71  	mutexRunningFunctions.Lock()
    72  
    73  	log.Log.Debugf("Setting %s as running", functionName)
    74  
    75  	fn := getFunction(functionName, true)
    76  	if fn == nil {
    77  		mutexRunningFunctions.Unlock()
    78  		return ErrorFunctionNotFound{}
    79  	}
    80  
    81  	fn.RunningInstances += 1
    82  	totalRunningFunctions += 1
    83  	totalRunningFunctionsOfTypeIncrease(functionType)
    84  
    85  	// metrics
    86  	metrics.PostStartedExecutingJob()
    87  
    88  	mutexRunningFunctions.Unlock()
    89  	return nil
    90  }
    91  
    92  func SetFunctionStopped(functionName string, functionType int64) error {
    93  	mutexRunningFunctions.Lock()
    94  
    95  	log.Log.Debugf("Setting %s as stopped", functionName)
    96  
    97  	fn := getFunction(functionName, false)
    98  	if fn == nil {
    99  		mutexRunningFunctions.Unlock()
   100  		return ErrorFunctionNotFound{}
   101  	}
   102  
   103  	fn.RunningInstances -= 1
   104  	totalRunningFunctions -= 1
   105  	totalRunningFunctionsOfTypeDecrease(functionType)
   106  
   107  	// metrics
   108  	metrics.PostStoppedExecutingJob()
   109  
   110  	mutexRunningFunctions.Unlock()
   111  	return nil
   112  }
   113  
   114  func GetTotalRunningFunctions() uint {
   115  	return totalRunningFunctions
   116  }
   117  
   118  func GetTotalRunningFunctionsOfType() map[int64]int64 {
   119  	out := make(map[int64]int64)
   120  
   121  	mutexRunningFunctions.Lock()
   122  	for index, element := range totalRunningFunctionsOfTypes {
   123  		out[index] = element
   124  	}
   125  	mutexRunningFunctions.Unlock()
   126  
   127  	return out
   128  }
   129  
   130  func GetFreeRunningSlots() int {
   131  	return int(config.GetRunningFunctionMax()) - int(GetTotalRunningFunctions())
   132  }
   133  
   134  // GetNextRequestNumber returns the next id for the request
   135  func GetNextRequestNumber() uint64 {
   136  	mutexRequestNumber.Lock()
   137  	requestNumber++
   138  	n := requestNumber
   139  	mutexRequestNumber.Unlock()
   140  	return n
   141  }
   142  
   143  // GetNextRequestNumberFromPeers returns the next id for the request
   144  func GetNextRequestNumberFromPeers() uint64 {
   145  	mutexRequestNumberFromPeers.Lock()
   146  	requestNumberFromPeers++
   147  	n := requestNumberFromPeers
   148  	mutexRequestNumberFromPeers.Unlock()
   149  	return n
   150  }
   151  
   152  /*
   153   * Utils
   154   */
   155  
   156  func getFunction(functionName string, createIfNotExists bool) *Function {
   157  	for _, fn := range functions {
   158  		if fn.Name == functionName {
   159  			return fn
   160  		}
   161  	}
   162  
   163  	log.Log.Debugf("%s function not found, creating", functionName)
   164  
   165  	if createIfNotExists {
   166  		newFn := Function{
   167  			Name:             functionName,
   168  			RunningInstances: 0,
   169  		}
   170  		functions = append(functions, &newFn)
   171  		return &newFn
   172  	}
   173  
   174  	return nil
   175  }
   176  
   177  func totalRunningFunctionsOfTypeIncrease(jobType int64) {
   178  	num, exists := totalRunningFunctionsOfTypes[jobType]
   179  	if !exists {
   180  		totalRunningFunctionsOfTypes[jobType] = 1
   181  		return
   182  	}
   183  
   184  	totalRunningFunctionsOfTypes[jobType] = num + 1
   185  }
   186  
   187  func totalRunningFunctionsOfTypeDecrease(jobType int64) {
   188  	num, exists := totalRunningFunctionsOfTypes[jobType]
   189  	if !exists {
   190  		return
   191  	}
   192  
   193  	totalRunningFunctionsOfTypes[jobType] = num - 1
   194  }
   195  

View as plain text