1  
    18  
    19  package api
    20  
    21  import (
    22  	"fmt"
    23  	"github.com/gorilla/mux"
    24  	"io/ioutil"
    25  	"net/http"
    26  	"scheduler/errors"
    27  	"scheduler/log"
    28  	"scheduler/memdb"
    29  	"scheduler/metrics"
    30  	"scheduler/scheduler"
    31  	"scheduler/service_discovery"
    32  	"scheduler/types"
    33  	"scheduler/utils"
    34  )
    35  
    36  func FunctionPost(w http.ResponseWriter, r *http.Request) {
    37  	executeFunction(w, r)
    38  }
    39  
    40  func FunctionGet(w http.ResponseWriter, r *http.Request) {
    41  	executeFunction(w, r)
    42  }
    43  
    44  
    47  
    48  func executeFunction(w http.ResponseWriter, r *http.Request) {
    49  	var err error
    50  	var jobResult *scheduler.JobResult
    51  	tracingId := HeadersGetRequestTracingId(r)
    52  
    53  	vars := mux.Vars(r)
    54  	function := vars["function"]
    55  	if function == "" {
    56  		errors.ReplyWithErrorMessage(&w, errors.GenericError, fmt.Sprintf("[T%s] service is not specified", tracingId), nil)
    57  		log.Log.Debugf("[T%s] service is not specified", tracingId)
    58  		return
    59  	}
    60  
    61  	var requestId uint64 = 0
    62  	
    63  	
    64  	requestId = memdb.GetNextRequestNumber()
    65  	
    66  
    67  	log.Log.Debugf("[R#%d,T%s] Execute function called for %s", requestId, tracingId, function)
    68  
    69  	payload, _ := ioutil.ReadAll(r.Body)
    70  	req := types.ServiceRequest{
    71  		Id:                 requestId,
    72  		IdTracing:          tracingId,
    73  		ServiceName:        function,
    74  		Payload:            payload,
    75  		PayloadContentType: r.Header.Get("Content-Type"),
    76  		External:           false,
    77  		Headers:            utils.HttpParseXHeaders(r.Header),
    78  	}
    79  
    80  	
    81  	
    82  	if headersCheckSchedulerBypass(r) {
    83  		jobResult, err = scheduler.ScheduleBypassAlgorithm(&req)
    84  	} else if headersCheckSchedulerForward(r) {
    85  		jobResult, err = scheduler.ScheduleForward(&req)
    86  	} else if headersCheckSchedulerReject(r) {
    87  		jobResult, err = scheduler.ScheduleReject(&req)
    88  	} else {
    89  		jobResult, err = scheduler.Schedule(&req)
    90  	}
    91  
    92  	
    93  
    94  	
    95  	if err != nil {
    96  		if _, ok := err.(scheduler.JobCannotBeScheduled); ok {
    97  			ReplyWithErrorFromJobResult(&w, errors.JobCannotBeScheduledError, jobResult, err.Error())
    98  			log.Log.Debugf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
    99  			return
   100  		}
   101  		if _, ok := err.(scheduler.JobDeliberatelyRejected); ok {
   102  			ReplyWithErrorFromJobResult(&w, errors.JobDeliberatelyRejected, jobResult, err.Error())
   103  			log.Log.Debugf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   104  			return
   105  		}
   106  		if _, ok := err.(scheduler.CannotRetrieveAction); ok {
   107  			ReplyWithErrorFromJobResult(&w, errors.CannotRetrieveAction, jobResult, err.Error())
   108  			log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   109  			return
   110  		}
   111  		if _, ok := err.(scheduler.JobCannotBeForwarded); ok {
   112  			ReplyWithErrorFromJobResult(&w, errors.JobCouldNotBeForwarded, jobResult, err.Error())
   113  			log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   114  			return
   115  		}
   116  		if _, ok := err.(scheduler.PeerResponseNil); ok {
   117  			ReplyWithErrorFromJobResult(&w, errors.PeerResponseNil, jobResult, err.Error())
   118  			log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   119  			return
   120  		}
   121  		if _, ok := err.(scheduler.CannotRetrieveRecipientNode); ok {
   122  			ReplyWithErrorFromJobResult(&w, errors.CannotRetrieveRecipientNode, jobResult, err.Error())
   123  			log.Log.Errorf("[R#%d,T%s] %s", requestId, tracingId, err.Error())
   124  			return
   125  		}
   126  		ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] Cannot schedule the service request: %s", requestId, tracingId, err.Error()))
   127  		log.Log.Debugf("[R#%d,T%s] Cannot schedule the service request: %s", requestId, tracingId, err.Error())
   128  		return
   129  	}
   130  
   131  	
   132  	if jobResult != nil && jobResult.Response != nil {
   133  		log.Log.Debugf("[R#%d,T%s] Execute function called for %s done: statusCode=%d", requestId, tracingId, function, jobResult.Response.StatusCode)
   134  	} else if jobResult == nil {
   135  		log.Log.Errorf("[R#%d,T%s] jobResult is nil", requestId, tracingId)
   136  		ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] jobResult is nil", requestId, tracingId))
   137  		return
   138  	} else if jobResult.Response == nil {
   139  		log.Log.Errorf("[R#%d,T%s] jobResult.Response is nil", requestId, tracingId)
   140  		ReplyWithErrorFromJobResult(&w, errors.GenericError, jobResult, fmt.Sprintf("[R#%d,T%s] jobResult.Response is nil", requestId, tracingId))
   141  		return
   142  	}
   143  
   144  	
   145  	utils.ComputeTimings(jobResult.TimingsStart, jobResult.Timings)
   146  
   147  	
   148  	if jobResult.ExternalExecution && jobResult.ExternalExecutionInfo.PeersList != nil {
   149  		jobResult.ExternalExecutionInfo.PeersList = append(
   150  			jobResult.ExternalExecutionInfo.PeersList,
   151  			service_discovery.GetPeerDescriptor(jobResult.Timings),
   152  		)
   153  	} else if jobResult.ExternalExecution && jobResult.ExternalExecutionInfo.PeersList == nil {
   154  		log.Log.Fatalf("[R#%d,T%s] Job has been executed externally but its peers list is empty", requestId, tracingId)
   155  	}
   156  
   157  	
   158  	ReplyWithBodyFromJobResult(&req, &w, jobResult)
   159  
   160  	
   161  	defer metrics.PostJobInvocations(function, jobResult.Response.StatusCode)
   162  
   163  	defer log.Log.Debugf("[R#%d,T%s] %s success", requestId, tracingId, function)
   164  }
   165  
View as plain text