1
18
19 package api_peer
20
21 import (
22 "encoding/base64"
23 "encoding/json"
24 "github.com/gorilla/mux"
25 "io/ioutil"
26 "net/http"
27 "reflect"
28 "scheduler/api"
29 "scheduler/config"
30 "scheduler/errors"
31 "scheduler/log"
32 "scheduler/memdb"
33 "scheduler/scheduler"
34 "scheduler/service_discovery"
35 "scheduler/types"
36 "scheduler/utils"
37 )
38
39
40 func FunctionExecute(w http.ResponseWriter, r *http.Request) {
41 var requestId uint64 = 0
42 tracingId := api.HeadersGetRequestTracingId(r)
43
44 if !headersCheckUserAgentMachine(r) {
45 errors.ReplyWithError(&w, errors.GenericError, nil)
46 log.Log.Errorf("[R#%d,T%s] called from not a machine", requestId, tracingId)
47 return
48 }
49
50
51 if log.GetEnv() != config.RunningEnvironmentProduction {
52 requestId = memdb.GetNextRequestNumberFromPeers()
53 }
54
55 log.Log.Debugf("[R#%d,T%s] Request to execute function from peer %s", requestId, tracingId, r.RemoteAddr)
56
57 vars := mux.Vars(r)
58 function := vars["function"]
59 if function == "" {
60 errors.ReplyWithError(&w, errors.GenericError, nil)
61 log.Log.Debugf("[R#%d,T%s] service is not specified", requestId, tracingId)
62 return
63 }
64
65 bytes, err := ioutil.ReadAll(r.Body)
66 if err != nil {
67 log.Log.Errorf("[R#%d,T%s] Cannot parse input: %s", requestId, tracingId, err)
68 errors.ReplyWithError(&w, errors.InputNotValid, nil)
69 return
70 }
71
72 var peerRequest types.PeerJobRequest
73 err = json.Unmarshal(bytes, &peerRequest)
74 if err != nil {
75 log.Log.Errorf("[R#%d,T%s] Cannot parse json input: %s", requestId, tracingId, err)
76 errors.ReplyWithError(&w, errors.InputNotValid, nil)
77 return
78 }
79
80 peerRequest.ServiceIdRequest = requestId
81 peerRequest.ServiceIdTracing = tracingId
82
83 serviceRequest := types.ServiceRequest{
84 Id: requestId,
85 IdTracing: tracingId,
86 External: true,
87 ExternalJobRequest: &peerRequest,
88 ServiceName: function,
89 Payload: []byte(peerRequest.Payload),
90 PayloadContentType: peerRequest.ContentType,
91 Headers: utils.HttpParseXHeaders(r.Header),
92 }
93
94 log.Log.Debugf("[R#%d,T%s] type=%s, len(payload)=%d", requestId, tracingId, serviceRequest.PayloadContentType, len(serviceRequest.Payload))
95 log.Log.Debugf("[R#%d,T%s] len(peers)=%d, service=%s", requestId, tracingId, len(peerRequest.PeersList), serviceRequest.ServiceName)
96
97
98 jobResult, err := scheduler.Schedule(&serviceRequest)
99
100
101 peerResponse := preparePeerResponse(&peerRequest, &serviceRequest, jobResult, err)
102
103 responseBodyBytes, err := json.Marshal(peerResponse)
104 if err != nil {
105 log.Log.Errorf("[R#%d,T%s] Cannot marshal peerResponse: %s", requestId, tracingId, err)
106 errors.ReplyWithError(&w, errors.MarshalError, nil)
107 return
108 }
109
110 utils.HttpSendJSONResponse(&w, peerResponse.StatusCode, string(responseBodyBytes), nil)
111 }
112
113
114
115 func preparePeerResponse(peerRequest *types.PeerJobRequest, serviceRequest *types.ServiceRequest, jobResult *scheduler.JobResult, scheduleErr error) *types.PeerJobResponse {
116 log.Log.Debugf("[R#%d,T%s] Preparing peer response of job", serviceRequest.Id, peerRequest.ServiceIdTracing)
117
118 var res = types.PeerJobResponse{}
119
120 utils.ComputeTimings(jobResult.TimingsStart, jobResult.Timings)
121
122
123 if jobResult.ExternalExecution {
124 log.Log.Debugf("[R#%d,T%s] Job has been executed externally", serviceRequest.Id, peerRequest.ServiceIdTracing)
125
126 jobResult.ExternalExecutionInfo.PeersList = append(jobResult.ExternalExecutionInfo.PeersList, service_discovery.GetPeerDescriptor(jobResult.Timings))
127 } else {
128 log.Log.Debugf("[R#%d,T%s] Job has been executed internally", serviceRequest.Id, peerRequest.ServiceIdTracing)
129 jobResult.ExternalExecutionInfo = &scheduler.ExternalExecutionInfo{
130 PeersList: []types.PeersListMember{service_discovery.GetPeerDescriptor(jobResult.Timings)},
131 }
132 }
133
134 res.PeersList = jobResult.ExternalExecutionInfo.PeersList
135 res.Body = ""
136
137
138 if jobResult.Response != nil && jobResult.Response.Body != nil {
139 res.Body = string(jobResult.Response.Body)
140 res.StatusCode = 200
141 }
142
143
144 if scheduleErr != nil {
145 log.Log.Debugf("[R#%d,T%s] Job has scheduler error [%s]: %s ", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, reflect.TypeOf(scheduleErr), scheduleErr)
146
147
148 var errorStatusCode = -1
149 var errorJsonString = ""
150 var err error
151
152 if _, ok := scheduleErr.(scheduler.JobCannotBeScheduled); ok {
153 errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.JobCannotBeScheduledError, scheduleErr.Error())
154 log.Log.Debugf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
155 } else if _, ok = scheduleErr.(scheduler.JobDeliberatelyRejected); ok {
156 errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.JobDeliberatelyRejected, scheduleErr.Error())
157 log.Log.Debugf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
158 } else if _, ok = scheduleErr.(scheduler.CannotRetrieveAction); ok {
159 errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.CannotRetrieveAction, scheduleErr.Error())
160 log.Log.Errorf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
161 } else if _, ok = scheduleErr.(scheduler.JobCannotBeForwarded); ok {
162 errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.JobCouldNotBeForwarded, scheduleErr.Error())
163 log.Log.Errorf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
164 } else if _, ok = scheduleErr.(scheduler.PeerResponseNil); ok {
165 errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.PeerResponseNil, scheduleErr.Error())
166 log.Log.Errorf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
167 } else {
168 errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.GenericError, scheduleErr.Error())
169 log.Log.Errorf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
170 }
171
172 log.Log.Debugf("[R#%d,T%s] Job with error prepared status=%d json=%s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, errorStatusCode, errorJsonString)
173
174 if err != nil {
175 log.Log.Errorf("Cannot prepare error json to return")
176 }
177
178 res.StatusCode = errorStatusCode
179 res.Body = errorJsonString
180 }
181
182
183 if res.Body != "" {
184
185
186 if !jobResult.ExternalExecution {
187
188 res.Body = base64.StdEncoding.EncodeToString([]byte(res.Body))
189 }
190
191
192 }
193
194 return &res
195 }
196
View as plain text