1
18
19 package service_learning
20
21 import (
22 "fmt"
23 "github.com/gorilla/websocket"
24 "scheduler/config"
25 "scheduler/log"
26 "scheduler/utils"
27 "strconv"
28 "strings"
29 "sync"
30 "time"
31 )
32
33 type SocketActRequest struct {
34 entryAct *EntryAct
35 doneMutex *sync.WaitGroup
36
37 outputAction *EntryActOutput
38 outputError error
39 }
40
41 const SocketPoolSize = 20
42
43 var socketPool []*websocket.Conn
44 var socketPoolBusy []bool
45 var socketPoolMutex = sync.Mutex{}
46
47 var socketServerInit = false
48
49 var socketPendingRequestsList []*SocketActRequest
50 var socketPendingRequestsListMutex = sync.Mutex{}
51
52 var socketConsumersSemaphore = make(utils.Semaphore, SocketPoolSize)
53
54 var socketFreeSlotsSemaphore = make(utils.Semaphore, SocketPoolSize*3)
55 var socketBusySlotsSemaphore = make(utils.Semaphore, 0)
56
57 func Start() {
58 var err error
59 var conn *websocket.Conn
60
61 log.Log.Debugf("Starting learner service socket pool")
62
63
64 socketPool = []*websocket.Conn{}
65 socketPoolBusy = []bool{}
66 socketPoolMutex = sync.Mutex{}
67
68 socketPendingRequestsList = []*SocketActRequest{}
69 socketPendingRequestsListMutex = sync.Mutex{}
70
71 socketConsumersSemaphore = make(utils.Semaphore, SocketPoolSize)
72 socketFreeSlotsSemaphore = make(utils.Semaphore, SocketPoolSize*3)
73 socketBusySlotsSemaphore = make(utils.Semaphore, 0)
74
75
76 for i := 0; i < SocketPoolSize; i++ {
77 for {
78 conn, err = socketConnect()
79 if err != nil {
80 log.Log.Errorf("Cannot connect to learner socket #%d, retrying in 5 seconds: err=%s", i, err)
81 time.Sleep(5 * time.Second)
82 continue
83 }
84
85 log.Log.Infof("Initialized socket #%d: addr=%s==>%s", i+1, conn.LocalAddr().String(), conn.RemoteAddr().String())
86 break
87 }
88
89 socketPool = append(socketPool, conn)
90 socketPoolBusy = append(socketPoolBusy, false)
91 }
92
93 log.Log.Infof("Successfully prepared socket pool for learning service")
94
95 socketServerInit = true
96
97 go socketLooper()
98 }
99
100 func Stop() {
101 if !socketServerInit {
102 return
103 }
104
105 var err error
106
107 log.Log.Debugf("Closing socket pool gracefully: len(socketPool)=%d", len(socketPool))
108
109 for i, sock := range socketPool {
110 err = sock.Close()
111 if err != nil {
112 log.Log.Warningf("Cannot close socket #%d: err=%s", i, err)
113 }
114 }
115
116 socketServerInit = false
117
118
119 socketBusySlotsSemaphore.Signal()
120 }
121
122
123 func SocketAct(act *EntryAct) (*EntryActOutput, error) {
124 req := SocketActRequest{
125 entryAct: act,
126 doneMutex: &sync.WaitGroup{},
127
128 outputAction: &EntryActOutput{},
129 }
130 req.doneMutex.Add(1)
131
132
133 socketFreeSlotsSemaphore.Wait(1)
134
135
136 socketPendingRequestsListMutex.Lock()
137 socketPendingRequestsList = append(socketPendingRequestsList, &req)
138 socketPendingRequestsListMutex.Unlock()
139
140
141 socketBusySlotsSemaphore.Signal()
142
143
144 req.doneMutex.Wait()
145
146
147 return req.outputAction, nil
148 }
149
150
153
154 func socketConnect() (*websocket.Conn, error) {
155 var err error
156 var socket *websocket.Conn
157
158 socketUrl := fmt.Sprintf("ws://%s:8765", config.GetServiceLearningListeningHost())
159
160 log.Log.Debugf("Trying to connect to websocket to: %s", socketUrl)
161
162 socket, _, err = websocket.DefaultDialer.Dial(socketUrl, nil)
163 if err != nil {
164 log.Log.Errorf("Error connecting to websocket server:", err)
165 return nil, err
166 }
167
168 return socket, nil
169 }
170
171 func socketLooper() {
172 log.Log.Infof("Started looper for learning requests")
173
174 for {
175 log.Log.Debugf("Waiting for free socket")
176
177 socketConsumersSemaphore.Wait(1)
178
179 log.Log.Debugf("Waiting for request to process")
180
181 socketBusySlotsSemaphore.Wait(1)
182
183 if !socketServerInit {
184 log.Log.Infof("Exiting from socketLooper")
185 break
186 }
187
188 log.Log.Debugf("Processing request")
189
190 socketPendingRequestsListMutex.Lock()
191
192 requestToProcess := socketPendingRequestsList[0]
193 socketPendingRequestsList = socketPendingRequestsList[1:]
194 go socketProcessRequest(requestToProcess)
195
196 socketPendingRequestsListMutex.Unlock()
197
198
199 socketFreeSlotsSemaphore.Signal()
200 }
201 }
202
203 func socketProcessRequest(req *SocketActRequest) {
204 var err error
205 var msg []byte
206
207
208 bookedSlotIndex := socketPoolSlotBook()
209
210 log.Log.Debugf("Processing request: bookedSlotIndex=%d req=%s", bookedSlotIndex, *req)
211
212
213 connection := socketPool[bookedSlotIndex]
214
215
216 defer func() {
217 log.Log.Debugf("Releasing resources for bookedSlotIndex=%d", bookedSlotIndex)
218
219 socketPoolSlotRelease(bookedSlotIndex)
220 socketConsumersSemaphore.Signal()
221 req.doneMutex.Done()
222 }()
223
224 firstTime := true
225
226 for {
227
228 if !firstTime {
229 log.Log.Infof("Socket #%d recreating...", bookedSlotIndex)
230
231 _ = connection.Close()
232
233
234 socketPool[bookedSlotIndex], err = socketConnect()
235 if err != nil {
236 log.Log.Errorf("Cannot re-create the socket, giving up: %s", err)
237 req.outputError = err
238 return
239 }
240
241 connection = socketPool[bookedSlotIndex]
242 log.Log.Infof("Socket #%d recreated successfully", bookedSlotIndex)
243 }
244
245
246
247 err = connection.WriteJSON(req.entryAct.State)
248 if err != nil {
249 req.outputError = err
250 log.Log.Errorf("Cannot write message to socket (firstTime=%v): %s", firstTime, err)
251
252 if !firstTime {
253 return
254 }
255
256 firstTime = false
257 continue
258 }
259
260
261 _, msg, err = connection.ReadMessage()
262 if err != nil {
263 req.outputError = err
264 log.Log.Errorf("Cannot parse read reply message (firstTime=%v): %s", firstTime, err)
265
266 if !firstTime {
267 return
268 }
269
270 firstTime = false
271 continue
272 }
273
274
275 msgComponents := strings.Split(string(msg), ",")
276 req.outputAction.Action, err = strconv.ParseFloat(msgComponents[0], 64)
277 req.outputAction.Eps, err = strconv.ParseFloat(msgComponents[1], 64)
278 if err != nil {
279 req.outputError = err
280 log.Log.Errorf("Cannot parse float from reply message (firstTime=%v): %s", firstTime, err)
281
282 if !firstTime {
283 return
284 }
285
286 firstTime = false
287 continue
288 }
289
290 break
291 }
292
293 }
294
295 func socketPoolSlotBook() int64 {
296 socketPoolMutex.Lock()
297 i := 0
298 for {
299 if !socketPoolBusy[i] {
300 socketPoolBusy[i] = true
301 socketPoolMutex.Unlock()
302 return int64(i)
303 }
304
305 i = (i + 1) % SocketPoolSize
306 }
307 }
308
309 func socketPoolSlotRelease(index int64) {
310 socketPoolMutex.Lock()
311 socketPoolBusy[index] = false
312 socketPoolMutex.Unlock()
313 }
314
View as plain text