1
18
19
20 package scheduler
21
22 import (
23 "encoding/json"
24 "io/ioutil"
25 "scheduler/config"
26 "scheduler/log"
27 "scheduler/memdb"
28 "scheduler/service_learning"
29 "scheduler/types"
30 "strconv"
31 "time"
32 )
33
34
37
38
39 type scheduler interface {
40
41 GetFullName() string
42
43 GetScheduler() *types.SchedulerDescriptor
44
45
46 Schedule(req *types.ServiceRequest) (*JobResult, error)
47 }
48
49
52
53 var schedulerCurrent scheduler
54 var schedulerNoScheduler scheduler
55 var schedulerForward scheduler
56 var schedulerReject scheduler
57
58 func Start() {
59
60 }
61
62 func init() {
63 log.Log.Infof("Starting initialization of scheduler module")
64
65 useDefault := false
66
67 file, err := ioutil.ReadFile(config.GetConfigSchedulerFilePath())
68 if err != nil {
69 log.Log.Warningf("Could not read the scheduler configuration file at %s, using default", config.GetConfigSchedulerFilePath())
70 useDefault = true
71 }
72
73 if file != nil {
74 log.Log.Debugf("Read file is %s", file)
75 }
76
77 var proposedScheduler = types.SchedulerDescriptor{}
78 err = json.Unmarshal(file, &proposedScheduler)
79 if err != nil {
80 log.Log.Warningf("Could not decode scheduler config file, using default")
81 useDefault = true
82 } else {
83 err = SetScheduler(&proposedScheduler)
84 if err != nil {
85 useDefault = true
86 }
87 }
88
89 if useDefault {
90 schedulerCurrent = getDefaultScheduler()
91 } else {
92 log.Log.Debugf("Used configuration file")
93 }
94
95
96 schedulerNoScheduler = NoSchedulingScheduler{true}
97 schedulerForward = ForwardScheduler{1}
98 schedulerReject = RejectScheduler{}
99
100 log.Log.Infof("Init with '%s' scheduler", schedulerCurrent.GetFullName())
101 }
102
103
106
107
108 func Schedule(req *types.ServiceRequest) (*JobResult, error) {
109 return schedulerCurrent.Schedule(req)
110 }
111
112
113 func ScheduleBypassAlgorithm(req *types.ServiceRequest) (*JobResult, error) {
114 return schedulerNoScheduler.Schedule(req)
115 }
116
117
118 func ScheduleForward(req *types.ServiceRequest) (*JobResult, error) {
119 return schedulerForward.Schedule(req)
120 }
121
122
123 func ScheduleReject(req *types.ServiceRequest) (*JobResult, error) {
124 return schedulerReject.Schedule(req)
125 }
126
127
130
131
132 func GetName() string {
133 return schedulerCurrent.GetFullName()
134 }
135
136
137 func GetScheduler() *types.SchedulerDescriptor {
138 return schedulerCurrent.GetScheduler()
139 }
140
141
142 func SetScheduler(sched *types.SchedulerDescriptor) error {
143 log.Log.Debugf("Starting setting of scheduler %s", sched)
144
145 if memdb.GetTotalRunningFunctions() != 0 {
146 return CannotChangeScheduler{}
147 }
148
149 switch sched.Name {
150
151 case NoSchedulingSchedulerName:
152 if len(sched.Parameters) < 1 {
153 return BadSchedulerParameters{}
154 }
155 l, err := strconv.ParseBool(sched.Parameters[0])
156 if err != nil {
157 return BadSchedulerParameters{}
158 }
159 schedulerCurrent = &NoSchedulingScheduler{Loss: l}
160 break
161
162 case ForwardSchedulerName:
163 if len(sched.Parameters) < 1 {
164 return BadSchedulerParameters{}
165 }
166 m, err := strconv.ParseUint(sched.Parameters[0], 10, 32)
167 if err != nil {
168 return BadSchedulerParameters{}
169 }
170 schedulerCurrent = &ForwardScheduler{
171 MaxHops: uint(m),
172 }
173 break
174
175 case PowerOfNSchedulerName:
176 if len(sched.Parameters) < 4 {
177 return BadSchedulerParameters{}
178 }
179 f, err1 := strconv.ParseUint(sched.Parameters[0], 10, 32)
180 t, err2 := strconv.ParseUint(sched.Parameters[1], 10, 32)
181 l, err3 := strconv.ParseBool(sched.Parameters[2])
182 m, err4 := strconv.ParseUint(sched.Parameters[3], 10, 32)
183 if err1 != nil || err2 != nil || err3 != nil || err4 != nil {
184 return BadSchedulerParameters{}
185 }
186 schedulerCurrent = &PowerOfNScheduler{
187 F: uint(f),
188 T: uint(t),
189 Loss: l,
190 MaxHops: uint(m),
191 }
192 break
193
194 case PowerOfNSchedulerTauName:
195 if len(sched.Parameters) < 5 {
196 return BadSchedulerParameters{}
197 }
198 f, err1 := strconv.ParseUint(sched.Parameters[0], 10, 32)
199 T, err2 := strconv.ParseUint(sched.Parameters[1], 10, 32)
200 l, err3 := strconv.ParseBool(sched.Parameters[2])
201 m, err4 := strconv.ParseUint(sched.Parameters[3], 10, 32)
202 t, err5 := time.ParseDuration(sched.Parameters[4])
203 if err1 != nil || err2 != nil || err3 != nil || err4 != nil || err5 != nil {
204 return BadSchedulerParameters{}
205 }
206 schedulerCurrent = &PowerOfNSchedulerTau{
207 F: uint(f),
208 T: uint(T),
209 Loss: l,
210 MaxHops: uint(m),
211 Tau: t,
212 }
213 break
214
215 case RoundRobinWithMasterSchedulerName:
216 if len(sched.Parameters) < 3 {
217 return BadSchedulerParameters{}
218 }
219 m, err1 := strconv.ParseBool(sched.Parameters[0])
220 i := sched.Parameters[1]
221 l, err2 := strconv.ParseBool(sched.Parameters[2])
222 if err1 != nil || err2 != nil {
223 return BadSchedulerParameters{}
224 }
225 schedulerCurrent = &RoundRobinWithMasterScheduler{
226 Master: m,
227 MasterIP: i,
228 Loss: l,
229 currentIndex: 0,
230 }
231
232 case LearningSchedulerName:
233 if len(sched.Parameters) < 1 {
234 return BadSchedulerParameters{}
235 }
236
237 jobTypes, err := strconv.ParseUint(sched.Parameters[0], 10, 64)
238 if err != nil {
239 return BadSchedulerParameters{}
240 }
241
242 schedulerCurrent = &LearningScheduler{jobTypes}
243
244 default:
245 return BadSchedulerParameters{}
246
247 }
248
249
250 if _, ok := schedulerCurrent.(*LearningScheduler); ok {
251 service_learning.Start()
252 } else {
253 service_learning.Stop()
254 }
255
256 return nil
257 }
258
259 func getDefaultScheduler() scheduler {
260
265 return &PowerOfNScheduler{
266 F: 1,
267 T: 2,
268 Loss: true,
269 MaxHops: 1,
270 }
271 }
272
View as plain text