Author: bijingrui, 2020-01-05 22:45
Systems operations engineer, Card Center

Starting a Redis Cluster with Repoll (with a source code walkthrough)


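This article walks through the whole process on the Repoll Redis management platform, from application and approval to bringing the cluster configuration online. Along the way it explains the platform's source code, so you can pick up Redis, Django, and Python together.

GitHub: https://github.com/NaNShaner/repoll

Repoll, the Redis management platform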

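Applying for a cluster

Applying for a Redis cluster on the platform is simple: fill in the fields shown in the figure below and click confirm. Once the request is saved, the platform starts an approval flow and sends the request to management for approval.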

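Redis application page

Accompanying code walkthrough

  1. Instance applications use Django's admin as is, with no extra styling. Here is the code; the comments describe what each part does.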

    from django.contrib import admin


    class ApplyRedisInfoAdmin(admin.ModelAdmin):

        def get_queryset(self, request):
            """Restrict the list so that the logged-in user only sees their own instances."""
            qs = super().get_queryset(request)
            # Superusers see every application
            if request.user.is_superuser:
                return qs
            return qs.filter(create_user=request.user)

        def save_model(self, request, obj, form, change):
            """
            Hide the applicant field on the page and fill it in on the server side.
            :param request: the current request
            :param obj: the model instance being saved
            :param form: the ModelForm instance
            :param change: True when editing an existing object, False when adding one
            :return:
            """
            obj.create_user = request.user
            super().save_model(request, obj, form, change)

        list_display = ['id', 'apply_ins_name', 'ins_disc', 'redis_type',
                        'redis_mem', 'sys_author', 'area',
                        'pub_date', 'apply_status']
        list_filter = ['redis_type']
        search_fields = ['area']
        # Fields hidden from the form
        exclude = ['create_user']
        list_per_page = 15

        readonly_fields = ['apply_status', ]
    

Approving a cluster

Approving an instance is just as simple: tick the instances to be reviewed and click "Approve" or "Reject".

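Redis approval page

Accompanying code walkthrough

  • Two buttons are added to the Django admin page, "Approve selected Redis instances" and "Reject selected Redis instances"; see the code below
  • The add and delete buttons are disabled on this admin page, because it exists for managers to approve requests and does not need them
  • Managers can of course tick several rows and approve them in one go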

    class RedisApplyAdmin(admin.ModelAdmin):

      def has_add_permission(self, request):
          """
          Disable the "add" button.
          :param request:
          :return:
          """
          return False

      def has_delete_permission(self, request, obj=None):
          """
          Disable the "delete" button.
          :param request:
          :param obj:
          :return:
          """
          return False

      def get_actions(self, request):
          """
          Remove the "delete" action from the actions list.
          :param request:
          :return:
          """
          actions = super(RedisApplyAdmin, self).get_actions(request)
          # The bulk delete action is kept only for usernames starting with "J"
          if request.user.username[0].upper() != 'J':
              if 'delete_selected' in actions:
                  del actions['delete_selected']
          return actions
    
      def get_queryset(self, request):
          """Restrict the list so that the logged-in user only sees their own instances."""
          qs = super(RedisApplyAdmin, self).get_queryset(request)
          if request.user.is_superuser:
              return qs
          # Everyone else only sees the requests they created
          return qs.filter(create_user=request.user)
    
      list_display = ['id', 'apply_ins_name', 'ins_disc', 'redis_type',
                      'redis_mem', 'sys_author', 'area',
                      'pub_date', 'create_user', 'apply_status']
      list_filter = ['redis_type']
      search_fields = ['area']
      list_per_page = 15
    
      actions = ['approve_selected_new_assets', 'deny_selected_new_assets']
    
      def approve_selected_new_assets(self, request, queryset):
          """
          Add an "approve" button to the Redis application page.
          :param request: the HttpRequest instance
          :param queryset: the selected instances
          :return:
          """
          # IDs of the ticked checkboxes, i.e. the primary keys of the rows to update
          selected = request.POST.getlist(admin.ACTION_CHECKBOX_NAME)
          success_upline_number = 0
          try:
              for asset_id in selected:
                  obj = ApproveRedis(request, asset_id)
                  create_redis_ins = obj.create_asset()
                  if create_redis_ins:
                      success_upline_number += 1
                      self.message_user(request, "Approved  %s  new Redis instance(s) to go online!" % success_upline_number)
                      obj.redis_apply_status_update(statu=3)
                  else:
                      self.message_user(request, "Instance {0} could not go online: an online instance already exists, please check".format(obj.redis_ins_name))
          except ValueError as e:
              self.message_user(request, "Instance {0} could not go online, reason: {1}".format(queryset, e))
      approve_selected_new_assets.short_description = "Approve selected Redis instances"

      def deny_selected_new_assets(self, request, queryset):
          """
          Add a "reject" button to the Redis application page.
          :param request: the HttpRequest instance
          :param queryset: the selected instances
          :return:
          """
          # IDs of the ticked checkboxes, i.e. the primary keys of the rows to update
          selected = request.POST.getlist(admin.ACTION_CHECKBOX_NAME)
          deny_upline_number = 0
          try:
              for asset_id in selected:
                  obj = ApproveRedis(request, asset_id)
                  deny_redis_ins = obj.deny_create()
                  if deny_redis_ins:
                      deny_upline_number += 1
                      obj.redis_apply_status_update(statu=4)
                      self.message_user(request, "Rejected  %s  new Redis instance(s)!" % deny_upline_number)
                  else:
                      self.message_user(request, "Operation on instance {0} failed: an online instance already exists, please check".format(obj.redis_ins_name))
          except ValueError as e:
              self.message_user(request, "Operation on instance {0} failed, reason: {1}".format(queryset, e))
      deny_selected_new_assets.short_description = "Reject selected Redis instances"
    
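  • Clicking the approve or reject button updates the status

In the code above, the following line sets the instance's approval status to "approved":

    obj.redis_apply_status_update(statu=3)

and the following line sets it to "rejected":

    obj.redis_apply_status_update(statu=4)

  • Here is the state after clicking "approve". The online status is of course still "not online".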

Page after approval
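
The two status updates above pass bare integers. A minimal sketch of giving those codes names, assuming only what the code shows (3 is used on approval, 4 on rejection). The names `ApplyStatus` and `describe` are hypothetical and not part of Repoll, and any other status values the platform uses are not covered here.

```python
from enum import IntEnum


class ApplyStatus(IntEnum):
    """Hypothetical names for the two status codes seen in the admin actions."""
    APPROVED = 3  # value passed by approve_selected_new_assets
    REJECTED = 4  # value passed by deny_selected_new_assets


def describe(status_code: int) -> str:
    """Return a readable label for a known status code."""
    try:
        return ApplyStatus(status_code).name.lower()
    except ValueError:
        # Codes other than 3 and 4 are not documented in the article
        return "unknown"


print(describe(3))
print(describe(4))
```

Because `IntEnum` members compare equal to plain integers, a call such as `redis_apply_status_update(statu=ApplyStatus.APPROVED)` would pass the same value as `statu=3`.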

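Configuring the cluster and bringing it online

The page for configuring a Redis instance and bringing it online is meant for DBAs or operations staff. It shows both the user's application and the manager's approval status, as in the figure below. Enter the instance details in the text box using the fixed format, click save, and the online configuration is done.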
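
The article does not spell out the fixed text format, so the sketch below invents one purely for illustration: one line per shard, `<mem> <master ip:port> <slave ip:port> ...`. What it does mirror is the shape the cluster branch of the signal handler in this section iterates over, a list of dicts with `redis_ip_port` (master first, then slaves) and `redis_mem`. The function name `parse_cluster_text` is hypothetical and this is not Repoll's `redis_apply_text`.

```python
from typing import Dict, List, Union

ShardSpec = Dict[str, Union[str, List[List[str]]]]


def parse_cluster_text(text: str) -> List[ShardSpec]:
    """Parse the hypothetical format '<mem> <master ip:port> <slave ip:port> ...'."""
    shards = []
    for line_no, raw_line in enumerate(text.splitlines(), start=1):
        line = raw_line.strip()
        if not line:
            continue  # skip blank lines
        fields = line.split()
        if len(fields) < 2:
            raise ValueError("line %d: expected memory and at least one ip:port" % line_no)
        mem, nodes = fields[0], fields[1:]
        if not mem.isdigit():
            raise ValueError("line %d: memory must be an integer" % line_no)
        ip_ports = []
        for node in nodes:
            ip, sep, port = node.rpartition(":")
            if not sep or not ip or not port.isdigit():
                raise ValueError("line %d: bad ip:port %r" % (line_no, node))
            ip_ports.append([ip, port])
        # The first pair is the master, the remaining pairs are its slaves
        shards.append({"redis_ip_port": ip_ports, "redis_mem": mem})
    return shards


demo = parse_cluster_text("512 10.0.0.1:7000 10.0.0.2:7001\n512 10.0.0.3:7000 10.0.0.4:7001")
print(demo[0]["redis_ip_port"][0])
```

Validating the text before anything is written to disk or started over SSH keeps a typo in the text box from leaving a half-built cluster behind.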

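The logic here is fairly involved and there is a lot of code, so take your time:

  • Logic: this is driven by Django signals. If you want to know more about signals, see the official Django documentation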

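    import logging

    from django.db.models.signals import post_save
    from django.dispatch import receiver


    @receiver(post_save, sender=ApplyRedisText)
    def apply_redis_text_handler(sender, **kwargs):
      """
      Signal receiver, fired automatically when the configuration text is saved on the page after approval.
      :param sender: the model class that sent the signal
      :param kwargs: signal arguments; kwargs['instance'] is the saved ApplyRedisText object
      :return:
      """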
      redis_ins_id = kwargs['instance'].redis_ins_id
      redis_ins_obj = RedisIns.objects.filter(id=redis_ins_id)
      redis_ins_type = redis_ins_obj.values('redis_type').first()['redis_type']
      redis_text = kwargs['instance'].apply_text
      redis_apply_text_split = redis_apply_text(redis_text, redis_type=redis_ins_type)
      redis_ins_obj_name = redis_ins_obj.values('redis_ins_name').first()
      redis_ins_obj_mem = redis_ins_obj.values('redis_mem').first()
      if redis_ins_type == 'Redis-Standalone':
          redis_ip = redis_apply_text_split['redis_ip']
          redis_port = redis_apply_text_split['redis_port']
          a = RedisStandalone(redis_ins=redis_ins_obj,
                              redis_ins_name=redis_ins_obj_name,
                              redis_ins_type=redis_ins_type,
                              redis_ins_mem=redis_apply_text_split['redis_mem'],
                              redis_ip=redis_ip,
                              redis_port=redis_port)
          a.saved_redis_running_ins()
          if a.create_redis_conf_file():
              redis_start = RedisStartClass(host=redis_ip,
                                            redis_server_ctl="/opt/repoll/redis/src/redis-server /opt/repoll/conf/" + str(redis_port) + ".conf")
              if redis_start.start_server():
                  logging.info("Redis standalone instance started, server IP: {0}, port: {1}".format(redis_ip, redis_port))
              else:
                  logging.error("Redis standalone instance failed to start, server IP: {0}, port: {1}".format(redis_ip, redis_port))
      elif redis_ins_type == 'Redis-Sentinel':
          b = RedisModelStartClass(model_type='Redis-Sentinel',
                                   redis_ins=redis_ins_obj,
                                   redis_master_ip_port=redis_apply_text_split['redis_master_ip_port'],
                                   redis_slave_ip_port=redis_apply_text_split['redis_slave_ip_port'],
                                   redis_master_name=redis_apply_text_split['redis_master_name'],
                                   redis_sentinel_ip_port=redis_apply_text_split['redis_sentinel_ip_port'],
                                   redis_sentinel_num=redis_apply_text_split['redis_sentinel_num'],
                                   sentinel_down_after_milliseconds=30000,
                                   sentinel_failover_timeout=180000,
                                   sentinel_parallel_syncs=1,
                                   redis_mem=redis_apply_text_split['redis_mem'])
          create_sentinel_conf_file = b.create_sentienl_conf_file()
          create_master_slave_file = b.create_maser_slave_conf()
          if create_sentinel_conf_file and create_master_slave_file:
              start_master = b.start_redis_master()
              if start_master:
                  start_slave = b.start_slave_master()
                  if start_slave:
                      b.start_sentinel_master()
                      logging.info("Sentinel mode started successfully, redis_master_ip_port:{0},"
                                   "redis_slave_ip_port:{1},"
                                   "redis_sentinel_ip_port:{2},"
                                   "redis_master_name:{3}".format(redis_apply_text_split['redis_master_ip_port'],
                                                                  redis_apply_text_split['redis_slave_ip_port'],
                                                                  redis_apply_text_split['redis_sentinel_ip_port'],
                                                                  redis_apply_text_split['redis_master_name']))
          b.save_sentinel_redis_ins()
      elif redis_ins_type == 'Redis-Cluster':
          redis_list = []
          redis_cluster_mem_sum = 0
          for redis_one_ins in redis_apply_text_split:
              redis_one_ins_split = {"redis_master": redis_one_ins['redis_ip_port'][0],
                                     "redis_slave": redis_one_ins['redis_ip_port'][1:],
                                     "redis_mem": redis_one_ins['redis_mem']}
              redis_cluster_mem_sum += int(redis_one_ins['redis_mem'])
              redis_list.append(redis_one_ins_split)
          obj_runningins = RunningInsTime(running_ins_name=redis_ins_obj_name["redis_ins_name"],
                                          redis_type='Redis-Cluster',
                                          redis_ins_mem=redis_cluster_mem_sum,
                                          ins_status=0)
          obj_runningins.save()
          for redis_one_ins in redis_apply_text_split:
              for index, all_redis_ins in enumerate(redis_one_ins['redis_ip_port']):
                  # The first ip/port pair of each group is the master, the rest are its slaves
                  node_type = "Redis-Master" if index == 0 else "Redis-Slave"
                  c = RedisClusterClass(redis_ins=redis_ins_obj,
                                        redis_ins_name=redis_ins_obj_name,
                                        redis_ins_type=node_type,
                                        redis_ins_mem=redis_one_ins['redis_mem'],
                                        redis_ip=all_redis_ins[0],
                                        redis_port=all_redis_ins[1])
                  file_status = c.create_cluster_file()
                  if file_status:
                      c.start_all_redis_ins()
                      c.save_cluster_ins()
          RedisIns.objects.filter(redis_ins_name=redis_ins_obj_name["redis_ins_name"]).update(on_line_status=0)
          start_cluster = StartRedisCluster(cluster_list=redis_list)
          redis_cluster_list = start_cluster.redis_cluster_list()
          start_cluster.redis_cluser_meet(redis_cluster_list)
          redis_cluster_node_info = start_cluster.get_cluster_info()
          if redis_cluster_node_info:
              start_cluster.add_slot_2_master(redis_cluster_node_info)
          else:
              logging.error("Redis cluster failed to start, cluster info: {0}".format(redis_list))
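
The handler above is attached with `@receiver(post_save, sender=ApplyRedisText)`, so it runs whenever that model is saved. For readers new to signals, here is a plain-Python sketch of the underlying register-and-dispatch pattern. It is a toy stand-in and not Django's implementation; `MiniSignal`, `ConfigText`, and `on_config_saved` are hypothetical names.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List, Tuple


class MiniSignal:
    """Toy stand-in for a signal: maps a sender class to receiver callables."""

    def __init__(self) -> None:
        self._receivers: DefaultDict[type, List[Callable[..., Any]]] = defaultdict(list)

    def connect(self, sender: type) -> Callable:
        """Decorator that registers a receiver for one sender class."""
        def decorator(func):
            self._receivers[sender].append(func)
            return func
        return decorator

    def send(self, sender: type, **kwargs: Any) -> List[Tuple[Callable, Any]]:
        """Call every receiver registered for sender and collect the results."""
        return [(func, func(sender, **kwargs)) for func in self._receivers[sender]]


post_save_demo = MiniSignal()


class ConfigText:
    """Hypothetical model-like object holding the submitted text."""

    def __init__(self, redis_ins_id: int, apply_text: str) -> None:
        self.redis_ins_id = redis_ins_id
        self.apply_text = apply_text

    def save(self) -> None:
        # A real model would write to the database first, then fire the signal
        post_save_demo.send(sender=ConfigText, instance=self)


handled = []


@post_save_demo.connect(ConfigText)
def on_config_saved(sender, **kwargs):
    # Like the real handler, the receiver reads the saved object from kwargs
    instance = kwargs["instance"]
    handled.append((instance.redis_ins_id, instance.apply_text))


ConfigText(1, "demo text").save()
print(handled)
```

The point of the pattern is that the code saving the object never calls the deployment logic directly; it only announces that a save happened.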
    
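  • Implementation:
  1. Generate the Redis instance configuration file on each server
  2. Start the Redis instances from those configuration files
  3. Have all nodes of the Redis cluster perform the handshake
  4. Assign Redis slots to the master nodes that were entered (currently only 3 masters and 3 slaves are supported, because of how the slots are calculated)
  5. Establish the master/slave relationships for the master nodes that were entered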
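
On step 4: in the code further down, `add_slot_2_master` hard-codes three slot ranges. A Redis Cluster always has 16384 slots (0 to 16383), so ranges for any number of masters can be computed instead. A minimal sketch, not part of Repoll; `split_slots` and `as_brace_ranges` are hypothetical names. For three masters it reproduces the hard-coded ranges.

```python
from typing import List, Tuple

TOTAL_SLOTS = 16384  # a Redis Cluster always has slots 0..16383


def split_slots(master_count: int) -> List[Tuple[int, int]]:
    """Return inclusive (start, end) slot ranges, one per master."""
    if not 1 <= master_count <= TOTAL_SLOTS:
        raise ValueError("master_count must be between 1 and %d" % TOTAL_SLOTS)
    base, extra = divmod(TOTAL_SLOTS, master_count)
    ranges = []
    start = 0
    for index in range(master_count):
        # The first `extra` masters take one additional slot each
        size = base + (1 if index < extra else 0)
        ranges.append((start, start + size - 1))
        start += size
    return ranges


def as_brace_ranges(ranges: List[Tuple[int, int]]) -> List[str]:
    """Format ranges as shell brace expansions such as '{0..5461}'."""
    return ["{%d..%d}" % (start, end) for start, end in ranges]


print(as_brace_ranges(split_slots(3)))
# ['{0..5461}', '{5462..10922}', '{10923..16383}']
```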

    class RedisClusterClass:

     def __init__(self, redis_ins, redis_ins_name, redis_ins_type, redis_ins_mem, redis_ip, redis_port):
         """
         Generate the config file for a single Redis instance, distribute it, and start the process.
         :param redis_ins:
         :param redis_ins_name:
         :param redis_ins_type:
         :param redis_ins_mem:
         :param redis_ip:
         :param redis_port:
         """
         self.redis_ins = [r.__dict__ for r in redis_ins]
         self.redis_ins_name = redis_ins_name['redis_ins_name']
         self.redis_ins_type = redis_ins_type
         self.redis_ins_mem = redis_ins_mem
         self.redis_ip = redis_ip
         self.redis_port = redis_port
    
     def create_cluster_file(self):
         """
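         Create the Redis instance's config file and distribute it to the target directory on the resource pool server.
         Distribution supports passwordless SSH and username/password authentication.
         The username and password are currently hard-coded; reading them from the database is a planned improvement.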
         :return:
         """
         redis_conf = get_redis_conf("Redis-Standalone")
         redis_cluster_conf = get_redis_conf("Redis-Cluster")
         all_redis_conf = [conf_k_v.__dict__ for conf_k_v in redis_conf]
         all_cluster_conf = [conf_k_v.__dict__ for conf_k_v in redis_cluster_conf]
         conf_file_name = "{0}/templates/".format(TEMPLATES_DIR) + str(self.redis_port) + "-cluser.conf"
         with open(conf_file_name, 'w+') as f:
             # Model bookkeeping fields that must not end up in redis.conf
             skipped_keys = ('id', 'redis_version', 'redis_type')
             for k, v in all_redis_conf[0].items():
                 if k not in skipped_keys and isinstance(v, (str, int)):
                     k, v = regx_redis_conf(key=k, value=v, port=self.redis_port,
                                            maxmemory=mem_unit_chage(self.redis_ins_mem))
                     f.write(k + " " + str(v) + "\n")
             for k, v in all_cluster_conf[0].items():
                 if k not in skipped_keys and isinstance(v, (str, int)):
                     k, v = regx_redis_conf(key=k, value=v, port=self.redis_port,
                                            maxmemory=mem_unit_chage(self.redis_ins_mem),
                                            kwargs={"redis_port": self.redis_port})
                     f.write(k + " " + str(v) + "\n")
         remote_conf = "/opt/repoll/conf/" + str(self.redis_port) + "-cluster.conf"
         if do_scp(self.redis_ip, conf_file_name, remote_conf,
                   user_name="root", user_password="Pass@word"):
             logging.info("Config file {0} was distributed to the target server".format(remote_conf))
         else:
             logging.error("Failed to distribute config file {0} to the target server".format(remote_conf))
             return False
         return True
    
     def start_all_redis_ins(self):
         redis_start = RedisStartClass(host=self.redis_ip,
                                       redis_server_ctl="/opt/repoll/redis/src/redis-server /opt/repoll/conf/" +
                                                        str(self.redis_port) + "-cluster.conf")
         if redis_start.start_server():
             logging.info("Redis instance {2} started, ip:port: {0}:{1}".format(self.redis_ip, self.redis_port, self.redis_ins_name))
             return True
         else:
             logging.error("Redis instance {2} failed to start, ip:port: {0}:{1}".format(self.redis_ip, self.redis_port, self.redis_ins_name))
             return False
    
     def save_cluster_ins(self):
         obj_runningins_now = RunningInsTime.objects.all().get(running_ins_name=self.redis_ins_name)
         obj = RunningInsCluster(
             running_ins_name=self.redis_ins_name,
             redis_type=self.redis_ins_type,
             running_ins_port=self.redis_port,
             redis_ip=self.redis_ip,
             redis_ins_mem=self.redis_ins_mem,
             running_ins_standalone=obj_runningins_now
         )
         obj.save()
    

class StartRedisCluster:

    def __init__(self, cluster_list):
        """
        Start a Redis Cluster.
        :param cluster_list: IP and port of every Redis instance
        """
        if not isinstance(cluster_list, list):
            raise TypeError("cluster_list must be a list")
        self.cluster_list = cluster_list

    def redis_cluster_list(self):
        """
        Flatten the IP and port of every Redis instance into one list.
        :return: list
        """
        redis_list = []
        for redis_one_list in self.cluster_list:
            redis_list.append(redis_one_list['redis_master'])
            for redis_slave in redis_one_list['redis_slave']:
                redis_list.append(redis_slave)
        return redis_list

    def redis_cluser_meet(self, redis_ins_list):
        """
        Run cluster meet so that all nodes of the Redis cluster join each other.
        :param redis_ins_list: the flattened list of cluster nodes
        :return:
        """
        if isinstance(redis_ins_list, list):
            redis_ins_list_copy = copy.deepcopy(redis_ins_list)
            i = 0
            try:
                if i < len(redis_ins_list_copy):
                    i += 1
                    redis_ins_one = redis_ins_list_copy.pop(0)
                    redis_ins_list_copy.append(redis_ins_one)
                    redis_ins_one_ip = redis_ins_one[0]
                    redis_ins_one_port = redis_ins_one[1]
                    for redis_ins_one_by_one in redis_ins_list_copy:
                        redis_ins_one_by_one_ip = redis_ins_one_by_one[0]
                        redis_ins_one_port_port = redis_ins_one_by_one[1]
                        comm_line = "/opt/repoll/redis/src/redis-cli -c -h {0} -p {1} cluster meet {2} {3}".format(
                            redis_ins_one_ip, redis_ins_one_port, redis_ins_one_by_one_ip, redis_ins_one_port_port
                        )
                        if do_command(host=redis_ins_one_ip, commands=comm_line, user_name="root", user_password="Pass@word"):
                            logging.info("{0}:{1} cluster meet {2}:{3} is ok".format(
                                redis_ins_one_ip, redis_ins_one_port, redis_ins_one_by_one_ip, redis_ins_one_port_port))
            except Exception as e:
                logging.error("Redis Cluster failed to start, nodes involved: {0}, error: {1}".format(self.cluster_list, e))

    def get_cluster_info(self):
        """
        Fetch the cluster nodes output, used later to set up the master/slave relationships.
        :return: mapping from each node's address to its node id
        """
        node_dict = {}
        redis_cluster_ip_port = self.cluster_list[0]
        _comm_line = "/opt/repoll/redis/src/redis-cli -c -h {0} -p {1} cluster nodes ".format(redis_cluster_ip_port['redis_master'][0], redis_cluster_ip_port['redis_master'][1])
        _comm_result = do_command(host=redis_cluster_ip_port['redis_master'][0], commands=_comm_line, user_name="root", user_password="Pass@word")
        if _comm_result[0] == 0:
            _comm_result_list = _comm_result[1].decode('unicode-escape')
            _comm_result_list = _comm_result_list.split("\n")
            node_dict = {one_line.split()[1]: one_line.split()[0] for one_line in _comm_result_list if one_line}
        if node_dict:
            return node_dict
        else:
            return None

    def add_slot_2_master(self, cluster_node_info):
        """
        Add slots to the master nodes and set up the master/slave relationships.
        :param cluster_node_info: mapping from each node's address to its node id
        :return:
        """
        _slots = ["{0..5461}", "{5462..10922}", "{10923..16383}"]
        num = 0
        try:
            for redis_ip_port in self.cluster_list:
                redis_master_ip = redis_ip_port['redis_master']
                _add_slot_comm_line = "/opt/repoll/redis/src/redis-cli -c -h {0} -p {1} cluster addslots {2}".format(redis_master_ip[0],
                                                                                                                     redis_master_ip[1],
                                                                                                                     _slots[num])
                if do_command(host=redis_master_ip[0], commands=_add_slot_comm_line, user_name="root", user_password="Pass@word"):
                    logging.info("add slot succeeded")
                for redis_slave_ip in redis_ip_port['redis_slave']:
                    _add_master_replca_comm_line = "/opt/repoll/redis/src/redis-cli -c -h {0} -p {1} cluster replicate {2}".format(
                        redis_slave_ip[0],
                        redis_slave_ip[1],
                        cluster_node_info['{0}:{1}'.format(redis_master_ip[0], redis_master_ip[1])]
                    )
                    logging.info(_add_master_replca_comm_line)
                    if do_command(host=redis_slave_ip[0],  commands=_add_master_replca_comm_line, user_name="root", user_password="Pass@word"):
                        logging.info("add replicate succeeded")
                num += 1
        except IOError as e:
            return False
        return True
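
`get_cluster_info` keys its dictionary on the second field of each `cluster nodes` line, and `add_slot_2_master` then looks masters up by `ip:port`. From Redis 4.0 on, that field is printed as `ip:port@cport`, so such a lookup would miss unless the suffix is removed. A minimal sketch of a parser that handles both forms; `parse_cluster_nodes` is a hypothetical name and the node ids in the sample are made up.

```python
from typing import Dict


def parse_cluster_nodes(output: str) -> Dict[str, str]:
    """Map 'ip:port' to node id from the text returned by CLUSTER NODES."""
    nodes = {}
    for line in output.splitlines():
        fields = line.split()
        if len(fields) < 2:
            continue  # skip blank or truncated lines
        node_id, address = fields[0], fields[1]
        # Redis 4.0+ prints 'ip:port@cport'; keep only the client 'ip:port' part
        address = address.split("@", 1)[0]
        nodes[address] = node_id
    return nodes


# Made-up sample in the CLUSTER NODES layout: id, address, flags, master id, ...
sample = (
    "aaa111 10.0.0.1:7000@17000 myself,master - 0 0 1 connected 0-5461\n"
    "bbb222 10.0.0.2:7001@17001 slave aaa111 0 1426238317239 1 connected\n"
)
print(parse_cluster_nodes(sample)["10.0.0.1:7000"])
```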

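Running instances

For running instances, the platform currently offers start, stop, and a QPS trend chart. Start and stop are implemented with Django REST framework, which already enforces the access restrictions on the API, so anyone who is not a platform user cannot call it.

Running Redis instances

Redis cluster information

The QPS feature currently uses django_crontab. It still has rough edges and the code is being reworked, so it is not included here.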
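
Since the author's QPS code is not shown, the following is not it. It is only a sketch of one common way a periodic job can derive QPS: sample `total_commands_processed` from the Redis `INFO` output twice and divide the difference by the elapsed time. The function names and the sample values are assumptions made for illustration.

```python
from typing import Dict, Optional


def parse_info(text: str) -> Dict[str, str]:
    """Parse 'key:value' lines of Redis INFO output, skipping '# Section' headers."""
    fields = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        fields[key] = value
    return fields


def qps_between(prev_total: int, cur_total: int, elapsed_seconds: float) -> Optional[float]:
    """Average commands per second between two samples of total_commands_processed."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    if cur_total < prev_total:
        # The counter went backwards, for example after a restart; no meaningful rate
        return None
    return (cur_total - prev_total) / elapsed_seconds


# Two made-up samples taken 60 seconds apart
first = parse_info("# Stats\r\ntotal_commands_processed:1000\r\n")
second = parse_info("# Stats\r\ntotal_commands_processed:7000\r\n")
rate = qps_between(int(first["total_commands_processed"]),
                   int(second["total_commands_processed"]), 60)
print(rate)
```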
