1
18
19 package api
20
21 import (
22 "encoding/base64"
23 "encoding/json"
24 "fmt"
25 "net/http"
26 "scheduler/config"
27 "scheduler/errors"
28 "scheduler/log"
29 "scheduler/scheduler"
30 "scheduler/types"
31 "scheduler/utils"
32 )
33
34
37
38 func headersCheckSchedulerBypass(req *http.Request) bool {
39 return req.Header.Get(utils.HttpHeaderP2PFaaSSchedulerBypass) != ""
40 }
41
42 func headersCheckSchedulerForward(req *http.Request) bool {
43 return req.Header.Get(utils.HttpHeaderP2PFaaSSchedulerForward) != ""
44 }
45
46 func headersCheckSchedulerReject(req *http.Request) bool {
47 return req.Header.Get(utils.HttpHeaderP2PFaaSSchedulerReject) != ""
48 }
49
50 func HeadersGetRequestTracingId(req *http.Request) string {
51 return req.Header.Get(utils.HttpHeaderP2PFaaSSchedulerTracingId)
52 }
53
54 func HttpGetHeadersFromFramework() map[string]string {
55 return map[string]string{
56 utils.HttpHeaderP2PFaaSVersion: config.AppVersion,
57 utils.HttpHeaderP2PFaaSScheduler: scheduler.GetName(),
58 }
59 }
60
61 func HttpGetHeadersFromJobResult(result *scheduler.JobResult) map[string]string {
62 if result == nil {
63 return map[string]string{}
64 }
65
66
67 output := map[string]string{
68 utils.HttpHeaderP2PFaaSScheduler: result.Scheduler,
69 }
70
71
72 if result.ResponseHeaders != nil {
73 output = utils.MapsMerge(output, *result.ResponseHeaders)
74 }
75
76 return output
77 }
78
79 func HttpGetHeadersXFromResponse(apiResponse *types.APIResponse) map[string]string {
80 output := map[string]string{}
81
82 if apiResponse == nil || apiResponse.Headers == nil {
83 log.Log.Debugf("[R#%d] apiResponse is nil or headers is nil")
84 return output
85 }
86
87 for key, value := range apiResponse.Headers {
88 if string(key[0]) == "X" {
89 output[key] = value[0]
90 }
91 }
92
93 return output
94 }
95
96 func HttpGetHeadersFunctionExecution(jobResult *scheduler.JobResult) map[string]string {
97 output := map[string]string{}
98
99 if jobResult == nil {
100 return output
101 }
102
103
104 if !jobResult.ExternalExecution {
105 if jobResult.Timings == nil {
106 return output
107 }
108
109 if jobResult.Timings.TotalTime != nil {
110 output[utils.HttpHeaderP2PFaaSTotalTimingsList] = fmt.Sprintf("[%f]", *jobResult.Timings.TotalTime)
111 }
112 if jobResult.Timings.SchedulingTime != nil {
113 output[utils.HttpHeaderP2PFaaSSchedulingTimingsList] = fmt.Sprintf("[%f]", *jobResult.Timings.SchedulingTime)
114 }
115 if jobResult.Timings.ProbingTime != nil {
116 output[utils.HttpHeaderP2PFaaSProbingTimingsList] = fmt.Sprintf("[%f]", *jobResult.Timings.ProbingTime)
117 }
118
119
120 if jobResult.Timings.ExecutionTime != nil {
121 output[utils.HttpHeaderP2PFaaSExecutionTime] = fmt.Sprintf("%f", *jobResult.Timings.ExecutionTime)
122 }
123 }
124
125
126 if jobResult.ExternalExecution {
127 hops := len(jobResult.ExternalExecutionInfo.PeersList) - 1
128 output[utils.HttpHeaderP2PFaaSExternallyExecuted] = "True"
129 output[utils.HttpHeaderP2PFaaSHops] = fmt.Sprintf("%d", hops)
130
131 if len(jobResult.ExternalExecutionInfo.PeersList) == 0 {
132 log.Log.Fatalf("Peers list is empty and the jobResult has been executed externally")
133 }
134
135 if jobResult.ExternalExecutionInfo.PeersList[0].Timings.ExecutionTime != nil {
136 output[utils.HttpHeaderP2PFaaSExecutionTime] = fmt.Sprintf("%f", *jobResult.ExternalExecutionInfo.PeersList[0].Timings.ExecutionTime)
137 }
138
139 var ipList []string
140 var idList []string
141 var probingTimes []float64
142 var totalTimes []float64
143 var schedulingTimes []float64
144
145 peers := jobResult.ExternalExecutionInfo.PeersList
146 for i := len(jobResult.ExternalExecutionInfo.PeersList) - 1; i >= 0; i-- {
147 ipList = append(ipList, peers[i].MachineIp)
148 idList = append(idList, peers[i].MachineId)
149
150 if peers[i].Timings.ProbingTime != nil {
151 probingTimes = append(probingTimes, *peers[i].Timings.ProbingTime)
152 } else {
153 probingTimes = append(probingTimes, 0.0)
154 }
155
156 if peers[i].Timings.TotalTime != nil {
157 totalTimes = append(totalTimes, *peers[i].Timings.TotalTime)
158 } else {
159 totalTimes = append(totalTimes, 0.0)
160 }
161
162 if peers[i].Timings.SchedulingTime != nil {
163 schedulingTimes = append(schedulingTimes, *peers[i].Timings.SchedulingTime)
164 } else {
165 schedulingTimes = append(schedulingTimes, 0.0)
166 }
167 }
168
169 ipListJ, _ := json.Marshal(ipList)
170 idListJ, _ := json.Marshal(idList)
171 totalTimesJ, _ := json.Marshal(totalTimes)
172 schedulingTimesJ, _ := json.Marshal(schedulingTimes)
173 probingTimesJ, _ := json.Marshal(probingTimes)
174
175 output[utils.HttpHeaderP2PFaaSPeersListIp] = fmt.Sprintf("%s", string(ipListJ))
176 output[utils.HttpHeaderP2PFaaSPeersListId] = fmt.Sprintf("%s", string(idListJ))
177
178 output[utils.HttpHeaderP2PFaaSTotalTimingsList] = fmt.Sprintf("%s", string(totalTimesJ))
179 output[utils.HttpHeaderP2PFaaSProbingTimingsList] = fmt.Sprintf("%s", string(probingTimesJ))
180 output[utils.HttpHeaderP2PFaaSSchedulingTimingsList] = fmt.Sprintf("%s", string(schedulingTimesJ))
181 }
182
183 return output
184 }
185
186 func ReplyWithErrorFromJobResult(w *http.ResponseWriter, errorCode int, jobResult *scheduler.JobResult, message string) {
187 finalHeaders := HttpGetHeadersFromFramework()
188
189
190 if jobResult != nil {
191 finalHeaders = utils.MapsMerge(
192 finalHeaders,
193 HttpGetHeadersFromJobResult(jobResult),
194 )
195
196 if jobResult.Response != nil {
197 finalHeaders = utils.MapsMerge(
198 finalHeaders,
199 HttpGetHeadersXFromResponse(jobResult.Response),
200 )
201 }
202 }
203
204 utils.HttpAddHeadersToResponse(w, &finalHeaders)
205
206 if message != "" {
207 errors.ReplyWithErrorMessage(w, errorCode, message, nil)
208 } else {
209 errors.ReplyWithError(w, errorCode, nil)
210 }
211 }
212
213 func ReplyWithBodyFromJobResult(serviceRequest *types.ServiceRequest, w *http.ResponseWriter, jobResult *scheduler.JobResult) {
214 var err error
215
216
217 finalHeaders := utils.MapsMerge(
218 HttpGetHeadersFromFramework(),
219 HttpGetHeadersFromJobResult(jobResult),
220 HttpGetHeadersXFromResponse(jobResult.Response),
221 HttpGetHeadersFunctionExecution(jobResult),
222 )
223 utils.HttpAddHeadersToResponse(w, &finalHeaders)
224
225 (*w).WriteHeader(jobResult.Response.StatusCode)
226
227
228 if jobResult.Response.Body != nil && len(jobResult.Response.Body) > 0 {
229 log.Log.Debugf("[R#%d,T%s] Job body has length %d, external=%t", serviceRequest.Id, serviceRequest.IdTracing, len(jobResult.Response.Body), jobResult.ExternalExecution)
230
231 var outputBody []byte
232
233
234
235 if jobResult.ExternalExecution {
236 outputBody, err = base64.StdEncoding.DecodeString(string(jobResult.Response.Body))
237 if err != nil {
238 log.Log.Errorf("[R#%d,T%s] Cannot decode job output: %s", serviceRequest.IdTracing, serviceRequest.Id, err)
239 return
240 }
241 } else {
242 outputBody = jobResult.Response.Body
243 }
244
245
246 _, err = (*w).Write(outputBody)
247 if err != nil {
248 log.Log.Errorf("[R#%d,T%s] Cannot write job output: %s", serviceRequest.Id, serviceRequest.IdTracing, err.Error())
249 return
250 }
251 }
252 }
253
View as plain text