...

Source file src/discovery/watcher/watcher.go

Documentation: discovery/watcher

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2019. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   */
    18  
    19  // Package watcher implements the poller thread to update the list of known nodes
    20  package watcher
    21  
    22  import (
    23  	"discovery/config"
    24  	"discovery/db"
    25  	"discovery/log"
    26  	"discovery/types"
    27  	"encoding/json"
    28  	"time"
    29  )
    30  
    31  // var httpTransport *http.Transport
    32  
    33  func init() {
    34  	/*
    35  		httpTransport = &http.Transport{
    36  			MaxIdleConns:        24,
    37  			IdleConnTimeout:     64,
    38  			MaxIdleConnsPerHost: 8,
    39  			DisableKeepAlives:   false,
    40  			DialContext: (&net.Dialer{
    41  				KeepAlive: 120 * time.Second,
    42  			}).DialContext,
    43  		}
    44  	*/
    45  }
    46  
    47  func PollingLooper() {
    48  	for {
    49  		// check if we have basic configuration parameters
    50  		if config.GetMachineIp() == "" {
    51  			log.Log.Warningf("Machine has not configured its IP, service is idle. Retrying in 30 seconds...")
    52  			time.Sleep(30 * time.Second)
    53  			continue
    54  		}
    55  
    56  		machinesToPoll, err := db.MachinesGetAliveAndSuspected()
    57  		if err != nil {
    58  			log.Log.Debugf("Cannot get machines to poll, retrying in 30 seconds")
    59  			time.Sleep(30 * time.Second)
    60  			continue
    61  		}
    62  
    63  		log.Log.Debugf("Starting poll for %d machines", len(machinesToPoll))
    64  		for _, m := range machinesToPoll {
    65  			log.Log.Debugf("Polling machine %s", m.IP)
    66  
    67  			// check if machine is actually the current node
    68  			if m.IP == config.GetMachineIp() {
    69  				// remove the entry from the db
    70  				log.Log.Infof("Removing current machine '%s' from entry list", m.IP)
    71  				err = db.MachineRemove(m.IP)
    72  				if err != nil {
    73  					log.Log.Errorf("Cannot remove self machine entry in list: %s", err)
    74  				}
    75  				continue
    76  			}
    77  
    78  			// poll machine
    79  			ping, err := pollMachine(&m)
    80  
    81  			// check if poll succeed or not
    82  			if err != nil {
    83  				db.DeclarePollFailed(&m)
    84  			} else {
    85  				db.DeclarePollSucceeded(&m, ping.Seconds())
    86  			}
    87  		}
    88  
    89  		time.Sleep(time.Duration(config.GetPollTime()) * time.Second)
    90  	}
    91  }
    92  
    93  // PollMachine checks if a machine is alive but the polled machine returns all the alive machine IPs that it knows
    94  func pollMachine(machine *types.Machine) (*time.Duration, error) {
    95  	// make the get
    96  	startTime := time.Now()
    97  	res, err := GetForPoll(machine.IP)
    98  	elapsedTime := time.Since(startTime)
    99  	if err != nil {
   100  		log.Log.Debugf("Error while polling machine %s: %s", machine.IP, err.Error())
   101  		return nil, err
   102  	}
   103  
   104  	// check the answering machine's ip, if it is different from our it means that the machine changed
   105  	// its ip, so update it
   106  	if res.Header.Get(config.GetParamIp) != machine.IP {
   107  		answeringMachine, err := db.MachineGet(machine.IP)
   108  		if err == nil && answeringMachine != nil {
   109  			answeringMachine.IP = res.Header.Get(config.GetParamIp)
   110  			answeringMachine.Name = res.Header.Get(config.GetParamName)
   111  			answeringMachine.GroupName = res.Header.Get(config.GetParamGroupName)
   112  			_, _ = db.MachineUpdate(answeringMachine)
   113  		}
   114  	}
   115  
   116  	// update parameters, they may change over time
   117  	machine.Name = res.Header.Get(config.GetParamName)
   118  	machine.GroupName = res.Header.Get(config.GetParamGroupName)
   119  
   120  	// decode machine list
   121  	var machines []types.Machine
   122  	err = json.NewDecoder(res.Body).Decode(&machines)
   123  	_ = res.Body.Close()
   124  	if err != nil {
   125  		log.Log.Debugf("Error while parsing polled machine %s response: %s", machine.IP, err.Error())
   126  		return nil, err
   127  	}
   128  	// add machines list to db
   129  	for _, m := range machines {
   130  		err = db.MachineAdd(&m, true)
   131  	}
   132  
   133  	return &elapsedTime, nil
   134  }
   135  

View as plain text