...

Source file src/discovery/db/sqlite.go

Documentation: discovery/db

     1  /*
     2   * P2PFaaS - A framework for FaaS Load Balancing
     3   * Copyright (c) 2020. Gabriele Proietti Mattia <pm.gabriele@outlook.com>
     4   *
     5   * This program is free software: you can redistribute it and/or modify
     6   * it under the terms of the GNU General Public License as published by
     7   * the Free Software Foundation, either version 3 of the License, or
     8   * (at your option) any later version.
     9   *
    10   * This program is distributed in the hope that it will be useful,
    11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
    12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    13   * GNU General Public License for more details.
    14   *
    15   * You should have received a copy of the GNU General Public License
    16   * along with this program.  If not, see <https://www.gnu.org/licenses/>.
    17   *
    18   */
    19  
    20  package db
    21  
    22  import (
    23  	"database/sql"
    24  	"discovery/config"
    25  	"discovery/log"
    26  	"discovery/types"
    27  	_ "github.com/mattn/go-sqlite3"
    28  	"os"
    29  	"strings"
    30  	"time"
    31  )
    32  
    33  var db *sql.DB
    34  
    35  type Error struct {
    36  	Reason string
    37  }
    38  
    39  func (e Error) Error() string {
    40  	return e.Reason
    41  }
    42  
    43  func initBackend() {
    44  	log.Log.Debugf("Initializing DB sqlite3 backend")
    45  
    46  	var err error
    47  
    48  	// create directory if does not exists
    49  	err = os.MkdirAll(getDatabaseDirPath(), 0755)
    50  	if err != nil {
    51  		log.Log.Fatalf("Cannot create db directory: %s", err.Error())
    52  		return
    53  	}
    54  
    55  	// init db
    56  	db, err = sql.Open("sqlite3", getDatabaseFilePath())
    57  	if err != nil {
    58  		log.Log.Fatal("Cannot init sqlite database: %s", err.Error())
    59  		return
    60  	}
    61  	log.Log.Debugf("Created DB at %s: %s", getDatabaseFilePath(), db)
    62  
    63  	err = initDb()
    64  	if err != nil {
    65  		log.Log.Fatal("Cannot init sqlite tables: %s", err.Error())
    66  		return
    67  	}
    68  
    69  	log.Log.Info("Sqlite DB init successfully")
    70  }
    71  
    72  func initDb() error {
    73  	var err error
    74  	_, err = db.Exec(`create table if not exists machines (
    75  								id integer primary key, 
    76  								ip text unique, 
    77  								name text, 
    78  								group_name text, 
    79  								ping real, 
    80  								last_update integer, 
    81  								alive integer, 
    82  								dead_polls integer
    83                              )`)
    84  	if err != nil {
    85  		log.Log.Errorf("Cannot init machines: %s", types.MachinesCollectionName)
    86  		return err
    87  	}
    88  	return nil
    89  }
    90  
    91  // MachineAdd tries to add the machine to database, if already present if declareAlive is true then the machine will be
    92  // redeclared as alive/home/gabrielepmattia/Coding/p2p-faas/stack-discovery
    93  func MachineAdd(machine *types.Machine, declareAlive bool) error {
    94  	// skip if we try to add the current machine
    95  	if machine.IP == config.GetMachineIp() {
    96  		return Error{Reason: "Could not add yourself as machine"}
    97  	}
    98  	// skip docker IPs
    99  	if strings.Index(machine.IP, "172.17") == 0 {
   100  		return Error{Reason: "Ignoring IPs that starts with docker subnet: 172.17.0.0/16"}
   101  	}
   102  
   103  	// check if machine already exists
   104  	machineRetrieved, err := MachineGet(machine.IP)
   105  	if machineRetrieved != nil && declareAlive {
   106  		log.Log.Debugf("Machine %s already exists", machine.IP)
   107  		// if yes, set machine to alive and update
   108  		machine.Alive = true
   109  		machine.DeadPolls = 0
   110  		machine.LastUpdate = time.Now().Unix()
   111  
   112  		_, err = MachineUpdate(machine)
   113  		if err != nil {
   114  			log.Log.Errorf("Cannot update the machine row: %s", err.Error())
   115  			return err
   116  		}
   117  		return nil
   118  	} else {
   119  		log.Log.Debugf("Machine %s does not exist", machine.IP)
   120  	}
   121  
   122  	// add the machine
   123  	tx, err := db.Begin()
   124  	if err != nil {
   125  		log.Log.Errorf("Cannot begin transaction: %s", err.Error())
   126  		return err
   127  	}
   128  	stmt, err := db.Prepare("insert into machines (ip, name, group_name, ping, last_update, alive, dead_polls) values (?,?,?,?,?,?,?)")
   129  	if err != nil {
   130  		log.Log.Errorf("Cannot prepare query: %s", err.Error())
   131  		return err
   132  	}
   133  	_, err = stmt.Exec(machine.IP, machine.Name, machine.GroupName, machine.Ping, time.Now().Unix(), machine.Alive, machine.DeadPolls)
   134  	if err != nil {
   135  		log.Log.Errorf("Cannot execute query: %s", err.Error())
   136  		return err
   137  	}
   138  	err = tx.Commit()
   139  	if err != nil {
   140  		log.Log.Errorf("Cannot commit query: %s", err.Error())
   141  		return err
   142  	}
   143  
   144  	return nil
   145  }
   146  
   147  func MachinesGet() ([]types.Machine, error) {
   148  	rows, err := db.Query("select * from machines order by name")
   149  	if err != nil {
   150  		log.Log.Errorf("Cannot retrieve machines: %s", err.Error())
   151  		return nil, err
   152  	}
   153  	return machinesParseRows(rows)
   154  }
   155  
   156  // MachinesGetAlive retrieves machines that surely are alive
   157  func MachinesGetAlive() ([]types.Machine, error) {
   158  	rows, err := db.Query("select * from machines where alive = 1 order by name")
   159  	if err != nil {
   160  		log.Log.Errorf("Cannot retrieve machines: %s", err.Error())
   161  		return nil, err
   162  	}
   163  	return machinesParseRows(rows)
   164  }
   165  
   166  func MachinesGetAliveAndSuspected() ([]types.Machine, error) {
   167  	rows, err := db.Query("select * from machines where alive = 1 and dead_polls >= 0 and dead_polls < ?  order by name", config.GetMachineDeadPollsRemovingThreshold())
   168  	if err != nil {
   169  		log.Log.Errorf("Cannot retrieve machines: %s", err.Error())
   170  		return nil, err
   171  	}
   172  	return machinesParseRows(rows)
   173  }
   174  
   175  func MachineGet(ip string) (*types.Machine, error) {
   176  	log.Log.Debugf("Searching machine %s", ip)
   177  	rows, err := db.Query("select * from machines where ip = ?", ip)
   178  	if err != nil {
   179  		log.Log.Errorf("Cannot retrieve machines: %s", err.Error())
   180  		return nil, err
   181  	}
   182  	machines, err := machinesParseRows(rows)
   183  	if err != nil {
   184  		return nil, err
   185  	}
   186  	if len(machines) > 0 {
   187  		return &machines[0], nil
   188  	}
   189  	return nil, nil
   190  }
   191  
   192  func MachineUpdate(machine *types.Machine) (int64, error) {
   193  	res, err := db.Exec(`
   194  				update machines set 
   195                      name = ?, 
   196                      group_name = ?, 
   197                      ping = ?, 
   198                      last_update = ?, 
   199                      alive = ?, 
   200                      dead_polls = ? 
   201  				where 
   202  				      ip = ?`,
   203  		machine.Name,
   204  		machine.GroupName,
   205  		machine.Ping,
   206  		machine.LastUpdate,
   207  		machine.Alive,
   208  		machine.DeadPolls,
   209  		machine.IP,
   210  	)
   211  	if err != nil {
   212  		log.Log.Errorf("Cannot update machine %s: %s", machine.IP, err.Error())
   213  		return 0, err
   214  	}
   215  	rowsAff, _ := res.RowsAffected()
   216  	return rowsAff, nil
   217  }
   218  
   219  func MachineRemove(ip string) error {
   220  	_, err := db.Exec("delete from machines where ip = ?", ip)
   221  	if err != nil {
   222  		log.Log.Errorf("Cannot remove machine %s: %s", ip, err.Error())
   223  		return err
   224  	}
   225  	return nil
   226  }
   227  
   228  func MachineRemoveAll() error {
   229  	res, err := db.Exec("delete from machines")
   230  	if err != nil {
   231  		log.Log.Errorf("Cannot remote machines: %s", err.Error())
   232  		return err
   233  	}
   234  	deletedRows, _ := res.RowsAffected()
   235  	log.Log.Debugf("Deleted: %d rows", deletedRows)
   236  	return nil
   237  }
   238  
   239  func machinesParseRows(rows *sql.Rows) ([]types.Machine, error) {
   240  	var machines []types.Machine
   241  	var err error
   242  	totalRows := 0
   243  
   244  	for rows.Next() {
   245  		totalRows += 1
   246  		var tempMachine types.Machine
   247  		err = rows.Scan(&tempMachine.ID, &tempMachine.IP, &tempMachine.Name, &tempMachine.GroupName, &tempMachine.Ping, &tempMachine.LastUpdate, &tempMachine.Alive, &tempMachine.DeadPolls)
   248  		if err != nil {
   249  			log.Log.Errorf("Cannot scan row: %s", err.Error())
   250  			continue
   251  		}
   252  		machines = append(machines, tempMachine)
   253  	}
   254  	log.Log.Debugf("Total rows: %d", totalRows)
   255  	return machines, nil
   256  }
   257  

View as plain text