...
1
18
19
20 package queue
21
22 import (
23 "scheduler/config"
24 "scheduler/log"
25 "scheduler/memdb"
26 "scheduler/metrics"
27 "scheduler/types"
28 "scheduler/utils"
29 "sync"
30 "time"
31 )
32
33 var jobsQueue []*QueuedJob
34 var jobsQueueLength = 0
35 var jobsQueueLengthOfTypes = make(map[int64]int64)
36
37
38
39 var mutex sync.Mutex
40
41 var jobsSem = make(utils.Semaphore, 0)
42 var consumersSem = make(utils.Semaphore, config.GetRunningFunctionMax())
43
44 func init() {
45
46 metrics.PostParallelJobsSlots(int(config.GetRunningFunctionMax()))
47 metrics.PostQueueSize(int(config.GetQueueLengthMax()))
48 }
49
50
51 func EnqueueJob(request *types.ServiceRequest) (*QueuedJob, error) {
52 mutex.Lock()
53
54 if jobsQueueLength > 0 && jobsQueueLength >= int(config.GetQueueLengthMax()) {
55 log.Log.Debugf("[R#%d] Cannot enqueue job %s, queue is full", request.Id, request.ServiceName)
56 mutex.Unlock()
57 return nil, ErrorFull{}
58 }
59
60
61 sem := make(utils.Semaphore, 0)
62 job := &QueuedJob{
63 Request: request,
64 Semaphore: &sem,
65 Timings: &Timings{
66 ExecutionTime: 0.0,
67 FaasExecutionTime: 0.0,
68 QueueTime: 0.0,
69 },
70 }
71
72 jobsQueue = append(jobsQueue, job)
73 jobsQueueLength += 1
74 lengthIncreaseOfType(request.ServiceType)
75
76 log.Log.Debugf("[R#%d] Enqueued job %s", job.Request.Id, job.Request.ServiceName)
77
78
79 metrics.PostQueueAssignedSlot()
80
81
82 mutex.Unlock()
83
84
85 jobsSem.Signal()
86
87
88 startQueueTime := time.Now()
89
90
91 job.Semaphore.Wait(1)
92
93
94 job.Timings.QueueTime = time.Since(startQueueTime).Seconds()
95
96 return job, nil
97 }
98
99 func dequeueJob() *QueuedJob {
100 jobsSem.Wait(1)
101 mutex.Lock()
102
103 job := jobsQueue[0]
104
105 if len(jobsQueue) == 1 {
106 jobsQueue = []*QueuedJob{}
107 } else {
108 jobsQueue = jobsQueue[1:]
109 }
110
111 jobsQueueLength -= 1
112 lengthDecreaseOfType(job.Request.ServiceType)
113
114
115 metrics.PostQueueFreedSlot()
116
117 mutex.Unlock()
118 return job
119 }
120
121
124
125 func GetLength() int {
126 return jobsQueueLength
127 }
128
129 func GetLengthOfTypes() map[int64]int64 {
130 out := make(map[int64]int64)
131
132 mutex.Lock()
133 for index, element := range jobsQueueLengthOfTypes {
134 out[index] = element
135 }
136 mutex.Unlock()
137
138 return out
139 }
140
141
144
145 func lengthIncreaseOfType(jobType int64) {
146 num, exists := jobsQueueLengthOfTypes[jobType]
147 if !exists {
148 jobsQueueLengthOfTypes[jobType] = 1
149 return
150 }
151
152 jobsQueueLengthOfTypes[jobType] = num + 1
153 }
154
155 func lengthDecreaseOfType(jobType int64) {
156 num, exists := jobsQueueLengthOfTypes[jobType]
157 if !exists {
158 return
159 }
160
161 jobsQueueLengthOfTypes[jobType] = num - 1
162 }
163
164
167
168 func Looper() {
169 for {
170
171 consumersSem.Wait(1)
172 log.Log.Debugf("Consumer available! Queue has %d jobs in queue and %d running", GetLength(), memdb.GetTotalRunningFunctions())
173
174
175 job := dequeueJob()
176 log.Log.Debugf("QueuedJob available! Queue has %d jobs in queue and %d running", GetLength(), memdb.GetTotalRunningFunctions())
177
178
179 go executeNow(job)
180
181
182 }
183 }
184
View as plain text