...

Source file src/scheduler/service_learning/socket.go

Documentation: scheduler/service_learning

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019 - 2022. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  package service_learning
    20  
    21  import (
    22  	"fmt"
    23  	"github.com/gorilla/websocket"
    24  	"scheduler/config"
    25  	"scheduler/log"
    26  	"scheduler/utils"
    27  	"strconv"
    28  	"strings"
    29  	"sync"
    30  	"time"
    31  )
    32  
// SocketActRequest is a single act request travelling through the pending
// queue: SocketAct creates it and enqueues it, socketLooper dequeues it and
// socketProcessRequest fills in the outcome.
type SocketActRequest struct {
	// entryAct is the act to execute; its State is what gets written to the socket.
	entryAct *EntryAct
	// doneMutex is (despite the name) a WaitGroup: SocketAct adds 1 and waits on
	// it, socketProcessRequest calls Done once the request has been handled.
	doneMutex *sync.WaitGroup

	// outputAction receives the Action and Eps values parsed from the reply.
	outputAction *EntryActOutput
	// outputError is the error recorded by socketProcessRequest while handling
	// the request, if any.
	outputError error
}
    40  
    41  const SocketPoolSize = 20
    42  
    43  var socketPool []*websocket.Conn
    44  var socketPoolBusy []bool
    45  var socketPoolMutex = sync.Mutex{}
    46  
    47  var socketServerInit = false
    48  
    49  var socketPendingRequestsList []*SocketActRequest
    50  var socketPendingRequestsListMutex = sync.Mutex{}
    51  
    52  var socketConsumersSemaphore = make(utils.Semaphore, SocketPoolSize)
    53  
    54  var socketFreeSlotsSemaphore = make(utils.Semaphore, SocketPoolSize*3) // queue len for act requests
    55  var socketBusySlotsSemaphore = make(utils.Semaphore, 0)
    56  
    57  func Start() {
    58  	var err error
    59  	var conn *websocket.Conn
    60  
    61  	log.Log.Debugf("Starting learner service socket pool")
    62  
    63  	// reset everything
    64  	socketPool = []*websocket.Conn{}
    65  	socketPoolBusy = []bool{}
    66  	socketPoolMutex = sync.Mutex{}
    67  
    68  	socketPendingRequestsList = []*SocketActRequest{}
    69  	socketPendingRequestsListMutex = sync.Mutex{}
    70  
    71  	socketConsumersSemaphore = make(utils.Semaphore, SocketPoolSize)
    72  	socketFreeSlotsSemaphore = make(utils.Semaphore, SocketPoolSize*3) // queue len for act requests
    73  	socketBusySlotsSemaphore = make(utils.Semaphore, 0)
    74  
    75  	// try to connect
    76  	for i := 0; i < SocketPoolSize; i++ {
    77  		for {
    78  			conn, err = socketConnect()
    79  			if err != nil {
    80  				log.Log.Errorf("Cannot connect to learner socket #%d, retrying in 5 seconds: err=%s", i, err)
    81  				time.Sleep(5 * time.Second)
    82  				continue
    83  			}
    84  
    85  			log.Log.Infof("Initialized socket #%d: addr=%s==>%s", i+1, conn.LocalAddr().String(), conn.RemoteAddr().String())
    86  			break
    87  		}
    88  
    89  		socketPool = append(socketPool, conn)
    90  		socketPoolBusy = append(socketPoolBusy, false)
    91  	}
    92  
    93  	log.Log.Infof("Successfully prepared socket pool for learning service")
    94  
    95  	socketServerInit = true
    96  
    97  	go socketLooper()
    98  }
    99  
   100  func Stop() {
   101  	if !socketServerInit {
   102  		return
   103  	}
   104  
   105  	var err error
   106  
   107  	log.Log.Debugf("Closing socket pool gracefully: len(socketPool)=%d", len(socketPool))
   108  
   109  	for i, sock := range socketPool {
   110  		err = sock.Close()
   111  		if err != nil {
   112  			log.Log.Warningf("Cannot close socket #%d: err=%s", i, err)
   113  		}
   114  	}
   115  
   116  	socketServerInit = false
   117  
   118  	// trigger busy for stopping the looper
   119  	socketBusySlotsSemaphore.Signal()
   120  }
   121  
   122  // SocketAct executes the act by using the websocket to learner service
   123  func SocketAct(act *EntryAct) (*EntryActOutput, error) {
   124  	req := SocketActRequest{
   125  		entryAct:  act,
   126  		doneMutex: &sync.WaitGroup{},
   127  
   128  		outputAction: &EntryActOutput{},
   129  	}
   130  	req.doneMutex.Add(1)
   131  
   132  	// wait for free slots
   133  	socketFreeSlotsSemaphore.Wait(1)
   134  
   135  	// enqueue
   136  	socketPendingRequestsListMutex.Lock()
   137  	socketPendingRequestsList = append(socketPendingRequestsList, &req)
   138  	socketPendingRequestsListMutex.Unlock()
   139  
   140  	// signal busy slots
   141  	socketBusySlotsSemaphore.Signal()
   142  
   143  	// wait for the result
   144  	req.doneMutex.Wait()
   145  
   146  	// return to client
   147  	return req.outputAction, nil
   148  }
   149  
   150  /*
   151   * Internals
   152   */
   153  
   154  func socketConnect() (*websocket.Conn, error) {
   155  	var err error
   156  	var socket *websocket.Conn
   157  
   158  	socketUrl := fmt.Sprintf("ws://%s:8765", config.GetServiceLearningListeningHost()) //  + "/socket"
   159  
   160  	log.Log.Debugf("Trying to connect to websocket to: %s", socketUrl)
   161  
   162  	socket, _, err = websocket.DefaultDialer.Dial(socketUrl, nil)
   163  	if err != nil {
   164  		log.Log.Errorf("Error connecting to websocket server:", err)
   165  		return nil, err
   166  	}
   167  
   168  	return socket, nil
   169  }
   170  
   171  func socketLooper() {
   172  	log.Log.Infof("Started looper for learning requests")
   173  
   174  	for {
   175  		log.Log.Debugf("Waiting for free socket")
   176  		// wait for free space
   177  		socketConsumersSemaphore.Wait(1)
   178  
   179  		log.Log.Debugf("Waiting for request to process")
   180  		// wait for request
   181  		socketBusySlotsSemaphore.Wait(1)
   182  
   183  		if !socketServerInit {
   184  			log.Log.Infof("Exiting from socketLooper")
   185  			break
   186  		}
   187  
   188  		log.Log.Debugf("Processing request")
   189  		// dequeue and process
   190  		socketPendingRequestsListMutex.Lock()
   191  
   192  		requestToProcess := socketPendingRequestsList[0]
   193  		socketPendingRequestsList = socketPendingRequestsList[1:]
   194  		go socketProcessRequest(requestToProcess)
   195  
   196  		socketPendingRequestsListMutex.Unlock()
   197  
   198  		// signal free slot
   199  		socketFreeSlotsSemaphore.Signal()
   200  	}
   201  }
   202  
   203  func socketProcessRequest(req *SocketActRequest) {
   204  	var err error
   205  	var msg []byte
   206  
   207  	// find free socket
   208  	bookedSlotIndex := socketPoolSlotBook()
   209  
   210  	log.Log.Debugf("Processing request: bookedSlotIndex=%d req=%s", bookedSlotIndex, *req)
   211  
   212  	// send and receive from that socket
   213  	connection := socketPool[bookedSlotIndex]
   214  
   215  	// free
   216  	defer func() {
   217  		log.Log.Debugf("Releasing resources for bookedSlotIndex=%d", bookedSlotIndex)
   218  
   219  		socketPoolSlotRelease(bookedSlotIndex)
   220  		socketConsumersSemaphore.Signal()
   221  		req.doneMutex.Done()
   222  	}()
   223  
   224  	firstTime := true
   225  
   226  	for {
   227  		// retry to recreate the socket if not first time
   228  		if !firstTime {
   229  			log.Log.Infof("Socket #%d recreating...", bookedSlotIndex)
   230  
   231  			_ = connection.Close()
   232  
   233  			// if error retry to set up the socket
   234  			socketPool[bookedSlotIndex], err = socketConnect()
   235  			if err != nil {
   236  				log.Log.Errorf("Cannot re-create the socket, giving up: %s", err)
   237  				req.outputError = err
   238  				return
   239  			}
   240  
   241  			connection = socketPool[bookedSlotIndex]
   242  			log.Log.Infof("Socket #%d recreated successfully", bookedSlotIndex)
   243  		}
   244  
   245  		// write message
   246  		// messageBytes, err := json.Marshal(req.state)
   247  		err = connection.WriteJSON(req.entryAct.State)
   248  		if err != nil {
   249  			req.outputError = err
   250  			log.Log.Errorf("Cannot write message to socket (firstTime=%v): %s", firstTime, err)
   251  
   252  			if !firstTime {
   253  				return
   254  			}
   255  
   256  			firstTime = false
   257  			continue
   258  		}
   259  
   260  		// receive message
   261  		_, msg, err = connection.ReadMessage()
   262  		if err != nil {
   263  			req.outputError = err
   264  			log.Log.Errorf("Cannot parse read reply message (firstTime=%v): %s", firstTime, err)
   265  
   266  			if !firstTime {
   267  				return
   268  			}
   269  
   270  			firstTime = false
   271  			continue
   272  		}
   273  
   274  		// err = json.Unmarshal(msg, &req.outputAction)
   275  		msgComponents := strings.Split(string(msg), ",")
   276  		req.outputAction.Action, err = strconv.ParseFloat(msgComponents[0], 64)
   277  		req.outputAction.Eps, err = strconv.ParseFloat(msgComponents[1], 64)
   278  		if err != nil {
   279  			req.outputError = err
   280  			log.Log.Errorf("Cannot parse float from reply message (firstTime=%v): %s", firstTime, err)
   281  
   282  			if !firstTime {
   283  				return
   284  			}
   285  
   286  			firstTime = false
   287  			continue
   288  		}
   289  
   290  		break
   291  	}
   292  
   293  }
   294  
   295  func socketPoolSlotBook() int64 {
   296  	socketPoolMutex.Lock()
   297  	i := 0
   298  	for {
   299  		if !socketPoolBusy[i] {
   300  			socketPoolBusy[i] = true
   301  			socketPoolMutex.Unlock()
   302  			return int64(i)
   303  		}
   304  
   305  		i = (i + 1) % SocketPoolSize
   306  	}
   307  }
   308  
   309  func socketPoolSlotRelease(index int64) {
   310  	socketPoolMutex.Lock()
   311  	socketPoolBusy[index] = false
   312  	socketPoolMutex.Unlock()
   313  }
   314  

View as plain text