...
  
  
     1  
    18  
    19  
    20  package memdb
    21  
    22  import (
    23  	"scheduler/config"
    24  	"scheduler/log"
    25  	"scheduler/metrics"
    26  	"sync"
    27  )
    28  
    29  type Function struct {
    30  	Name             string
    31  	RunningInstances uint
    32  }
    33  
    34  type ErrorFunctionNotFound struct{}
    35  
    36  func (ErrorFunctionNotFound) Error() string {
    37  	return "Function not found"
    38  }
    39  
    40  
    43  
    44  var functions []*Function
    45  var totalRunningFunctions uint = 0
    46  
    47  var totalRunningFunctionsOfTypes = make(map[int64]int64) 
    48  
    49  var requestNumber uint64 = 0
    50  var requestNumberFromPeers uint64 = 0
    51  
    52  var mutexRunningFunctions sync.Mutex
    53  var mutexRequestNumber sync.Mutex
    54  var mutexRequestNumberFromPeers sync.Mutex
    55  
    56  func GetRunningInstances(functionName string) (uint, error) {
    57  	mutexRunningFunctions.Lock()
    58  
    59  	fn := getFunction(functionName, true)
    60  	if fn == nil {
    61  		mutexRunningFunctions.Unlock()
    62  		return 0, ErrorFunctionNotFound{}
    63  	}
    64  
    65  	mutexRunningFunctions.Unlock()
    66  
    67  	return fn.RunningInstances, nil
    68  }
    69  
    70  func SetFunctionRunning(functionName string, functionType int64) error {
    71  	mutexRunningFunctions.Lock()
    72  
    73  	log.Log.Debugf("Setting %s as running", functionName)
    74  
    75  	fn := getFunction(functionName, true)
    76  	if fn == nil {
    77  		mutexRunningFunctions.Unlock()
    78  		return ErrorFunctionNotFound{}
    79  	}
    80  
    81  	fn.RunningInstances += 1
    82  	totalRunningFunctions += 1
    83  	totalRunningFunctionsOfTypeIncrease(functionType)
    84  
    85  	
    86  	metrics.PostStartedExecutingJob()
    87  
    88  	mutexRunningFunctions.Unlock()
    89  	return nil
    90  }
    91  
    92  func SetFunctionStopped(functionName string, functionType int64) error {
    93  	mutexRunningFunctions.Lock()
    94  
    95  	log.Log.Debugf("Setting %s as stopped", functionName)
    96  
    97  	fn := getFunction(functionName, false)
    98  	if fn == nil {
    99  		mutexRunningFunctions.Unlock()
   100  		return ErrorFunctionNotFound{}
   101  	}
   102  
   103  	fn.RunningInstances -= 1
   104  	totalRunningFunctions -= 1
   105  	totalRunningFunctionsOfTypeDecrease(functionType)
   106  
   107  	
   108  	metrics.PostStoppedExecutingJob()
   109  
   110  	mutexRunningFunctions.Unlock()
   111  	return nil
   112  }
   113  
   114  func GetTotalRunningFunctions() uint {
   115  	return totalRunningFunctions
   116  }
   117  
   118  func GetTotalRunningFunctionsOfType() map[int64]int64 {
   119  	out := make(map[int64]int64)
   120  
   121  	mutexRunningFunctions.Lock()
   122  	for index, element := range totalRunningFunctionsOfTypes {
   123  		out[index] = element
   124  	}
   125  	mutexRunningFunctions.Unlock()
   126  
   127  	return out
   128  }
   129  
   130  func GetFreeRunningSlots() int {
   131  	return int(config.GetRunningFunctionMax()) - int(GetTotalRunningFunctions())
   132  }
   133  
   134  
   135  func GetNextRequestNumber() uint64 {
   136  	mutexRequestNumber.Lock()
   137  	requestNumber++
   138  	n := requestNumber
   139  	mutexRequestNumber.Unlock()
   140  	return n
   141  }
   142  
   143  
   144  func GetNextRequestNumberFromPeers() uint64 {
   145  	mutexRequestNumberFromPeers.Lock()
   146  	requestNumberFromPeers++
   147  	n := requestNumberFromPeers
   148  	mutexRequestNumberFromPeers.Unlock()
   149  	return n
   150  }
   151  
   152  
   155  
   156  func getFunction(functionName string, createIfNotExists bool) *Function {
   157  	for _, fn := range functions {
   158  		if fn.Name == functionName {
   159  			return fn
   160  		}
   161  	}
   162  
   163  	log.Log.Debugf("%s function not found, creating", functionName)
   164  
   165  	if createIfNotExists {
   166  		newFn := Function{
   167  			Name:             functionName,
   168  			RunningInstances: 0,
   169  		}
   170  		functions = append(functions, &newFn)
   171  		return &newFn
   172  	}
   173  
   174  	return nil
   175  }
   176  
   177  func totalRunningFunctionsOfTypeIncrease(jobType int64) {
   178  	num, exists := totalRunningFunctionsOfTypes[jobType]
   179  	if !exists {
   180  		totalRunningFunctionsOfTypes[jobType] = 1
   181  		return
   182  	}
   183  
   184  	totalRunningFunctionsOfTypes[jobType] = num + 1
   185  }
   186  
   187  func totalRunningFunctionsOfTypeDecrease(jobType int64) {
   188  	num, exists := totalRunningFunctionsOfTypes[jobType]
   189  	if !exists {
   190  		return
   191  	}
   192  
   193  	totalRunningFunctionsOfTypes[jobType] = num - 1
   194  }
   195  
View as plain text