...
Source file
src/scheduler/scheduler.go
1
18
19
20 package main
21
import (
	"fmt"
	"net/http"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/mux"

	"scheduler/api"
	"scheduler/api/api_monitoring"
	"scheduler/api/api_peer"
	"scheduler/config"
	"scheduler/log"
	"scheduler/queue"
	"scheduler/scheduler"
	"scheduler/service_discovery"
)
38
39 import _ "net/http/pprof"
40
41 var wg sync.WaitGroup
42
43 func main() {
44 wg.Add(2)
45
46
47 config.Start()
48 scheduler.Start()
49 service_discovery.Start()
50
51
52 go worker()
53 go server()
54
55
56 if strings.ToLower(os.Getenv(config.EnvProfiling)) == "true" {
57 go pprof()
58 }
59
60 log.Log.Infof("Started p2p-fog scheduler v" + config.AppVersion)
61
62 wg.Wait()
63 }
64
65 func server() {
66 log.Log.Debugf("Starting webserver thread")
67
68
69 router := mux.NewRouter()
70 router.HandleFunc("/", api.Hello).Methods("GET", "POST")
71
72 router.HandleFunc("/system/functions", api.SystemFunctionsGet).Methods("GET")
73 router.HandleFunc("/system/functions", api.SystemFunctionsPost).Methods("POST")
74 router.HandleFunc("/system/functions", api.SystemFunctionsPut).Methods("PUT")
75 router.HandleFunc("/system/functions", api.SystemFunctionsDelete).Methods("DELETE")
76 router.HandleFunc("/system/function/{function}", api.SystemFunctionGet).Methods("GET")
77 router.HandleFunc("/system/scale-function/{function}", api.SystemScaleFunctionPost).Methods("POST")
78 router.HandleFunc("/function/{function}", api.FunctionPost).Methods("POST")
79 router.HandleFunc("/function/{function}", api.FunctionGet).Methods("GET")
80
81 router.HandleFunc("/monitoring/load", api_monitoring.LoadGetLoad).Methods("GET")
82 router.HandleFunc("/monitoring/scale-delay/{function}", api_monitoring.ScaleDelay).Methods("GET")
83 router.HandleFunc("/peer/function/{function}", api_peer.FunctionExecute).Methods("POST")
84
85
86
87 router.HandleFunc("/configuration", api.GetConfiguration).Methods("GET")
88 router.HandleFunc("/configuration/scheduler", api.GetScheduler).Methods("GET")
89
90
91 router.HandleFunc("/configuration", api.SetConfiguration).Methods("POST")
92 router.HandleFunc("/configuration/scheduler", api.SetScheduler).Methods("POST")
93
94
95
96 if config.GetRunningEnvironment() == config.RunningEnvironmentDevelopment {
97 router.HandleFunc("/dev/learning/act", api.LearningDevTestAct).Methods("GET")
98 router.HandleFunc("/dev/learning/act_ws", api.LearningDevTestActWs).Methods("GET")
99 router.HandleFunc("/dev/learning/train", api.LearningDevTestTrain).Methods("GET")
100
101 router.HandleFunc("/dev/http/get", api.HttpDevGet).Methods("GET")
102 router.HandleFunc("/dev/http/post", api.HttpDevPost).Methods("POST")
103
104 router.HandleFunc("/dev/test/parallel", api.TestDevParallelRequests).Methods("GET")
105 }
106
107 server := &http.Server{
108 Addr: fmt.Sprintf("0.0.0.0:%d", config.GetListeningPort()),
109 Handler: router,
110 }
111 server.SetKeepAlivesEnabled(false)
112
113 log.Log.Infof("Started listening on %d", config.GetListeningPort())
114 err := server.ListenAndServe()
115
116 log.Log.Fatalf("Error while starting server: %s", err)
117 wg.Done()
118 }
119
120 func worker() {
121 log.Log.Debugf("Starting queue worker thread")
122 queue.Looper()
123 wg.Done()
124 }
125
126 func pprof() {
127
128 _ = http.ListenAndServe("0.0.0.0:16060", nil)
129 }
130
View as plain text