...
1
18
19 package scheduler
20
21 import (
22 "fmt"
23 "scheduler/log"
24 "scheduler/service_discovery"
25 "scheduler/types"
26 "time"
27 )
28
// ForwardSchedulerName is the base name of the forward scheduler. It is used
// as the descriptor name and as the prefix of the scheduler's full name.
const (
	ForwardSchedulerName = "ForwardScheduler"
)
30
31
// ForwardScheduler is a scheduler configured by a maximum hop count.
type ForwardScheduler struct {
	// MaxHops is the hop threshold: an external request whose hop count has
	// reached this value is executed locally instead of being forwarded.
	MaxHops uint
}
36
37 func (s ForwardScheduler) GetFullName() string {
38 return fmt.Sprintf("%s(%d)", ForwardSchedulerName, s.MaxHops)
39 }
40
41 func (s ForwardScheduler) GetScheduler() *types.SchedulerDescriptor {
42 return &types.SchedulerDescriptor{
43 Name: ForwardSchedulerName,
44 Parameters: []string{
45 fmt.Sprintf("%d", s.MaxHops),
46 },
47 }
48 }
49
50
51 func (s ForwardScheduler) Schedule(req *types.ServiceRequest) (*JobResult, error) {
52 log.Log.Debugf("Scheduling job %s", req.ServiceName)
53 now := time.Now()
54 timingsStart := types.TimingsStart{ArrivedAt: &now}
55
56 jobMustExecutedHere := req.External && req.ExternalJobRequest.Hops >= int(s.MaxHops)
57
58
59 if !jobMustExecutedHere {
60
61 startedProbingTime := time.Now()
62 timingsStart.ProbingStartedAt = &startedProbingTime
63
64 randomMachine, err := service_discovery.GetNRandomMachines(1, true)
65
66 endProbingTime := time.Now()
67 timingsStart.ProbingEndedAt = &endProbingTime
68 if err != nil {
69 log.Log.Debugf("Error in retrieving machines %s", err.Error())
70 return executeJobLocally(req, &timingsStart, s.GetFullName())
71 }
72 if len(randomMachine) == 0 {
73 log.Log.Debugf("No random machines retrieved")
74 return executeJobLocally(req, &timingsStart, s.GetFullName())
75 }
76 log.Log.Debugf("Forwarding to random machine: %s", randomMachine)
77 return executeJobExternally(req, randomMachine[0], &timingsStart, s.GetFullName())
78 }
79
80 return executeJobLocally(req, &timingsStart, s.GetFullName())
81 }
82
View as plain text