...

Source file src/scheduler/api/utils.go

Documentation: scheduler/api

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  package api
    20  
    21  import (
    22  	"encoding/base64"
    23  	"encoding/json"
    24  	"fmt"
    25  	"net/http"
    26  	"scheduler/config"
    27  	"scheduler/errors"
    28  	"scheduler/log"
    29  	"scheduler/scheduler"
    30  	"scheduler/types"
    31  	"scheduler/utils"
    32  )
    33  
    34  /*
    35   * Utils
    36   */
    37  
    38  func headersCheckSchedulerBypass(req *http.Request) bool {
    39  	return req.Header.Get(utils.HttpHeaderP2PFaaSSchedulerBypass) != ""
    40  }
    41  
    42  func headersCheckSchedulerForward(req *http.Request) bool {
    43  	return req.Header.Get(utils.HttpHeaderP2PFaaSSchedulerForward) != ""
    44  }
    45  
    46  func headersCheckSchedulerReject(req *http.Request) bool {
    47  	return req.Header.Get(utils.HttpHeaderP2PFaaSSchedulerReject) != ""
    48  }
    49  
    50  func HeadersGetRequestTracingId(req *http.Request) string {
    51  	return req.Header.Get(utils.HttpHeaderP2PFaaSSchedulerTracingId)
    52  }
    53  
    54  func HttpGetHeadersFromFramework() map[string]string {
    55  	return map[string]string{
    56  		utils.HttpHeaderP2PFaaSVersion:   config.AppVersion,
    57  		utils.HttpHeaderP2PFaaSScheduler: scheduler.GetName(),
    58  	}
    59  }
    60  
    61  func HttpGetHeadersFromJobResult(result *scheduler.JobResult) map[string]string {
    62  	if result == nil {
    63  		return map[string]string{}
    64  	}
    65  
    66  	// add base headers
    67  	output := map[string]string{
    68  		utils.HttpHeaderP2PFaaSScheduler: result.Scheduler,
    69  	}
    70  
    71  	// add headers from job result
    72  	if result.ResponseHeaders != nil {
    73  		output = utils.MapsMerge(output, *result.ResponseHeaders)
    74  	}
    75  
    76  	return output
    77  }
    78  
    79  func HttpGetHeadersXFromResponse(apiResponse *types.APIResponse) map[string]string {
    80  	output := map[string]string{}
    81  
    82  	if apiResponse == nil || apiResponse.Headers == nil {
    83  		log.Log.Debugf("[R#%d] apiResponse is nil or headers is nil")
    84  		return output
    85  	}
    86  
    87  	for key, value := range apiResponse.Headers {
    88  		if string(key[0]) == "X" {
    89  			output[key] = value[0]
    90  		}
    91  	}
    92  
    93  	return output
    94  }
    95  
    96  func HttpGetHeadersFunctionExecution(jobResult *scheduler.JobResult) map[string]string {
    97  	output := map[string]string{}
    98  
    99  	if jobResult == nil {
   100  		return output
   101  	}
   102  
   103  	// jobResult has been executed internally, so we have a single time
   104  	if !jobResult.ExternalExecution {
   105  		if jobResult.Timings == nil {
   106  			return output
   107  		}
   108  		// these are treated as lists
   109  		if jobResult.Timings.TotalTime != nil {
   110  			output[utils.HttpHeaderP2PFaaSTotalTimingsList] = fmt.Sprintf("[%f]", *jobResult.Timings.TotalTime)
   111  		}
   112  		if jobResult.Timings.SchedulingTime != nil {
   113  			output[utils.HttpHeaderP2PFaaSSchedulingTimingsList] = fmt.Sprintf("[%f]", *jobResult.Timings.SchedulingTime)
   114  		}
   115  		if jobResult.Timings.ProbingTime != nil {
   116  			output[utils.HttpHeaderP2PFaaSProbingTimingsList] = fmt.Sprintf("[%f]", *jobResult.Timings.ProbingTime)
   117  		}
   118  
   119  		// this is always a single value
   120  		if jobResult.Timings.ExecutionTime != nil {
   121  			output[utils.HttpHeaderP2PFaaSExecutionTime] = fmt.Sprintf("%f", *jobResult.Timings.ExecutionTime)
   122  		}
   123  	}
   124  
   125  	// jobResult has been executed externally, so we have a list of times
   126  	if jobResult.ExternalExecution {
   127  		hops := len(jobResult.ExternalExecutionInfo.PeersList) - 1
   128  		output[utils.HttpHeaderP2PFaaSExternallyExecuted] = "True"
   129  		output[utils.HttpHeaderP2PFaaSHops] = fmt.Sprintf("%d", hops)
   130  
   131  		if len(jobResult.ExternalExecutionInfo.PeersList) == 0 {
   132  			log.Log.Fatalf("Peers list is empty and the jobResult has been executed externally")
   133  		}
   134  
   135  		if jobResult.ExternalExecutionInfo.PeersList[0].Timings.ExecutionTime != nil {
   136  			output[utils.HttpHeaderP2PFaaSExecutionTime] = fmt.Sprintf("%f", *jobResult.ExternalExecutionInfo.PeersList[0].Timings.ExecutionTime)
   137  		}
   138  
   139  		var ipList []string
   140  		var idList []string
   141  		var probingTimes []float64
   142  		var totalTimes []float64
   143  		var schedulingTimes []float64
   144  
   145  		peers := jobResult.ExternalExecutionInfo.PeersList
   146  		for i := len(jobResult.ExternalExecutionInfo.PeersList) - 1; i >= 0; i-- {
   147  			ipList = append(ipList, peers[i].MachineIp)
   148  			idList = append(idList, peers[i].MachineId)
   149  
   150  			if peers[i].Timings.ProbingTime != nil {
   151  				probingTimes = append(probingTimes, *peers[i].Timings.ProbingTime)
   152  			} else {
   153  				probingTimes = append(probingTimes, 0.0)
   154  			}
   155  
   156  			if peers[i].Timings.TotalTime != nil {
   157  				totalTimes = append(totalTimes, *peers[i].Timings.TotalTime)
   158  			} else {
   159  				totalTimes = append(totalTimes, 0.0)
   160  			}
   161  
   162  			if peers[i].Timings.SchedulingTime != nil {
   163  				schedulingTimes = append(schedulingTimes, *peers[i].Timings.SchedulingTime)
   164  			} else {
   165  				schedulingTimes = append(schedulingTimes, 0.0)
   166  			}
   167  		}
   168  
   169  		ipListJ, _ := json.Marshal(ipList)
   170  		idListJ, _ := json.Marshal(idList)
   171  		totalTimesJ, _ := json.Marshal(totalTimes)
   172  		schedulingTimesJ, _ := json.Marshal(schedulingTimes)
   173  		probingTimesJ, _ := json.Marshal(probingTimes)
   174  
   175  		output[utils.HttpHeaderP2PFaaSPeersListIp] = fmt.Sprintf("%s", string(ipListJ))
   176  		output[utils.HttpHeaderP2PFaaSPeersListId] = fmt.Sprintf("%s", string(idListJ))
   177  
   178  		output[utils.HttpHeaderP2PFaaSTotalTimingsList] = fmt.Sprintf("%s", string(totalTimesJ))
   179  		output[utils.HttpHeaderP2PFaaSProbingTimingsList] = fmt.Sprintf("%s", string(probingTimesJ))
   180  		output[utils.HttpHeaderP2PFaaSSchedulingTimingsList] = fmt.Sprintf("%s", string(schedulingTimesJ))
   181  	}
   182  
   183  	return output
   184  }
   185  
   186  func ReplyWithErrorFromJobResult(w *http.ResponseWriter, errorCode int, jobResult *scheduler.JobResult, message string) {
   187  	finalHeaders := HttpGetHeadersFromFramework()
   188  
   189  	// add headers from result
   190  	if jobResult != nil {
   191  		finalHeaders = utils.MapsMerge(
   192  			finalHeaders,
   193  			HttpGetHeadersFromJobResult(jobResult),
   194  		)
   195  
   196  		if jobResult.Response != nil {
   197  			finalHeaders = utils.MapsMerge(
   198  				finalHeaders,
   199  				HttpGetHeadersXFromResponse(jobResult.Response),
   200  			)
   201  		}
   202  	}
   203  
   204  	utils.HttpAddHeadersToResponse(w, &finalHeaders)
   205  
   206  	if message != "" {
   207  		errors.ReplyWithErrorMessage(w, errorCode, message, nil)
   208  	} else {
   209  		errors.ReplyWithError(w, errorCode, nil)
   210  	}
   211  }
   212  
   213  func ReplyWithBodyFromJobResult(serviceRequest *types.ServiceRequest, w *http.ResponseWriter, jobResult *scheduler.JobResult) {
   214  	var err error
   215  
   216  	// add headers
   217  	finalHeaders := utils.MapsMerge(
   218  		HttpGetHeadersFromFramework(),
   219  		HttpGetHeadersFromJobResult(jobResult),
   220  		HttpGetHeadersXFromResponse(jobResult.Response),
   221  		HttpGetHeadersFunctionExecution(jobResult),
   222  	)
   223  	utils.HttpAddHeadersToResponse(w, &finalHeaders)
   224  
   225  	(*w).WriteHeader(jobResult.Response.StatusCode)
   226  
   227  	// check if we need to write the body output
   228  	if jobResult.Response.Body != nil && len(jobResult.Response.Body) > 0 {
   229  		log.Log.Debugf("[R#%d,T%s] Job body has length %d, external=%t", serviceRequest.Id, serviceRequest.IdTracing, len(jobResult.Response.Body), jobResult.ExternalExecution)
   230  
   231  		var outputBody []byte
   232  
   233  		// decode the job output if it has been executed externally, since when a node offload a jobs, the remote note
   234  		// will reply with base64 encoded body
   235  		if jobResult.ExternalExecution {
   236  			outputBody, err = base64.StdEncoding.DecodeString(string(jobResult.Response.Body))
   237  			if err != nil {
   238  				log.Log.Errorf("[R#%d,T%s] Cannot decode job output: %s", serviceRequest.IdTracing, serviceRequest.Id, err)
   239  				return
   240  			}
   241  		} else {
   242  			outputBody = jobResult.Response.Body
   243  		}
   244  
   245  		// Write response
   246  		_, err = (*w).Write(outputBody)
   247  		if err != nil {
   248  			log.Log.Errorf("[R#%d,T%s] Cannot write job output: %s", serviceRequest.Id, serviceRequest.IdTracing, err.Error())
   249  			return
   250  		}
   251  	}
   252  }
   253  

View as plain text