...

Source file src/scheduler/api/function.go

Documentation: scheduler/api

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  package api
    20  
    21  import (
    22  	"fmt"
    23  	"github.com/gorilla/mux"
    24  	"io/ioutil"
    25  	"net/http"
    26  	"scheduler/errors"
    27  	"scheduler/log"
    28  	"scheduler/memdb"
    29  	"scheduler/metrics"
    30  	"scheduler/scheduler"
    31  	"scheduler/service_discovery"
    32  	"scheduler/types"
    33  	"scheduler/utils"
    34  )
    35  
    36  func FunctionPost(w http.ResponseWriter, r *http.Request) {
    37  	executeFunction(w, r)
    38  }
    39  
    40  func FunctionGet(w http.ResponseWriter, r *http.Request) {
    41  	executeFunction(w, r)
    42  }
    43  
    44  /*
    45   * utils
    46   */
    47  
    48  func executeFunction(w http.ResponseWriter, r *http.Request) {
    49  	var err error
    50  	var jobResult *scheduler.JobResult
    51  	tracingId := HeadersGetRequestTracingId(r)
    52  
    53  	vars := mux.Vars(r)
    54  	function := vars["function"]
    55  	if function == "" {
    56  		errors.ReplyWithErrorMessage(&w, errors.GenericError, fmt.Sprintf("[T%s] service is not specified", tracingId), nil)
    57  		log.Log.Debugf("[T%s] service is not specified", tracingId)
    58  		return
    59  	}
    60  
    61  	var requestId uint64 = 0
    62  	// assign id to requests if development
    63  	// if log.GetEnv() != config.RunningEnvironmentProduction {
    64  	requestId = memdb.GetNextRequestNumber()
    65  	//}
    66  
    67  	log.Log.Debugf("[R#%d,T%s] Execute function called for %s", requestId, tracingId, function)
    68  
    69  	payload, _ := ioutil.ReadAll(r.Body)
    70  	req := types.ServiceRequest{
    71  		Id:                 requestId,
    72  		IdTracing:          tracingId,
    73  		ServiceName:        function,
    74  		Payload:            payload,
    75  		PayloadContentType: r.Header.Get("Content-Type"),
    76  		External:           false,
    77  		Headers:            utils.HttpParseXHeaders(r.Header),
    78  	}
    79  
    80  	// schedule the function execution forced if development
    81  	// if config.IsRunningEnvironmentDevelopment() {
    82  	if headersCheckSchedulerBypass(r) {
    83  		jobResult, err = scheduler.ScheduleBypassAlgorithm(&req)
    84  	} else if headersCheckSchedulerForward(r) {
    85  		jobResult, err = scheduler.ScheduleForward(&req)
    86  	} else if headersCheckSchedulerReject(r) {
    87  		jobResult, err = scheduler.ScheduleReject(&req)
    88  	} else {
    89  		jobResult, err = scheduler.Schedule(&req)
    90  	}
    91  
    92  	/* This is blocking */
    93  
    94  	// check if any error
    95  	if err != nil {
    96  		if _, ok := err.(scheduler.JobCannotBeScheduled); ok {
    97  			ReplyWithErrorFromJobResult(&w, errors.JobCannotBeScheduledError, jobResult, err.Error())
    98  			log.Log.Debugf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
    99  			return
   100  		}
   101  		if _, ok := err.(scheduler.JobDeliberatelyRejected); ok {
   102  			ReplyWithErrorFromJobResult(&w, errors.JobDeliberatelyRejected, jobResult, err.Error())
   103  			log.Log.Debugf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   104  			return
   105  		}
   106  		if _, ok := err.(scheduler.CannotRetrieveAction); ok {
   107  			ReplyWithErrorFromJobResult(&w, errors.CannotRetrieveAction, jobResult, err.Error())
   108  			log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   109  			return
   110  		}
   111  		if _, ok := err.(scheduler.JobCannotBeForwarded); ok {
   112  			ReplyWithErrorFromJobResult(&w, errors.JobCouldNotBeForwarded, jobResult, err.Error())
   113  			log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   114  			return
   115  		}
   116  		if _, ok := err.(scheduler.PeerResponseNil); ok {
   117  			ReplyWithErrorFromJobResult(&w, errors.PeerResponseNil, jobResult, err.Error())
   118  			log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   119  			return
   120  		}
   121  		if _, ok := err.(scheduler.CannotRetrieveRecipientNode); ok {
   122  			ReplyWithErrorFromJobResult(&w, errors.CannotRetrieveRecipientNode, jobResult, err.Error())
   123  			log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   124  			return
   125  		}
   126  		ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] Cannot schedule the service request: %s", requestId, tracingId, err.Error()))
   127  		log.Log.Debugf("[R#%d,T%s] Cannot schedule the service request: %s", requestId, tracingId, err.Error())
   128  		return
   129  	}
   130  
   131  	// check results
   132  	if jobResult != nil && jobResult.Response != nil {
   133  		log.Log.Debugf("[R#%d,T%s] Execute function called for %s done: statusCode=%d", requestId, tracingId, function, jobResult.Response.StatusCode)
   134  	} else if jobResult == nil {
   135  		log.Log.Errorf("[R#%d,T%s] jobResult is nil", requestId, tracingId)
   136  		ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] jobResult is nil", requestId, tracingId))
   137  		return
   138  	} else if jobResult.Response == nil {
   139  		log.Log.Errorf("[R#%d,T%s] jobResult.Response is nil", requestId, tracingId)
   140  		ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] jobResult.Response is nil", requestId, tracingId))
   141  		return
   142  	}
   143  
   144  	// Compute timings
   145  	utils.ComputeTimings(jobResult.TimingsStart, jobResult.Timings)
   146  
   147  	// Add us in list if job is executed externally
   148  	if jobResult.ExternalExecution && jobResult.ExternalExecutionInfo.PeersList != nil {
   149  		jobResult.ExternalExecutionInfo.PeersList = append(
   150  			jobResult.ExternalExecutionInfo.PeersList,
   151  			service_discovery.GetPeerDescriptor(jobResult.Timings),
   152  		)
   153  	} else if jobResult.ExternalExecution && jobResult.ExternalExecutionInfo.PeersList == nil {
   154  		log.Log.Fatalf("[R#%d,T%s] Job has been executed externally but its peers list is empty", requestId, tracingId)
   155  	}
   156  
   157  	// reply to client
   158  	ReplyWithBodyFromJobResult(&req, &w, jobResult)
   159  
   160  	// metrics
   161  	defer metrics.PostJobInvocations(function, jobResult.Response.StatusCode)
   162  
   163  	defer log.Log.Debugf("[R#%d,T%s] %s success", requestId, tracingId, function)
   164  }
   165  

View as plain text