1
18
19 package scheduler
20
21 import (
22 "fmt"
23 "scheduler/log"
24 "scheduler/memdb"
25 "scheduler/queue"
26 "scheduler/scheduler_service"
27 "scheduler/service_discovery"
28 "scheduler/service_learning"
29 "scheduler/types"
30 "scheduler/utils"
31 "strconv"
32 "time"
33 )
34
35 const LearningSchedulerName = "LearningScheduler"
36
37 const LearningSchedulerHeaderKeyState = "X-P2pfaas-Scheduler-Learning-State"
38 const LearningSchedulerHeaderKeyAction = "X-P2pfaas-Scheduler-Learning-Action"
39 const LearningSchedulerHeaderKeyEps = "X-P2pfaas-Scheduler-Learning-Eps"
40 const LearningSchedulerHeaderKeyEid = "X-P2pfaas-Scheduler-Learning-Eid"
41
42 const LearningSchedulerHeaderKeyFps = "X-P2pfaas-Scheduler-Learning-Fps"
43 const LearningSchedulerHeaderKeyTaskType = "X-P2pfaas-Scheduler-Learning-Task-Type"
44
45
46 type LearningScheduler struct {
47
48 NumberOfTaskTypes uint64
49 }
50
51 func (s LearningScheduler) GetFullName() string {
52 return fmt.Sprintf("%s(%d)", LearningSchedulerName, s.NumberOfTaskTypes)
53 }
54
55 func (s LearningScheduler) GetScheduler() *types.SchedulerDescriptor {
56 return &types.SchedulerDescriptor{
57 Name: LearningSchedulerName,
58 Parameters: []string{
59 fmt.Sprintf("%d", s.NumberOfTaskTypes),
60 },
61 }
62 }
63
64
65 func (s LearningScheduler) Schedule(req *types.ServiceRequest) (*JobResult, error) {
66 var err error
67 var actionRes *service_learning.EntryActOutput
68 var targetMachineIp string
69 var jobResult *JobResult
70
71 log.Log.Debugf("Scheduling job %s", req.ServiceName)
72 now := time.Now()
73 timingsStart := types.TimingsStart{ArrivedAt: &now}
74
75
76 if req.External {
77 return executeJobLocally(req, &timingsStart, s.GetFullName())
78 }
79
80 taskType := float64(0)
81
82 taskTypeStr := (*req.Headers)[LearningSchedulerHeaderKeyTaskType]
83 if taskTypeStr != "" {
84 taskType, err = strconv.ParseFloat(taskTypeStr, 64)
85 if err != nil {
86 log.Log.Warningf("Cannot parse fps in headers: %s", taskTypeStr)
87 taskType = 0
88 }
89 }
90 log.Log.Debugf("Using taskType=%f", taskType)
91
92
93 req.ServiceType = int64(taskType)
94
95
96 mapRunningFunctionsOfType := memdb.GetTotalRunningFunctionsOfType()
97 mapQueueLengthOfType := queue.GetLengthOfTypes()
98 statesMaps := []map[int64]int64{mapRunningFunctionsOfType, mapQueueLengthOfType}
99
100 state := []float64{taskType}
101 totalLoad := int64(0)
102
103
117
118
119 for _, stateMap := range statesMaps {
120 for i := 0; i < int(s.NumberOfTaskTypes); i++ {
121 loadOfState, exists := stateMap[int64(i)]
122 if !exists {
123 state = append(state, float64(0))
124 continue
125 }
126 totalLoad += loadOfState
127 state = append(state, float64(loadOfState))
128 }
129 }
130
131 actEntry := service_learning.EntryAct{
132 State: state,
133 }
134
135
136 actionRes, err = service_learning.SocketAct(&actEntry)
137 if err != nil {
138 return nil, CannotRetrieveAction{err}
139 }
140
141
142 actionInt := int64(actionRes.Action)
143 eps := actionRes.Eps
144
145 if actionInt == 0 {
146 result := JobResult{TimingsStart: &timingsStart, Scheduler: s.GetFullName()}
147 s.addHeadersToResult(&result, req.Id, state, actionRes.Action, eps)
148
149 timingsStart.ScheduledAt = utils.GetTimeNow()
150 return &result, JobDeliberatelyRejected{}
151 }
152
153 if actionInt == 1 {
154 timingsStart.ScheduledAt = utils.GetTimeNow()
155
156 jobResult, err = executeJobLocally(req, &timingsStart, s.GetFullName())
157 s.addHeadersToResult(jobResult, req.Id, state, actionRes.Action, eps)
158
159 return jobResult, err
160 }
161
162 if actionInt == 2 {
163
164 startedProbingTime := time.Now()
165 timingsStart.ProbingStartedAt = &startedProbingTime
166
167 leastLoaded, _, err := scheduler_service.GetLeastLoadedMachineOfNRandom(1, uint(totalLoad), true, true)
168
169 endProbingTime := time.Now()
170 timingsStart.ProbingEndedAt = &endProbingTime
171
172 if err != nil {
173 log.Log.Debugf("Error in retrieving machines %s", err.Error())
174
175
176 jobResult, err = executeJobLocally(req, &timingsStart, s.GetFullName())
177 s.addHeadersToResult(jobResult, req.Id, state, actionRes.Action, eps)
178
179 return jobResult, err
180 }
181
182 jobResult, err = executeJobExternally(req, leastLoaded, &timingsStart, s.GetFullName())
183 s.addHeadersToResult(jobResult, req.Id, state, actionRes.Action, eps)
184
185 return jobResult, err
186 }
187
188
189 targetMachineI := int64(actionRes.Action - 3)
190 targetMachineIp, err = service_discovery.GetMachineIpAtIndex(targetMachineI, true)
191 if err != nil {
192 log.Log.Errorf("Cannot schedule job to machine i=%d of %d: %s", targetMachineI, service_discovery.GetCachedMachineNumber(), err)
193 return nil, CannotRetrieveRecipientNode{err}
194 }
195 log.Log.Debugf("Forwarding to machine %s", targetMachineIp)
196
197 jobResult, err = executeJobExternally(req, targetMachineIp, &timingsStart, s.GetFullName())
198 s.addHeadersToResult(jobResult, req.Id, state, actionRes.Action, eps)
199
200 return jobResult, err
201 }
202
203 func (s LearningScheduler) addHeadersToResult(result *JobResult, reqId uint64, state []float64, action float64, eps float64) {
204 resultHeaders := map[string]string{}
205
206 resultHeaders[LearningSchedulerHeaderKeyEid] = fmt.Sprintf("%d", reqId)
207 resultHeaders[LearningSchedulerHeaderKeyState] = utils.ArrayFloatToStringCommas(state)
208 resultHeaders[LearningSchedulerHeaderKeyAction] = fmt.Sprintf("%f", action)
209 resultHeaders[LearningSchedulerHeaderKeyEps] = fmt.Sprintf("%f", eps)
210
211 if result != nil {
212 result.ResponseHeaders = &resultHeaders
213 } else {
214 log.Log.Errorf("result is nil, cannot add headers")
215 }
216 }
217
View as plain text