...
1
18
19
20 package memdb
21
22 import (
23 "scheduler/config"
24 "scheduler/log"
25 "scheduler/metrics"
26 "sync"
27 )
28
// Function is the in-memory bookkeeping record kept for one function name.
type Function struct {
	// Name identifies the function; lookups in this package match on it.
	Name string
	// RunningInstances counts the instances of this function that are
	// currently marked as running.
	RunningInstances uint
}
33
// ErrorFunctionNotFound is the error returned by this package when a lookup
// by function name finds no matching entry.
type ErrorFunctionNotFound struct{}

// Compile-time check that ErrorFunctionNotFound implements error.
var _ error = ErrorFunctionNotFound{}

// Error implements the error interface with a fixed message.
func (ErrorFunctionNotFound) Error() string {
	return "Function not found"
}
39
40
43
44 var functions []*Function
45 var totalRunningFunctions uint = 0
46
47 var totalRunningFunctionsOfTypes = make(map[int64]int64)
48
49 var requestNumber uint64 = 0
50 var requestNumberFromPeers uint64 = 0
51
52 var mutexRunningFunctions sync.Mutex
53 var mutexRequestNumber sync.Mutex
54 var mutexRequestNumberFromPeers sync.Mutex
55
56 func GetRunningInstances(functionName string) (uint, error) {
57 mutexRunningFunctions.Lock()
58
59 fn := getFunction(functionName, true)
60 if fn == nil {
61 mutexRunningFunctions.Unlock()
62 return 0, ErrorFunctionNotFound{}
63 }
64
65 mutexRunningFunctions.Unlock()
66
67 return fn.RunningInstances, nil
68 }
69
70 func SetFunctionRunning(functionName string, functionType int64) error {
71 mutexRunningFunctions.Lock()
72
73 log.Log.Debugf("Setting %s as running", functionName)
74
75 fn := getFunction(functionName, true)
76 if fn == nil {
77 mutexRunningFunctions.Unlock()
78 return ErrorFunctionNotFound{}
79 }
80
81 fn.RunningInstances += 1
82 totalRunningFunctions += 1
83 totalRunningFunctionsOfTypeIncrease(functionType)
84
85
86 metrics.PostStartedExecutingJob()
87
88 mutexRunningFunctions.Unlock()
89 return nil
90 }
91
92 func SetFunctionStopped(functionName string, functionType int64) error {
93 mutexRunningFunctions.Lock()
94
95 log.Log.Debugf("Setting %s as stopped", functionName)
96
97 fn := getFunction(functionName, false)
98 if fn == nil {
99 mutexRunningFunctions.Unlock()
100 return ErrorFunctionNotFound{}
101 }
102
103 fn.RunningInstances -= 1
104 totalRunningFunctions -= 1
105 totalRunningFunctionsOfTypeDecrease(functionType)
106
107
108 metrics.PostStoppedExecutingJob()
109
110 mutexRunningFunctions.Unlock()
111 return nil
112 }
113
114 func GetTotalRunningFunctions() uint {
115 return totalRunningFunctions
116 }
117
118 func GetTotalRunningFunctionsOfType() map[int64]int64 {
119 out := make(map[int64]int64)
120
121 mutexRunningFunctions.Lock()
122 for index, element := range totalRunningFunctionsOfTypes {
123 out[index] = element
124 }
125 mutexRunningFunctions.Unlock()
126
127 return out
128 }
129
130 func GetFreeRunningSlots() int {
131 return int(config.GetRunningFunctionMax()) - int(GetTotalRunningFunctions())
132 }
133
134
135 func GetNextRequestNumber() uint64 {
136 mutexRequestNumber.Lock()
137 requestNumber++
138 n := requestNumber
139 mutexRequestNumber.Unlock()
140 return n
141 }
142
143
144 func GetNextRequestNumberFromPeers() uint64 {
145 mutexRequestNumberFromPeers.Lock()
146 requestNumberFromPeers++
147 n := requestNumberFromPeers
148 mutexRequestNumberFromPeers.Unlock()
149 return n
150 }
151
152
155
156 func getFunction(functionName string, createIfNotExists bool) *Function {
157 for _, fn := range functions {
158 if fn.Name == functionName {
159 return fn
160 }
161 }
162
163 log.Log.Debugf("%s function not found, creating", functionName)
164
165 if createIfNotExists {
166 newFn := Function{
167 Name: functionName,
168 RunningInstances: 0,
169 }
170 functions = append(functions, &newFn)
171 return &newFn
172 }
173
174 return nil
175 }
176
177 func totalRunningFunctionsOfTypeIncrease(jobType int64) {
178 num, exists := totalRunningFunctionsOfTypes[jobType]
179 if !exists {
180 totalRunningFunctionsOfTypes[jobType] = 1
181 return
182 }
183
184 totalRunningFunctionsOfTypes[jobType] = num + 1
185 }
186
187 func totalRunningFunctionsOfTypeDecrease(jobType int64) {
188 num, exists := totalRunningFunctionsOfTypes[jobType]
189 if !exists {
190 return
191 }
192
193 totalRunningFunctionsOfTypes[jobType] = num - 1
194 }
195
View as plain text