Bootstrap

MongoDB自动化运维

随着管理的集群越来越多,就需要考虑搭建自动化运维平台了。

MongoDB的运维平台,首先要考虑的是自动化创建集群,自动化升级,统一修改参数,故障节点重新初始化,当然还应该有备份管理,定时任务管理等。另外为了实现上述功能,需要把机器,集群,实例等各类元数据都维护进去,才能实现各种自动化。

下面以故障节点重新初始化为例,说明这个过程。

日常运维中,经常遇到某个节点宕机,或者由于各种原因数据延迟太大,已经追不上主库数据的情况。其实这两种情况的处理方式是可以统一的。每天我们通过报警或者登录平台,就可以看到宕机或者延迟的情况。

自动化同步数据的基本步骤如下:

前提:我们一键修复的是出问题的实例。

1,判断进程是否存在,若已经关闭,或者符合关闭条件,进入下一步;

2,对于符合关闭条件的,关闭进程;

3,删除数据(根据配置文件找到数据目录);

4,重新启动实例;

主要代码如下

//ReSync is ReSync
func (c *ResyncController) ReSync() {
  //选择集群以后,给出各个节点的状态;在可重启的下拉框中,只显示,符合重启条件的节点;

  mongoVersion := c.GetString("Version")
  mongoPort := c.GetString("Port")
  confPath := c.GetString("ConfPath")
  CurrentHost := c.GetString("TargetIP")
  ClusterName := c.GetString("ClusterName")
  MongoHosts := c.GetString("Servers")
  MongoHosts = strings.Replace(MongoHosts, ",", ":"+mongoPort+",", -1) + ":" + mongoPort
  DelaySeconds := -1

  mongodPath := beego.AppConfig.String("mongodb"+"::"+mongoVersion) + `/mongod`
  mongoProcess := mongodPath + " -f " + confPath

  //flagMongodActive := false
  flagEableShutdown := false
  flagShutdownMongod := false
  flagDropData := false
  flagReSync := false
  var dbdataPath string
  var strResult string
  var strEableShutdown string
  var str string
  var err error

  conFile := strings.Split(confPath, "/")[len(strings.Split(confPath, "/"))-1]
  sh := runshell.New(CurrentHost, "mongo", "", 22)

  //0,判断进程是否存在
  isactive := "ps -ef | grep " + conFile + " | grep -v grep "
  logs.Info(isactive)
  str, err = sh.RunCmd(isactive)

  //这里对 err 和 str 两个变量判断结果,,是因为,上面的执行语句, 没有结果时, err不为空,包含waitmsg信息
  if err != nil {
    logs.Warning(str)
    if str == "" {
      flagShutdownMongod = true
      logs.Warning(conFile + " 进程是关闭状态")
      strResult = strResult + conFile + " 进程是关闭状态;
" } else { strResult = strResult + "连接ssh失败;
" } } else { if str == "" { flagShutdownMongod = true logs.Warning(conFile + " 进程是关闭状态") strResult = strResult + conFile + " 进程是关闭状态;
" } else { //判断进程是否可以被关闭, 符合两个条件,进程才可以被关闭:1.从节点,2.延时大于给定的值 flagEableShutdown, strEableShutdown = EnableShutdown(MongoHosts, ClusterName, CurrentHost, DelaySeconds) //flagMongodActive = true strResult = strResult + conFile + " 进程是运行状态;
" } } if strEableShutdown != "" { strResult = strResult + strEableShutdown + ";
" } //1,关闭进程 if flagEableShutdown { logs.Info(mongoProcess + " --shutdown on " + CurrentHost) str, err = sh.RunCmd(mongoProcess + " --shutdown") logs.Info(str) if err != nil { strResult = strResult + " 关闭mongodb进程失败;
" } else { flagShutdownMongod = true strResult = strResult + " 关闭mongodb进程成功;
" } } //2, 删除数据 if flagShutdownMongod { logs.Info("cat " + confPath + " | grep dbPath") str, err = sh.RunCmd("cat " + confPath + " | grep dbPath") if err != nil { flagShutdownMongod = false } else { //去掉可能出现的换行符 str = strings.Replace(str, "\n", "", -1) str = strings.Replace(str, " ", "", -1) dbdataPath = strings.Split(str, ":")[len(strings.Split(str, ":"))-1] if strings.HasPrefix(dbdataPath, `/data/`) == false || len(strings.Split(dbdataPath, `/`)) < 3 { flagShutdownMongod = false strResult = strResult + " 删除路径出现问题" + dbdataPath + ";
" } } } if flagShutdownMongod { rmrfdata := "rm -rf " + dbdataPath + "/*" logs.Info(rmrfdata + " on " + CurrentHost) str, err = sh.RunCmd(rmrfdata) logs.Info(str) if err != nil { strResult = strResult + " 删除数据失败程失败;
" } else { flagDropData = true strResult = strResult + " 删除数据失败程成功;
" } } //3, 启动实例 if flagDropData { logs.Info(mongoProcess + " on " + CurrentHost) str, err = sh.RunCmd(mongoProcess) logs.Info(str) if err != nil { strResult = strResult + " 启动实例失败;
" } else { flagReSync = true strResult = strResult + " 启动实例成功;
" } } if flagReSync { c.jsonResult(enums.JRCodeSucc, strResult, 0) } else { c.jsonResult(enums.JRCodeFailed, strResult, 0) } }

下面是判断节点是否可以被初始化, 虽然列出来的都是出问题的节点,但是为了安全起见,初始化之前,还是要做各种判断 。例如有a,b,c三个节点的集群, 当a宕机的情况下, 是不允许重新初始化b和c的,因为初始化过程中,会出现没有主节点的情况。

// EnableShutdown reports whether the replica-set member running on
// CurrentHost may be shut down for re-initialisation, together with a
// human-readable reason when it may not (or an empty string).
//
// Cluster state is fetched with dao.GetClusterInfo(MongoHosts, ClusterName).
// A member with State == 1 is treated as the primary and Health == 1 as
// alive. The checks are, in order:
//  1. the primary itself is never eligible;
//  2. the member's optime must lag the primary's by more than DelaySeconds;
//  3. if the member is currently alive, taking it down must still leave a
//     majority (nodesCount/2+1) of members alive. For example, in a three-node
//     set a/b/c with a down, neither b nor c may be re-initialised, because
//     the set would have no primary while that happens.
func EnableShutdown(MongoHosts, ClusterName, CurrentHost string, DelaySeconds int) (bool, string) {
	var opTimePrimary, opTimeCurrent time.Time

	res := dao.GetClusterInfo(MongoHosts, ClusterName)
	logs.Info((res))

	nodesCount := len(res.Members)
	validCount := 0      // members reporting Health == 1
	resyncAlive := false // the target member itself is alive

	// 1. Locate the primary and the target member.
	for _, v := range res.Members {
		if v.State == 1 {
			if v.IP == CurrentHost {
				return false, "该节点是master节点,不允许初始化!"
			}
			opTimePrimary = v.OptimeDate
		}
		if v.IP == CurrentHost {
			opTimeCurrent = v.OptimeDate
			if v.Health == 1 {
				resyncAlive = true
			}
		}
		if v.Health == 1 {
			validCount++
		}
	}

	// 2. Replication delay relative to the primary.
	boolResult := false
	strResult := ""
	diff := int(opTimePrimary.Sub(opTimeCurrent).Seconds())
	if diff > DelaySeconds {
		boolResult = true
	} else {
		strResult = "该节点是secondary节点,延时时间为" + strconv.Itoa(diff) + ", 小于" + strconv.Itoa(DelaySeconds) + ", 没有触发重启!"
	}

	// 3. Stopping a live member removes one vote, so the set must currently
	// have strictly more live members than the bare majority. "<=" also
	// covers a set that is already below majority.
	if resyncAlive && validCount <= nodesCount/2+1 {
		boolResult = false
		strResult = "该节点不能被重新初始化, 因为初始化过程中可能没有主节点!"
	}

	return boolResult, strResult
}

理顺了各种日常情况以后,下面是连接服务器并运行我们所需命令的代码。


// connectByKey opens an SSH connection to c.IP:c.Port as c.Username using
// public-key authentication and stores the client in c.client.
//
// The private key is read from the file named by the "key::keypath" entry of
// the beego application config. The dial timeout is 10 seconds.
//
// It returns the first error encountered while reading the key file, parsing
// the key, or dialing; c.client is left untouched in that case.
func (c *Cli) connectByKey() error {
	keypath := beego.AppConfig.String("key" + "::keypath")
	key, err := ioutil.ReadFile(keypath)
	if err != nil {
		// Report the read failure itself rather than the less helpful parse
		// error that an empty key would produce.
		return err
	}
	signer, err := ssh.ParsePrivateKey(key)
	if err != nil {
		return err
	}

	config := &ssh.ClientConfig{
		User: c.Username,
		Auth: []ssh.AuthMethod{
			ssh.PublicKeys(signer),
		},
		Timeout: 10 * time.Second,
		// NOTE(review): this callback accepts every host key, so the server's
		// identity is never verified. Consider checking against known hosts.
		HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
			return nil
		},
	}

	addr := fmt.Sprintf("%s:%d", c.IP, c.Port)
	sshClient, err := ssh.Dial("tcp", addr, config)
	if err != nil {
		return err
	}
	c.client = sshClient
	return nil
}

// RunCmd executes shell on the remote host in a new SSH session and returns
// the command's combined stdout and stderr, which is also stored in
// c.LastResult.
//
// The connection is established on first use through connectByKey. When
// connecting or opening the session fails, the returned string is the fixed
// text "ssh remote server failed" together with the error; otherwise the
// error is whatever CombinedOutput reported for the command.
func (c *Cli) RunCmd(shell string) (string, error) {
	const connectFailed = "ssh remote server failed"

	// Connect lazily: only when no client exists yet.
	if c.client == nil {
		connErr := c.connectByKey()
		if connErr != nil {
			return connectFailed, connErr
		}
	}

	sess, sessErr := c.client.NewSession()
	if sessErr != nil {
		return connectFailed, sessErr
	}
	defer sess.Close()

	output, runErr := sess.CombinedOutput(shell)
	c.LastResult = string(output)
	return c.LastResult, runErr
}