...

Source file src/scheduler/scheduler.go

Documentation: scheduler

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  // Package main is the entrypoint of the scheduler service
    20  package main
    21  
    22  import (
    23  	"fmt"
    24  	"github.com/gorilla/mux"
    25  	"net/http"
    26  	"os"
    27  	"scheduler/api"
    28  	"scheduler/api/api_monitoring"
    29  	"scheduler/api/api_peer"
    30  	"scheduler/config"
    31  	"scheduler/log"
    32  	"scheduler/queue"
    33  	"scheduler/scheduler"
    34  	"scheduler/service_discovery"
    35  	"strings"
    36  	"sync"
    37  )
    38  
    39  import _ "net/http/pprof"
    40  
// wg tracks the long-running goroutines started by main: main adds two slots,
// worker and server each call Done, and main blocks on Wait.
var wg sync.WaitGroup
    42  
    43  func main() {
    44  	wg.Add(2)
    45  
    46  	// init modules
    47  	config.Start()
    48  	scheduler.Start()
    49  	service_discovery.Start()
    50  	// metrics.Start()
    51  
    52  	go worker()
    53  	go server()
    54  
    55  	// Check if profiling should be enabled
    56  	if strings.ToLower(os.Getenv(config.EnvProfiling)) == "true" {
    57  		go pprof()
    58  	}
    59  
    60  	log.Log.Infof("Started p2p-fog scheduler v" + config.AppVersion)
    61  
    62  	wg.Wait()
    63  }
    64  
    65  func server() {
    66  	log.Log.Debugf("Starting webserver thread")
    67  
    68  	// init api
    69  	router := mux.NewRouter()
    70  	router.HandleFunc("/", api.Hello).Methods("GET", "POST")
    71  	// OpenFaaS APIs
    72  	router.HandleFunc("/system/functions", api.SystemFunctionsGet).Methods("GET")
    73  	router.HandleFunc("/system/functions", api.SystemFunctionsPost).Methods("POST")
    74  	router.HandleFunc("/system/functions", api.SystemFunctionsPut).Methods("PUT")
    75  	router.HandleFunc("/system/functions", api.SystemFunctionsDelete).Methods("DELETE")
    76  	router.HandleFunc("/system/function/{function}", api.SystemFunctionGet).Methods("GET")
    77  	router.HandleFunc("/system/scale-function/{function}", api.SystemScaleFunctionPost).Methods("POST")
    78  	router.HandleFunc("/function/{function}", api.FunctionPost).Methods("POST")
    79  	router.HandleFunc("/function/{function}", api.FunctionGet).Methods("GET")
    80  	// new APIs
    81  	router.HandleFunc("/monitoring/load", api_monitoring.LoadGetLoad).Methods("GET")
    82  	router.HandleFunc("/monitoring/scale-delay/{function}", api_monitoring.ScaleDelay).Methods("GET")
    83  	router.HandleFunc("/peer/function/{function}", api_peer.FunctionExecute).Methods("POST")
    84  	// prometheus
    85  	// router.Handle("/metrics", promhttp.Handler())
    86  	// dev apis
    87  	router.HandleFunc("/configuration", api.GetConfiguration).Methods("GET")
    88  	router.HandleFunc("/configuration/scheduler", api.GetScheduler).Methods("GET")
    89  	// TODO add auth check on configuration APIs
    90  	// if config.Configuration.GetRunningEnvironment() == config.RunningEnvironmentDevelopment {
    91  	router.HandleFunc("/configuration", api.SetConfiguration).Methods("POST")
    92  	router.HandleFunc("/configuration/scheduler", api.SetScheduler).Methods("POST")
    93  	// }
    94  
    95  	// dev apis
    96  	if config.GetRunningEnvironment() == config.RunningEnvironmentDevelopment {
    97  		router.HandleFunc("/dev/learning/act", api.LearningDevTestAct).Methods("GET")
    98  		router.HandleFunc("/dev/learning/act_ws", api.LearningDevTestActWs).Methods("GET")
    99  		router.HandleFunc("/dev/learning/train", api.LearningDevTestTrain).Methods("GET")
   100  
   101  		router.HandleFunc("/dev/http/get", api.HttpDevGet).Methods("GET")
   102  		router.HandleFunc("/dev/http/post", api.HttpDevPost).Methods("POST")
   103  
   104  		router.HandleFunc("/dev/test/parallel", api.TestDevParallelRequests).Methods("GET")
   105  	}
   106  
   107  	server := &http.Server{
   108  		Addr:    fmt.Sprintf("0.0.0.0:%d", config.GetListeningPort()),
   109  		Handler: router,
   110  	}
   111  	server.SetKeepAlivesEnabled(false)
   112  
   113  	log.Log.Infof("Started listening on %d", config.GetListeningPort())
   114  	err := server.ListenAndServe()
   115  
   116  	log.Log.Fatalf("Error while starting server: %s", err)
   117  	wg.Done()
   118  }
   119  
   120  func worker() {
   121  	log.Log.Debugf("Starting queue worker thread")
   122  	queue.Looper()
   123  	wg.Done()
   124  }
   125  
   126  func pprof() {
   127  	// pprof
   128  	_ = http.ListenAndServe("0.0.0.0:16060", nil)
   129  }
   130  

View as plain text