...

Source file src/scheduler/queue/queue.go

Documentation: scheduler/queue

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
// Package queue implements a producer-consumer queue for lossless models.
    20  package queue
    21  
    22  import (
    23  	"scheduler/config"
    24  	"scheduler/log"
    25  	"scheduler/memdb"
    26  	"scheduler/metrics"
    27  	"scheduler/types"
    28  	"scheduler/utils"
    29  	"sync"
    30  	"time"
    31  )
    32  
// jobsQueue holds the pending jobs in arrival order: EnqueueJob appends to the
// tail and dequeueJob removes from the head.
var jobsQueue []*QueuedJob
// jobsQueueLength counts the jobs currently held in jobsQueue; it is
// incremented by EnqueueJob and decremented by dequeueJob.
var jobsQueueLength = 0
// jobsQueueLengthOfTypes counts the queued jobs per service type; the key is
// the ServiceType of the job's request.
var jobsQueueLengthOfTypes = make(map[int64]int64)

// The queue is built for N producers and a fixed number of consumers.

// mutex is held while jobsQueue, jobsQueueLength and jobsQueueLengthOfTypes
// are modified.
var mutex sync.Mutex

// jobsSem is signalled once per enqueued job; dequeueJob waits on it before
// taking a job from the queue.
var jobsSem = make(utils.Semaphore, 0)
// consumersSem is sized with config.GetRunningFunctionMax(); Looper waits on
// it before dequeuing a job.
var consumersSem = make(utils.Semaphore, config.GetRunningFunctionMax())
    43  
    44  func init() {
    45  	// init metrics
    46  	metrics.PostParallelJobsSlots(int(config.GetRunningFunctionMax()))
    47  	metrics.PostQueueSize(int(config.GetQueueLengthMax()))
    48  }
    49  
    50  // EnqueueJob enqueues the passed job in the queue and it blocks the caller until the job has been executed
    51  func EnqueueJob(request *types.ServiceRequest) (*QueuedJob, error) {
    52  	mutex.Lock()
    53  
    54  	if jobsQueueLength > 0 && jobsQueueLength >= int(config.GetQueueLengthMax()) {
    55  		log.Log.Debugf("[R#%d] Cannot enqueue job %s, queue is full", request.Id, request.ServiceName)
    56  		mutex.Unlock()
    57  		return nil, ErrorFull{}
    58  	}
    59  
    60  	// critical section
    61  	sem := make(utils.Semaphore, 0)
    62  	job := &QueuedJob{
    63  		Request:   request,
    64  		Semaphore: &sem,
    65  		Timings: &Timings{
    66  			ExecutionTime:     0.0,
    67  			FaasExecutionTime: 0.0,
    68  			QueueTime:         0.0,
    69  		},
    70  	}
    71  
    72  	jobsQueue = append(jobsQueue, job)
    73  	jobsQueueLength += 1
    74  	lengthIncreaseOfType(request.ServiceType)
    75  
    76  	log.Log.Debugf("[R#%d] Enqueued job %s", job.Request.Id, job.Request.ServiceName)
    77  
    78  	// metrics
    79  	metrics.PostQueueAssignedSlot()
    80  
    81  	// end critical section
    82  	mutex.Unlock()
    83  
    84  	// add a job
    85  	jobsSem.Signal()
    86  
    87  	// start time
    88  	startQueueTime := time.Now()
    89  
    90  	// lock until job is completed
    91  	job.Semaphore.Wait(1)
    92  
    93  	// stop time
    94  	job.Timings.QueueTime = time.Since(startQueueTime).Seconds()
    95  
    96  	return job, nil
    97  }
    98  
    99  func dequeueJob() *QueuedJob {
   100  	jobsSem.Wait(1)
   101  	mutex.Lock()
   102  
   103  	job := jobsQueue[0]
   104  
   105  	if len(jobsQueue) == 1 {
   106  		jobsQueue = []*QueuedJob{}
   107  	} else {
   108  		jobsQueue = jobsQueue[1:]
   109  	}
   110  
   111  	jobsQueueLength -= 1
   112  	lengthDecreaseOfType(job.Request.ServiceType)
   113  
   114  	// metrics
   115  	metrics.PostQueueFreedSlot()
   116  
   117  	mutex.Unlock()
   118  	return job
   119  }
   120  
   121  /*
   122   * Utils
   123   */
   124  
   125  func GetLength() int {
   126  	return jobsQueueLength
   127  }
   128  
   129  func GetLengthOfTypes() map[int64]int64 {
   130  	out := make(map[int64]int64)
   131  
   132  	mutex.Lock()
   133  	for index, element := range jobsQueueLengthOfTypes {
   134  		out[index] = element
   135  	}
   136  	mutex.Unlock()
   137  
   138  	return out
   139  }
   140  
   141  /*
   142   * Internal
   143   */
   144  
   145  func lengthIncreaseOfType(jobType int64) {
   146  	num, exists := jobsQueueLengthOfTypes[jobType]
   147  	if !exists {
   148  		jobsQueueLengthOfTypes[jobType] = 1
   149  		return
   150  	}
   151  
   152  	jobsQueueLengthOfTypes[jobType] = num + 1
   153  }
   154  
   155  func lengthDecreaseOfType(jobType int64) {
   156  	num, exists := jobsQueueLengthOfTypes[jobType]
   157  	if !exists {
   158  		return
   159  	}
   160  
   161  	jobsQueueLengthOfTypes[jobType] = num - 1
   162  }
   163  
   164  /*
   165   * Core
   166   */
   167  
   168  func Looper() {
   169  	for {
   170  		// Block here if we do not have consumers
   171  		consumersSem.Wait(1)
   172  		log.Log.Debugf("Consumer available! Queue has %d jobs in queue and %d running", GetLength(), memdb.GetTotalRunningFunctions())
   173  
   174  		// Block here if we do not have jobs
   175  		job := dequeueJob()
   176  		log.Log.Debugf("QueuedJob available! Queue has %d jobs in queue and %d running", GetLength(), memdb.GetTotalRunningFunctions())
   177  
   178  		// Execute the job
   179  		go executeNow(job)
   180  
   181  		// If job is executed we will release the consumersSem in the executeNow thread
   182  	}
   183  }
   184  

View as plain text