1
18
19 package api
20
21 import (
22 "fmt"
23 "github.com/gorilla/mux"
24 "io/ioutil"
25 "net/http"
26 "scheduler/errors"
27 "scheduler/log"
28 "scheduler/memdb"
29 "scheduler/metrics"
30 "scheduler/scheduler"
31 "scheduler/service_discovery"
32 "scheduler/types"
33 "scheduler/utils"
34 )
35
36 func FunctionPost(w http.ResponseWriter, r *http.Request) {
37 executeFunction(w, r)
38 }
39
40 func FunctionGet(w http.ResponseWriter, r *http.Request) {
41 executeFunction(w, r)
42 }
43
44
47
48 func executeFunction(w http.ResponseWriter, r *http.Request) {
49 var err error
50 var jobResult *scheduler.JobResult
51 tracingId := HeadersGetRequestTracingId(r)
52
53 vars := mux.Vars(r)
54 function := vars["function"]
55 if function == "" {
56 errors.ReplyWithErrorMessage(&w, errors.GenericError, fmt.Sprintf("[T%s] service is not specified", tracingId), nil)
57 log.Log.Debugf("[T%s] service is not specified", tracingId)
58 return
59 }
60
61 var requestId uint64 = 0
62
63
64 requestId = memdb.GetNextRequestNumber()
65
66
67 log.Log.Debugf("[R#%d,T%s] Execute function called for %s", requestId, tracingId, function)
68
69 payload, _ := ioutil.ReadAll(r.Body)
70 req := types.ServiceRequest{
71 Id: requestId,
72 IdTracing: tracingId,
73 ServiceName: function,
74 Payload: payload,
75 PayloadContentType: r.Header.Get("Content-Type"),
76 External: false,
77 Headers: utils.HttpParseXHeaders(r.Header),
78 }
79
80
81
82 if headersCheckSchedulerBypass(r) {
83 jobResult, err = scheduler.ScheduleBypassAlgorithm(&req)
84 } else if headersCheckSchedulerForward(r) {
85 jobResult, err = scheduler.ScheduleForward(&req)
86 } else if headersCheckSchedulerReject(r) {
87 jobResult, err = scheduler.ScheduleReject(&req)
88 } else {
89 jobResult, err = scheduler.Schedule(&req)
90 }
91
92
93
94
95 if err != nil {
96 if _, ok := err.(scheduler.JobCannotBeScheduled); ok {
97 ReplyWithErrorFromJobResult(&w, errors.JobCannotBeScheduledError, jobResult, err.Error())
98 log.Log.Debugf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
99 return
100 }
101 if _, ok := err.(scheduler.JobDeliberatelyRejected); ok {
102 ReplyWithErrorFromJobResult(&w, errors.JobDeliberatelyRejected, jobResult, err.Error())
103 log.Log.Debugf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
104 return
105 }
106 if _, ok := err.(scheduler.CannotRetrieveAction); ok {
107 ReplyWithErrorFromJobResult(&w, errors.CannotRetrieveAction, jobResult, err.Error())
108 log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
109 return
110 }
111 if _, ok := err.(scheduler.JobCannotBeForwarded); ok {
112 ReplyWithErrorFromJobResult(&w, errors.JobCouldNotBeForwarded, jobResult, err.Error())
113 log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
114 return
115 }
116 if _, ok := err.(scheduler.PeerResponseNil); ok {
117 ReplyWithErrorFromJobResult(&w, errors.PeerResponseNil, jobResult, err.Error())
118 log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
119 return
120 }
121 if _, ok := err.(scheduler.CannotRetrieveRecipientNode); ok {
122 ReplyWithErrorFromJobResult(&w, errors.CannotRetrieveRecipientNode, jobResult, err.Error())
123 log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
124 return
125 }
126 ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] Cannot schedule the service request: %s", requestId, tracingId, err.Error()))
127 log.Log.Debugf("[R#%d,T%s] Cannot schedule the service request: %s", requestId, tracingId, err.Error())
128 return
129 }
130
131
132 if jobResult != nil && jobResult.Response != nil {
133 log.Log.Debugf("[R#%d,T%s] Execute function called for %s done: statusCode=%d", requestId, tracingId, function, jobResult.Response.StatusCode)
134 } else if jobResult == nil {
135 log.Log.Errorf("[R#%d,T%s] jobResult is nil", requestId, tracingId)
136 ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] jobResult is nil", requestId, tracingId))
137 return
138 } else if jobResult.Response == nil {
139 log.Log.Errorf("[R#%d,T%s] jobResult.Response is nil", requestId, tracingId)
140 ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] jobResult.Response is nil", requestId, tracingId))
141 return
142 }
143
144
145 utils.ComputeTimings(jobResult.TimingsStart, jobResult.Timings)
146
147
148 if jobResult.ExternalExecution && jobResult.ExternalExecutionInfo.PeersList != nil {
149 jobResult.ExternalExecutionInfo.PeersList = append(
150 jobResult.ExternalExecutionInfo.PeersList,
151 service_discovery.GetPeerDescriptor(jobResult.Timings),
152 )
153 } else if jobResult.ExternalExecution && jobResult.ExternalExecutionInfo.PeersList == nil {
154 log.Log.Fatalf("[R#%d,T%s] Job has been executed externally but its peers list is empty", requestId, tracingId)
155 }
156
157
158 ReplyWithBodyFromJobResult(&req, &w, jobResult)
159
160
161 defer metrics.PostJobInvocations(function, jobResult.Response.StatusCode)
162
163 defer log.Log.Debugf("[R#%d,T%s] %s success", requestId, tracingId, function)
164 }
165
View as plain text