...
  
  
     1  
    18  
    19  
    20  package queue
    21  
    22  import (
    23  	"scheduler/config"
    24  	"scheduler/log"
    25  	"scheduler/memdb"
    26  	"scheduler/metrics"
    27  	"scheduler/types"
    28  	"scheduler/utils"
    29  	"sync"
    30  	"time"
    31  )
    32  
    33  var jobsQueue []*QueuedJob
    34  var jobsQueueLength = 0
    35  var jobsQueueLengthOfTypes = make(map[int64]int64)
    36  
    37  
    38  
    39  var mutex sync.Mutex
    40  
    41  var jobsSem = make(utils.Semaphore, 0)
    42  var consumersSem = make(utils.Semaphore, config.GetRunningFunctionMax())
    43  
    44  func init() {
    45  	
    46  	metrics.PostParallelJobsSlots(int(config.GetRunningFunctionMax()))
    47  	metrics.PostQueueSize(int(config.GetQueueLengthMax()))
    48  }
    49  
    50  
    51  func EnqueueJob(request *types.ServiceRequest) (*QueuedJob, error) {
    52  	mutex.Lock()
    53  
    54  	if jobsQueueLength > 0 && jobsQueueLength >= int(config.GetQueueLengthMax()) {
    55  		log.Log.Debugf("[R#%d] Cannot enqueue job %s, queue is full", request.Id, request.ServiceName)
    56  		mutex.Unlock()
    57  		return nil, ErrorFull{}
    58  	}
    59  
    60  	
    61  	sem := make(utils.Semaphore, 0)
    62  	job := &QueuedJob{
    63  		Request:   request,
    64  		Semaphore: &sem,
    65  		Timings: &Timings{
    66  			ExecutionTime:     0.0,
    67  			FaasExecutionTime: 0.0,
    68  			QueueTime:         0.0,
    69  		},
    70  	}
    71  
    72  	jobsQueue = append(jobsQueue, job)
    73  	jobsQueueLength += 1
    74  	lengthIncreaseOfType(request.ServiceType)
    75  
    76  	log.Log.Debugf("[R#%d] Enqueued job %s", job.Request.Id, job.Request.ServiceName)
    77  
    78  	
    79  	metrics.PostQueueAssignedSlot()
    80  
    81  	
    82  	mutex.Unlock()
    83  
    84  	
    85  	jobsSem.Signal()
    86  
    87  	
    88  	startQueueTime := time.Now()
    89  
    90  	
    91  	job.Semaphore.Wait(1)
    92  
    93  	
    94  	job.Timings.QueueTime = time.Since(startQueueTime).Seconds()
    95  
    96  	return job, nil
    97  }
    98  
    99  func dequeueJob() *QueuedJob {
   100  	jobsSem.Wait(1)
   101  	mutex.Lock()
   102  
   103  	job := jobsQueue[0]
   104  
   105  	if len(jobsQueue) == 1 {
   106  		jobsQueue = []*QueuedJob{}
   107  	} else {
   108  		jobsQueue = jobsQueue[1:]
   109  	}
   110  
   111  	jobsQueueLength -= 1
   112  	lengthDecreaseOfType(job.Request.ServiceType)
   113  
   114  	
   115  	metrics.PostQueueFreedSlot()
   116  
   117  	mutex.Unlock()
   118  	return job
   119  }
   120  
   121  
   124  
   125  func GetLength() int {
   126  	return jobsQueueLength
   127  }
   128  
   129  func GetLengthOfTypes() map[int64]int64 {
   130  	out := make(map[int64]int64)
   131  
   132  	mutex.Lock()
   133  	for index, element := range jobsQueueLengthOfTypes {
   134  		out[index] = element
   135  	}
   136  	mutex.Unlock()
   137  
   138  	return out
   139  }
   140  
   141  
   144  
   145  func lengthIncreaseOfType(jobType int64) {
   146  	num, exists := jobsQueueLengthOfTypes[jobType]
   147  	if !exists {
   148  		jobsQueueLengthOfTypes[jobType] = 1
   149  		return
   150  	}
   151  
   152  	jobsQueueLengthOfTypes[jobType] = num + 1
   153  }
   154  
   155  func lengthDecreaseOfType(jobType int64) {
   156  	num, exists := jobsQueueLengthOfTypes[jobType]
   157  	if !exists {
   158  		return
   159  	}
   160  
   161  	jobsQueueLengthOfTypes[jobType] = num - 1
   162  }
   163  
   164  
   167  
   168  func Looper() {
   169  	for {
   170  		
   171  		consumersSem.Wait(1)
   172  		log.Log.Debugf("Consumer available! Queue has %d jobs in queue and %d running", GetLength(), memdb.GetTotalRunningFunctions())
   173  
   174  		
   175  		job := dequeueJob()
   176  		log.Log.Debugf("QueuedJob available! Queue has %d jobs in queue and %d running", GetLength(), memdb.GetTotalRunningFunctions())
   177  
   178  		
   179  		go executeNow(job)
   180  
   181  		
   182  	}
   183  }
   184  
View as plain text