...

Source file src/scheduler/api/api_peer/function.go

Documentation: scheduler/api/api_peer

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  package api_peer
    20  
    21  import (
    22  	"encoding/base64"
    23  	"encoding/json"
    24  	"github.com/gorilla/mux"
    25  	"io/ioutil"
    26  	"net/http"
    27  	"reflect"
    28  	"scheduler/api"
    29  	"scheduler/config"
    30  	"scheduler/errors"
    31  	"scheduler/log"
    32  	"scheduler/memdb"
    33  	"scheduler/scheduler"
    34  	"scheduler/service_discovery"
    35  	"scheduler/types"
    36  	"scheduler/utils"
    37  )
    38  
    39  // FunctionExecute Execute a function. This function must called only by another node, and not a client.
    40  func FunctionExecute(w http.ResponseWriter, r *http.Request) {
    41  	var requestId uint64 = 0
    42  	tracingId := api.HeadersGetRequestTracingId(r)
    43  
    44  	if !headersCheckUserAgentMachine(r) {
    45  		errors.ReplyWithError(&w, errors.GenericError, nil)
    46  		log.Log.Errorf("[R#%d,T%s] called from not a machine", requestId, tracingId)
    47  		return
    48  	}
    49  
    50  	// assign id to requests if development
    51  	if log.GetEnv() != config.RunningEnvironmentProduction {
    52  		requestId = memdb.GetNextRequestNumberFromPeers()
    53  	}
    54  
    55  	log.Log.Debugf("[R#%d,T%s] Request to execute function from peer %s", requestId, tracingId, r.RemoteAddr)
    56  
    57  	vars := mux.Vars(r)
    58  	function := vars["function"]
    59  	if function == "" {
    60  		errors.ReplyWithError(&w, errors.GenericError, nil)
    61  		log.Log.Debugf("[R#%d,T%s] service is not specified", requestId, tracingId)
    62  		return
    63  	}
    64  
    65  	bytes, err := ioutil.ReadAll(r.Body)
    66  	if err != nil {
    67  		log.Log.Errorf("[R#%d,T%s] Cannot parse input: %s", requestId, tracingId, err)
    68  		errors.ReplyWithError(&w, errors.InputNotValid, nil)
    69  		return
    70  	}
    71  
    72  	var peerRequest types.PeerJobRequest
    73  	err = json.Unmarshal(bytes, &peerRequest)
    74  	if err != nil {
    75  		log.Log.Errorf("[R#%d,T%s] Cannot parse json input: %s", requestId, tracingId, err)
    76  		errors.ReplyWithError(&w, errors.InputNotValid, nil)
    77  		return
    78  	}
    79  
    80  	peerRequest.ServiceIdRequest = requestId
    81  	peerRequest.ServiceIdTracing = tracingId
    82  
    83  	serviceRequest := types.ServiceRequest{
    84  		Id:                 requestId,
    85  		IdTracing:          tracingId,
    86  		External:           true,
    87  		ExternalJobRequest: &peerRequest,
    88  		ServiceName:        function,
    89  		Payload:            []byte(peerRequest.Payload), // the payload is a string because request it's a peer request
    90  		PayloadContentType: peerRequest.ContentType,
    91  		Headers:            utils.HttpParseXHeaders(r.Header),
    92  	}
    93  
    94  	log.Log.Debugf("[R#%d,T%s] type=%s, len(payload)=%d", requestId, tracingId, serviceRequest.PayloadContentType, len(serviceRequest.Payload))
    95  	log.Log.Debugf("[R#%d,T%s] len(peers)=%d, service=%s", requestId, tracingId, len(peerRequest.PeersList), serviceRequest.ServiceName)
    96  
    97  	// schedule the job
    98  	jobResult, err := scheduler.Schedule(&serviceRequest)
    99  
   100  	// prepare response
   101  	peerResponse := preparePeerResponse(&peerRequest, &serviceRequest, jobResult, err)
   102  
   103  	responseBodyBytes, err := json.Marshal(peerResponse)
   104  	if err != nil {
   105  		log.Log.Errorf("[R#%d,T%s] Cannot marshal peerResponse: %s", requestId, tracingId, err)
   106  		errors.ReplyWithError(&w, errors.MarshalError, nil)
   107  		return
   108  	}
   109  
   110  	utils.HttpSendJSONResponse(&w, peerResponse.StatusCode, string(responseBodyBytes), nil)
   111  }
   112  
   113  // preparePeerResponse Prepares the response to another peer that invoked the function. Remember: jobResult MUST NOT be
   114  // nil even if there is a scheduleErr!
   115  func preparePeerResponse(peerRequest *types.PeerJobRequest, serviceRequest *types.ServiceRequest, jobResult *scheduler.JobResult, scheduleErr error) *types.PeerJobResponse {
   116  	log.Log.Debugf("[R#%d,T%s] Preparing peer response of job", serviceRequest.Id, peerRequest.ServiceIdTracing)
   117  
   118  	var res = types.PeerJobResponse{}
   119  
   120  	utils.ComputeTimings(jobResult.TimingsStart, jobResult.Timings)
   121  
   122  	// when job ends add us in the peers list
   123  	if jobResult.ExternalExecution {
   124  		log.Log.Debugf("[R#%d,T%s] Job has been executed externally", serviceRequest.Id, peerRequest.ServiceIdTracing)
   125  		// job has been executed externally even in from this node so external execution info is not nil
   126  		jobResult.ExternalExecutionInfo.PeersList = append(jobResult.ExternalExecutionInfo.PeersList, service_discovery.GetPeerDescriptor(jobResult.Timings))
   127  	} else {
   128  		log.Log.Debugf("[R#%d,T%s] Job has been executed internally", serviceRequest.Id, peerRequest.ServiceIdTracing)
   129  		jobResult.ExternalExecutionInfo = &scheduler.ExternalExecutionInfo{
   130  			PeersList: []types.PeersListMember{service_discovery.GetPeerDescriptor(jobResult.Timings)},
   131  		}
   132  	}
   133  
   134  	res.PeersList = jobResult.ExternalExecutionInfo.PeersList
   135  	res.Body = ""
   136  
   137  	// add response body
   138  	if jobResult.Response != nil && jobResult.Response.Body != nil {
   139  		res.Body = string(jobResult.Response.Body)
   140  		res.StatusCode = 200
   141  	}
   142  
   143  	// parse scheduler error
   144  	if scheduleErr != nil {
   145  		log.Log.Debugf("[R#%d,T%s] Job has scheduler error [%s]: %s ", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, reflect.TypeOf(scheduleErr), scheduleErr)
   146  
   147  		// compute the error json
   148  		var errorStatusCode = -1
   149  		var errorJsonString = ""
   150  		var err error
   151  
   152  		if _, ok := scheduleErr.(scheduler.JobCannotBeScheduled); ok {
   153  			errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.JobCannotBeScheduledError, scheduleErr.Error())
   154  			log.Log.Debugf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
   155  		} else if _, ok = scheduleErr.(scheduler.JobDeliberatelyRejected); ok {
   156  			errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.JobDeliberatelyRejected, scheduleErr.Error())
   157  			log.Log.Debugf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
   158  		} else if _, ok = scheduleErr.(scheduler.CannotRetrieveAction); ok {
   159  			errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.CannotRetrieveAction, scheduleErr.Error())
   160  			log.Log.Errorf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
   161  		} else if _, ok = scheduleErr.(scheduler.JobCannotBeForwarded); ok {
   162  			errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.JobCouldNotBeForwarded, scheduleErr.Error())
   163  			log.Log.Errorf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
   164  		} else if _, ok = scheduleErr.(scheduler.PeerResponseNil); ok {
   165  			errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.PeerResponseNil, scheduleErr.Error())
   166  			log.Log.Errorf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
   167  		} else {
   168  			errorStatusCode, errorJsonString, err = errors.GetErrorJsonMessage(errors.GenericError, scheduleErr.Error())
   169  			log.Log.Errorf("[R#%d,T%s] %s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, scheduleErr.Error())
   170  		}
   171  
   172  		log.Log.Debugf("[R#%d,T%s] Job with error prepared status=%d json=%s", peerRequest.ServiceIdRequest, peerRequest.ServiceIdTracing, errorStatusCode, errorJsonString)
   173  
   174  		if err != nil {
   175  			log.Log.Errorf("Cannot prepare error json to return")
   176  		}
   177  
   178  		res.StatusCode = errorStatusCode
   179  		res.Body = errorJsonString
   180  	}
   181  
   182  	// check if result has a response body
   183  	if res.Body != "" {
   184  		// If we have a peer request and we finally executed it here we need to encode the payload in base64
   185  		// We are the last node of the chain PC --> O --> O --> O <-this
   186  		if !jobResult.ExternalExecution {
   187  			// We need to base64 encode the output
   188  			res.Body = base64.StdEncoding.EncodeToString([]byte(res.Body))
   189  		} // else {
   190  		//	res.Body = string(jobResult.Response.Body) // job result from other nodes is always a base64 string
   191  		//}
   192  	}
   193  
   194  	return &res
   195  }
   196  

View as plain text