Author: bijingrui, 2020-01-05 22:45
Systems Operations Engineer, Card Center

Starting a Redis Cluster with Repoll (with Source Code Walkthrough)


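This article walks through the whole process in the Repoll Redis management platform: applying for a cluster, getting it approved, and configuring it to go online. Along the way it explains the platform's source code, so you can learn Redis, Django and Python at the same time.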

GitHub: https://github.com/NaNShaner/repoll

Figure: Repoll, the Redis management platform

Applying for a Cluster

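Applying for a Redis cluster is straightforward: fill in the fields shown in the figure below and click Confirm. Once the application is saved, the platform starts an approval workflow and forwards it to management.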

Figure: Redis application page

Code walkthrough

  1. Instance applications use Django's admin as-is, without customization. Here is the code; the comments explain what each part does.

    from django.contrib import admin


    class ApplyRedisInfoAdmin(admin.ModelAdmin):
        def get_queryset(self, request):
            """Let the logged-in user see only the applications they created."""
            qs = super().get_queryset(request)
            if request.user.is_superuser:
                return qs
            return qs.filter(create_user=request.user)

        def save_model(self, request, obj, form, change):
            """
            The applicant field is hidden on the form; record the current
            user as the applicant before saving.
            :param request: the current HTTP request
            :param obj: the model instance being saved
            :param form: the admin form
            :param change: True when an existing object is being edited
            """
            obj.create_user = request.user
            super().save_model(request, obj, form, change)

        list_display = ['id', 'apply_ins_name', 'ins_disc', 'redis_type',
                        'redis_mem', 'sys_author', 'area',
                        'pub_date', 'apply_status']
        list_filter = ['redis_type']
        search_fields = ['area']
        # Hidden field, filled in automatically by save_model()
        exclude = ['create_user']
        list_per_page = 15

        readonly_fields = ['apply_status']
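The visibility rule in `get_queryset` (superusers see every application, everyone else sees only their own) can be checked without a database. Below is a minimal pure-Python sketch of that rule. `User`, `Application` and `visible_applications` are hypothetical stand-ins for Django's user, the application model and the queryset filter, not Repoll code.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class User:
    # Hypothetical stand-in for request.user
    username: str
    is_superuser: bool = False


@dataclass
class Application:
    # Hypothetical stand-in for one row of the application model
    apply_ins_name: str
    create_user: str  # username of the applicant


def visible_applications(apps: List[Application], user: User) -> List[Application]:
    """Same rule as get_queryset: superusers see all rows, others only their own."""
    if user.is_superuser:
        return list(apps)
    return [a for a in apps if a.create_user == user.username]


apps = [Application("cache-a", "alice"), Application("cache-b", "bob")]
print([a.apply_ins_name for a in visible_applications(apps, User("alice"))])
```

Because the check runs before the filter, a superuser never goes through the per-user filter at all.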

Approving a Cluster

Approval is just as simple: tick the instances to be approved and click "Approve" or "Reject".

Figure: Redis approval page

Code walkthrough

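  • Two actions, "Approve selected Redis instances" and "Reject selected Redis instances", are added to the Django admin page. The code is below.

  • The Add and Delete buttons are disabled on this page. It exists only for managers to approve requests, so neither button is needed.

  • Managers can, of course, tick several applications and approve them in one go.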

    from django.contrib import admin

    # RedisApply and ApproveRedis are defined elsewhere in the Repoll project


    class RedisApplyAdmin(admin.ModelAdmin):
        def has_add_permission(self, request):
            """Hide the "Add" button: this page is only for approvers."""
            return False

        def has_delete_permission(self, request, obj=None):
            """Hide the "Delete" button."""
            return False

        def get_actions(self, request):
            """
            Drop the bulk "Delete selected" action, except for usernames starting with "J".
            (Django 2.1+ already drops it whenever has_delete_permission() returns False.)
            """
            actions = super().get_actions(request)
            if request.user.username[0].upper() != 'J':
                actions.pop('delete_selected', None)
            return actions

        def get_queryset(self, request):
            """Let the logged-in user see only the applications they created."""
            qs = super().get_queryset(request)
            if request.user.is_superuser:
                return qs
            # Compare against the user object directly, not against a nested queryset
            return qs.filter(create_user=request.user)

        list_display = ['id', 'apply_ins_name', 'ins_disc', 'redis_type',
                        'redis_mem', 'sys_author', 'area',
                        'pub_date', 'create_user', 'apply_status']
        list_filter = ['redis_type']
        search_fields = ['area']
        list_per_page = 15

        actions = ['approve_selected_new_assets', 'deny_selected_new_assets']

        def approve_selected_new_assets(self, request, queryset):
            """
            Admin action: approve the ticked applications and create the instances.
            :param request: the HTTP request
            :param queryset: the ticked applications
            """
            approved = 0
            try:
                # queryset holds exactly the ticked rows; their primary keys are
                # what ApproveRedis uses to update the database
                for apply_row in queryset:
                    obj = ApproveRedis(request, apply_row.pk)
                    if obj.create_asset():
                        approved += 1
                        obj.redis_apply_status_update(statu=3)  # 3 = approved
                    else:
                        self.message_user(request, "Instance {0} failed to go online: an online instance "
                                                   "already exists, please check".format(obj.redis_ins_name))
            except ValueError as e:
                self.message_user(request, "Instances {0} failed to go online: {1}".format(queryset, e))
            if approved:
                self.message_user(request, "Approved %s new Redis instance(s) to go online!" % approved)
        approve_selected_new_assets.short_description = "Approve selected Redis instances"

        def deny_selected_new_assets(self, request, queryset):
            """
            Admin action: reject the ticked applications.
            :param request: the HTTP request
            :param queryset: the ticked applications
            """
            denied = 0
            try:
                for apply_row in queryset:
                    obj = ApproveRedis(request, apply_row.pk)
                    if obj.deny_create():
                        denied += 1
                        obj.redis_apply_status_update(statu=4)  # 4 = rejected
                    else:
                        self.message_user(request, "Operation on instance {0} failed: an online instance "
                                                   "already exists, please check".format(obj.redis_ins_name))
            except ValueError as e:
                self.message_user(request, "Operation on instances {0} failed: {1}".format(queryset, e))
            if denied:
                self.message_user(request, "Rejected %s new Redis instance(s)!" % denied)
        deny_selected_new_assets.short_description = "Reject selected Redis instances"
  • Update the status after Approve or Reject is clicked

In the code above, this line sets the application's approval status to "Approved":

    obj.redis_apply_status_update(statu=3)

and this line sets it to "Rejected":

    obj.redis_apply_status_update(statu=4)
  • Here is the status after clicking "Approve". The online status is, of course, still "Not online".

Figure: page after approval
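Both approval actions follow the same pattern: for each ticked application, try the operation, count the successes, and write status 3 (approved) or 4 (rejected) only for the rows that succeeded. The plain-Python sketch below models that loop. `run_batch`, `APPROVED`, `REJECTED` and the `action` callback are hypothetical; only the values 3 and 4 come from the admin code above.

```python
from typing import Callable, Dict, Iterable, List, Tuple

APPROVED = 3  # value passed to redis_apply_status_update() on approval
REJECTED = 4  # value passed on rejection


def run_batch(
    selected_ids: Iterable[int],
    action: Callable[[int], bool],
    new_status: int,
    statuses: Dict[int, int],
) -> Tuple[int, List[int]]:
    """Apply `action` to each id and set `new_status` only where it succeeds.

    Returns the number of successes and the ids that failed.
    """
    ok = 0
    failed: List[int] = []
    for asset_id in selected_ids:
        try:
            succeeded = bool(action(asset_id))
        except ValueError:
            succeeded = False  # one bad row does not abort the rest of the batch
        if succeeded:
            statuses[asset_id] = new_status
            ok += 1
        else:
            failed.append(asset_id)
    return ok, failed


statuses: Dict[int, int] = {}
print(run_batch([1, 2, 3], lambda i: i != 2, APPROVED, statuses), statuses)
```

One design choice differs from the admin code. There, a `ValueError` stops the loop for the remaining rows. Here the error is caught per row, so the summary message can always report how many rows succeeded and which ones failed.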

Configuring the Cluster to Go Online

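The page for configuring Redis instances to go online is intended for DBAs and operations staff. It shows both the user's application and management's approval status, as in the figure below. Enter the instance details in the text box using the fixed format and click Save, and the online configuration is done.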

This logic is more involved and there is quite a lot of code, so take your time:

  • The logic is implemented with Django signals. If you want to learn more about them, see the official Django documentation.

    import logging

    from django.db.models.signals import post_save
    from django.dispatch import receiver

    # ApplyRedisText, RedisIns, RunningInsTime, redis_apply_text, RedisStandalone,
    # RedisStartClass, RedisModelStartClass, RedisClusterClass and StartRedisCluster
    # come from the Repoll project


    @receiver(post_save, sender=ApplyRedisText)
    def apply_redis_text_handler(sender, **kwargs):
        """
        Signal handler: triggered automatically once the instance details
        are saved on the front-end page after approval.
        """
        instance = kwargs['instance']
        redis_ins_obj = RedisIns.objects.filter(id=instance.redis_ins_id)
        redis_ins_type = redis_ins_obj.values('redis_type').first()['redis_type']
        redis_apply_text_split = redis_apply_text(instance.apply_text, redis_type=redis_ins_type)
        redis_ins_obj_name = redis_ins_obj.values('redis_ins_name').first()

        if redis_ins_type == 'Redis-Standalone':
            redis_ip = redis_apply_text_split['redis_ip']
            redis_port = redis_apply_text_split['redis_port']
            standalone = RedisStandalone(redis_ins=redis_ins_obj,
                                         redis_ins_name=redis_ins_obj_name,
                                         redis_ins_type=redis_ins_type,
                                         redis_ins_mem=redis_apply_text_split['redis_mem'],
                                         redis_ip=redis_ip,
                                         redis_port=redis_port)
            standalone.saved_redis_running_ins()
            if standalone.create_redis_conf_file():
                redis_start = RedisStartClass(
                    host=redis_ip,
                    redis_server_ctl="/opt/repoll/redis/src/redis-server /opt/repoll/conf/{0}.conf".format(redis_port))
                if redis_start.start_server():
                    logging.info("Redis standalone instance started, server IP: {0}, port: {1}".format(redis_ip, redis_port))
                else:
                    logging.error("Redis standalone instance failed to start, server IP: {0}, port: {1}".format(redis_ip, redis_port))

        elif redis_ins_type == 'Redis-Sentinel':
            sentinel = RedisModelStartClass(model_type='Redis-Sentinel',
                                            redis_ins=redis_ins_obj,
                                            redis_master_ip_port=redis_apply_text_split['redis_master_ip_port'],
                                            redis_slave_ip_port=redis_apply_text_split['redis_slave_ip_port'],
                                            redis_master_name=redis_apply_text_split['redis_master_name'],
                                            redis_sentinel_ip_port=redis_apply_text_split['redis_sentinel_ip_port'],
                                            redis_sentinel_num=redis_apply_text_split['redis_sentinel_num'],
                                            sentinel_down_after_milliseconds=30000,
                                            sentinel_failover_timeout=180000,
                                            sentinel_parallel_syncs=1,
                                            redis_mem=redis_apply_text_split['redis_mem'])
            # Method names are the project's own, spelling included; both files are always generated
            sentinel_conf_ok = sentinel.create_sentienl_conf_file()
            master_slave_conf_ok = sentinel.create_maser_slave_conf()
            if sentinel_conf_ok and master_slave_conf_ok:
                # Start the master first, then the replica, then the sentinels
                if sentinel.start_redis_master() and sentinel.start_slave_master():
                    sentinel.start_sentinel_master()
                    logging.info("Sentinel mode started, redis_master_ip_port: {0}, redis_slave_ip_port: {1}, "
                                 "redis_sentinel_ip_port: {2}, redis_master_name: {3}".format(
                                     redis_apply_text_split['redis_master_ip_port'],
                                     redis_apply_text_split['redis_slave_ip_port'],
                                     redis_apply_text_split['redis_sentinel_ip_port'],
                                     redis_apply_text_split['redis_master_name']))
                    sentinel.save_sentinel_redis_ins()

        elif redis_ins_type == 'Redis-Cluster':
            redis_list = []
            redis_cluster_mem_sum = 0
            for redis_one_ins in redis_apply_text_split:
                # The first ip:port of each group is the master, the rest are its replicas
                redis_list.append({"redis_master": redis_one_ins['redis_ip_port'][0],
                                   "redis_slave": redis_one_ins['redis_ip_port'][1:],
                                   "redis_mem": redis_one_ins['redis_mem']})
                redis_cluster_mem_sum += int(redis_one_ins['redis_mem'])
            RunningInsTime(running_ins_name=redis_ins_obj_name["redis_ins_name"],
                           redis_type='Redis-Cluster',
                           redis_ins_mem=redis_cluster_mem_sum,
                           ins_status=0).save()
            for redis_one_ins in redis_apply_text_split:
                for idx, ip_port in enumerate(redis_one_ins['redis_ip_port']):
                    node = RedisClusterClass(redis_ins=redis_ins_obj,
                                             redis_ins_name=redis_ins_obj_name,
                                             redis_ins_type="Redis-Master" if idx == 0 else "Redis-Slave",
                                             redis_ins_mem=redis_one_ins['redis_mem'],
                                             redis_ip=ip_port[0],
                                             redis_port=ip_port[1])
                    if node.create_cluster_file():
                        node.start_all_redis_ins()
                        node.save_cluster_ins()
            RedisIns.objects.filter(redis_ins_name=redis_ins_obj_name["redis_ins_name"]).update(on_line_status=0)
            start_cluster = StartRedisCluster(cluster_list=redis_list)
            start_cluster.redis_cluser_meet(start_cluster.redis_cluster_list())
            redis_cluster_node_info = start_cluster.get_cluster_info()
            if redis_cluster_node_info:
                start_cluster.add_slot_2_master(redis_cluster_node_info)
            else:
                logging.error("Redis Cluster failed to start, cluster info: {0}".format(redis_list))

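  • What it does:

  1. Generates the Redis instance config file on each server

  2. Starts the Redis instances from those config files

  3. Has all nodes of the Redis cluster handshake with each other (CLUSTER MEET)

  4. Assigns Redis slots to the given master nodes (currently only 3 masters with 3 replicas are supported, because the slot ranges are hard-coded for three masters)

  5. Sets up the master/replica relationships according to the given master nodes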
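Steps 2 to 5 reduce to a fixed sequence of shell commands. The sketch below only builds that sequence and does not run anything. It assumes the `/opt/repoll` paths used throughout this article, and it takes a mapping from each master to its replicas, with nodes given as `(ip, port)` tuples. `plan_cluster_commands` and `slot_ranges` are hypothetical helpers. `slot_ranges` splits the 16384 hash slots evenly across any number of masters. For 3 masters it gives exactly the 0-5461 / 5462-10922 / 10923-16383 split hard-coded in `add_slot_2_master`. Replication needs the master's node ID, which only exists after the meet step, so it is passed in as a lookup table.

```python
from typing import Dict, List, Tuple

CLI = "/opt/repoll/redis/src/redis-cli"
SERVER = "/opt/repoll/redis/src/redis-server"
CONF_DIR = "/opt/repoll/conf"
TOTAL_SLOTS = 16384  # a Redis Cluster always has 16384 hash slots

Node = Tuple[str, int]  # (ip, port)


def slot_ranges(n_masters: int) -> List[Tuple[int, int]]:
    """Split slots 0..16383 into n contiguous ranges; the first ranges absorb the remainder."""
    if n_masters < 1:
        raise ValueError("need at least one master")
    base, extra = divmod(TOTAL_SLOTS, n_masters)
    ranges: List[Tuple[int, int]] = []
    start = 0
    for i in range(n_masters):
        size = base + (1 if i < extra else 0)
        ranges.append((start, start + size - 1))
        start += size
    return ranges


def plan_cluster_commands(
    topology: Dict[Node, List[Node]], node_ids: Dict[str, str]
) -> List[Tuple[str, str]]:
    """Return (host, command) pairs for steps 2-5 in order; nothing is executed.

    topology maps each master to its replicas; node_ids maps 'ip:port' to the
    node ID reported by CLUSTER NODES after the meet step.
    """
    masters = list(topology)
    if not masters:
        return []
    nodes = masters + [r for m in masters for r in topology[m]]
    plan: List[Tuple[str, str]] = []
    # Step 2: start every instance from its generated config file
    for ip, port in nodes:
        plan.append((ip, f"{SERVER} {CONF_DIR}/{port}-cluster.conf"))
    # Step 3: the first node meets every other node; gossip spreads the rest
    first_ip, first_port = nodes[0]
    for ip, port in nodes[1:]:
        plan.append((first_ip, f"{CLI} -h {first_ip} -p {first_port} cluster meet {ip} {port}"))
    # Step 4: one contiguous slot range per master ({a..b} needs bash brace expansion)
    for (ip, port), (lo, hi) in zip(masters, slot_ranges(len(masters))):
        plan.append((ip, f"{CLI} -h {ip} -p {port} cluster addslots " + "{" + f"{lo}..{hi}" + "}"))
    # Step 5: attach each replica to its master by node ID
    for m_ip, m_port in masters:
        master_id = node_ids[f"{m_ip}:{m_port}"]
        for ip, port in topology[(m_ip, m_port)]:
            plan.append((ip, f"{CLI} -h {ip} -p {port} cluster replicate {master_id}"))
    return plan
```

Meeting only from the first node relies on the cluster bus gossip to propagate membership, so every other node learns about all peers without extra MEET commands.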

    import logging

    # get_redis_conf, regx_redis_conf, mem_unit_chage, do_scp, do_command, TEMPLATES_DIR,
    # RedisStartClass, RunningInsTime and RunningInsCluster come from the Repoll project

    REDIS_SERVER = "/opt/repoll/redis/src/redis-server"
    REDIS_CLI = "/opt/repoll/redis/src/redis-cli"
    REMOTE_CONF_DIR = "/opt/repoll/conf"
    # SSH credentials are hard-coded for now; they are to be read from the database later
    SSH_USER, SSH_PASSWORD = "root", "Pass@word"


    class RedisClusterClass:

        def __init__(self, redis_ins, redis_ins_name, redis_ins_type, redis_ins_mem, redis_ip, redis_port):
            """
            Generate, distribute and start the config of a single Redis instance.
            :param redis_ins: RedisIns queryset of the application
            :param redis_ins_name: dict of the form {'redis_ins_name': ...}
            :param redis_ins_type: "Redis-Master" or "Redis-Slave"
            :param redis_ins_mem: memory size of the instance
            :param redis_ip: IP of the target server
            :param redis_port: port of the instance
            """
            self.redis_ins = [r.__dict__ for r in redis_ins]
            self.redis_ins_name = redis_ins_name['redis_ins_name']
            self.redis_ins_type = redis_ins_type
            self.redis_ins_mem = redis_ins_mem
            self.redis_ip = redis_ip
            self.redis_port = redis_port
            self.remote_conf = "{0}/{1}-cluster.conf".format(REMOTE_CONF_DIR, redis_port)

        def create_cluster_file(self):
            """
            Write the instance's config file and copy it to the target directory on the
            resource-pool server. Copying supports passwordless SSH or user/password
            authentication (the credentials are currently hard-coded).
            :return: True if the file was distributed successfully
            """
            skip_keys = ('id', 'redis_version', 'redis_type')
            # Common settings first, then cluster-specific ones (which also need the port)
            sections = [(get_redis_conf("Redis-Standalone"), {}),
                        (get_redis_conf("Redis-Cluster"), {"kwargs": {"redis_port": self.redis_port}})]
            maxmemory = mem_unit_chage(self.redis_ins_mem)
            conf_file_name = "{0}/templates/{1}-cluster.conf".format(TEMPLATES_DIR, self.redis_port)
            with open(conf_file_name, 'w') as f:
                for conf_rows, extra in sections:
                    for k, v in [row.__dict__ for row in conf_rows][0].items():
                        # Skip bookkeeping columns and non-scalar attributes such as Django's _state
                        if k in skip_keys or not isinstance(v, (str, int)):
                            continue
                        k, v = regx_redis_conf(key=k, value=v, port=self.redis_port,
                                               maxmemory=maxmemory, **extra)
                        f.write("{0} {1}\n".format(k, v))
            if do_scp(self.redis_ip, conf_file_name, self.remote_conf,
                      user_name=SSH_USER, user_password=SSH_PASSWORD):
                logging.info("Distributed {0} to the target server".format(self.remote_conf))
                return True
            logging.error("Failed to distribute {0} to the target server".format(self.remote_conf))
            return False

        def start_all_redis_ins(self):
            """Start this instance with redis-server and its distributed config file."""
            redis_start = RedisStartClass(host=self.redis_ip,
                                          redis_server_ctl="{0} {1}".format(REDIS_SERVER, self.remote_conf))
            if redis_start.start_server():
                logging.info("Redis instance {2} started, ip:port: {0}:{1}".format(
                    self.redis_ip, self.redis_port, self.redis_ins_name))
                return True
            logging.error("Redis instance {2} failed to start, ip:port: {0}:{1}".format(
                self.redis_ip, self.redis_port, self.redis_ins_name))
            return False

        def save_cluster_ins(self):
            """Record this node under the cluster's RunningInsTime entry."""
            obj_runningins_now = RunningInsTime.objects.get(running_ins_name=self.redis_ins_name)
            RunningInsCluster(
                running_ins_name=self.redis_ins_name,
                redis_type=self.redis_ins_type,
                running_ins_port=self.redis_port,
                redis_ip=self.redis_ip,
                redis_ins_mem=self.redis_ins_mem,
                running_ins_standalone=obj_runningins_now,
            ).save()


    class StartRedisCluster:

        def __init__(self, cluster_list):
            """
            Start a Redis Cluster.
            :param cluster_list: [{'redis_master': (ip, port), 'redis_slave': [(ip, port), ...], 'redis_mem': ...}, ...]
            """
            if not isinstance(cluster_list, list):
                raise TypeError("cluster_list must be a list")
            self.cluster_list = cluster_list

        @staticmethod
        def _run(host, comm_line):
            """Run a command on host over SSH; do_command returns (exit_code, output)."""
            return do_command(host=host, commands=comm_line, user_name=SSH_USER, user_password=SSH_PASSWORD)

        def redis_cluster_list(self):
            """Flatten all masters and replicas into one list of (ip, port)."""
            redis_list = []
            for group in self.cluster_list:
                redis_list.append(group['redis_master'])
                redis_list.extend(group['redis_slave'])
            return redis_list

        def redis_cluser_meet(self, redis_ins_list):
            """
            Join all nodes into one cluster. Having the first node meet every other
            node is enough: the cluster gossip protocol propagates the rest.
            :param redis_ins_list: flattened node list from redis_cluster_list()
            """
            if not redis_ins_list:
                return
            first_ip, first_port = redis_ins_list[0][0], redis_ins_list[0][1]
            try:
                for other in redis_ins_list[1:]:
                    comm_line = "{0} -h {1} -p {2} cluster meet {3} {4}".format(
                        REDIS_CLI, first_ip, first_port, other[0], other[1])
                    if self._run(first_ip, comm_line)[0] == 0:
                        logging.info("{0}:{1} cluster meet {2}:{3} is ok".format(
                            first_ip, first_port, other[0], other[1]))
            except Exception as e:
                logging.error("Redis Cluster failed to start, nodes: {0}, error: {1}".format(self.cluster_list, e))

        def get_cluster_info(self):
            """
            Read CLUSTER NODES, used afterwards to set up replication.
            :return: {'ip:port': node_id} for every node, or None
            """
            master = self.cluster_list[0]['redis_master']
            comm_line = "{0} -h {1} -p {2} cluster nodes".format(REDIS_CLI, master[0], master[1])
            result = self._run(master[0], comm_line)
            node_dict = {}
            if result[0] == 0:
                for line in result[1].decode('utf-8').splitlines():
                    fields = line.split()
                    if len(fields) >= 2:
                        # Redis >= 4.0 prints ip:port@cport; strip the cluster-bus port
                        node_dict[fields[1].split('@')[0]] = fields[0]
            return node_dict or None

        def add_slot_2_master(self, cluster_node_info):
            """
            Assign slots to the masters and attach each replica to its master.
            The three fixed ranges limit this to exactly 3 masters; the {a..b}
            brace expansion requires bash on the remote side.
            :param cluster_node_info: {'ip:port': node_id} from get_cluster_info()
            :return: True on success, False on error
            """
            _slots = ["{0..5461}", "{5462..10922}", "{10923..16383}"]
            if len(self.cluster_list) != len(_slots):
                logging.error("Exactly {0} masters are supported".format(len(_slots)))
                return False
            try:
                for num, group in enumerate(self.cluster_list):
                    master_ip, master_port = group['redis_master'][0], group['redis_master'][1]
                    add_slot_comm_line = "{0} -h {1} -p {2} cluster addslots {3}".format(
                        REDIS_CLI, master_ip, master_port, _slots[num])
                    if self._run(master_ip, add_slot_comm_line)[0] == 0:
                        logging.info("add slot succeeded")
                    master_id = cluster_node_info['{0}:{1}'.format(master_ip, master_port)]
                    for slave in group['redis_slave']:
                        replicate_comm_line = "{0} -h {1} -p {2} cluster replicate {3}".format(
                            REDIS_CLI, slave[0], slave[1], master_id)
                        logging.info(replicate_comm_line)
                        if self._run(slave[0], replicate_comm_line)[0] == 0:
                            logging.info("add replicate succeeded")
            except (IOError, KeyError) as e:
                logging.error("Failed to assign slots or replicas: {0}".format(e))
                return False
            return True
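`get_cluster_info` keys its result on the second column of `CLUSTER NODES`. Since Redis 4.0 that column has the form `ip:port@cport`, with the cluster-bus port appended. The bus port therefore has to be stripped before the value can serve as the plain `ip:port` key that `add_slot_2_master` looks up. A minimal standalone parser follows. `parse_cluster_nodes` is a hypothetical helper, and the sample lines are illustrative, not output captured from a real cluster.

```python
from typing import Dict


def parse_cluster_nodes(output: str) -> Dict[str, str]:
    """Map 'ip:port' -> node ID from the raw text of CLUSTER NODES."""
    nodes: Dict[str, str] = {}
    for line in output.splitlines():
        fields = line.split()
        if len(fields) < 2:
            continue  # blank or truncated line
        node_id, address = fields[0], fields[1]
        # Redis >= 4.0 prints ip:port@cport; older versions print ip:port
        nodes[address.split("@", 1)[0]] = node_id
    return nodes


sample = (
    "a1b2 10.0.0.1:7000@17000 myself,master - 0 0 1 connected 0-5461\n"
    "c3d4 10.0.0.2:7001@17001 slave a1b2 0 1577000000000 1 connected\n"
)
print(parse_cluster_nodes(sample))
```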

Running Instances

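For running instances, the platform currently offers start, stop, and a QPS trend chart. Start and stop are implemented with Django REST framework, with API access restrictions in place so that users outside the platform cannot call them.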

Figure: running Redis instances

Figure: Redis cluster information

The QPS feature currently runs on django_crontab. It still has some rough edges and the code is being reworked, so it is not included here.
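Since the QPS code is not shown, here is one common approach, offered only as an assumption about how such a cron job could work. On each run, sample `total_commands_processed` from `INFO stats`, then divide the change since the previous sample by the elapsed seconds. Redis also reports `instantaneous_ops_per_sec`, a short-window estimate. `parse_info` and `qps` are hypothetical helpers.

```python
from typing import Dict


def parse_info(text: str) -> Dict[str, str]:
    """Parse the 'key:value' lines of INFO output, skipping '# Section' headers."""
    info: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()  # INFO lines end with \r\n
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        info[key] = value
    return info


def qps(prev_total: int, curr_total: int, prev_ts: float, curr_ts: float) -> float:
    """Average commands per second between two samples of total_commands_processed."""
    elapsed = curr_ts - prev_ts
    if elapsed <= 0:
        raise ValueError("timestamps must increase")
    delta = curr_total - prev_total
    if delta < 0:
        return 0.0  # counter went backwards: restart or CONFIG RESETSTAT
    return delta / elapsed
```

With one-minute cron samples of 1000 and 1600 commands, `qps(1000, 1600, 0.0, 60.0)` gives 10.0 commands per second.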

If you found this article useful, please give it a like. Your support encourages me to keep writing!

© 2020 talkwithtrend (talk with trend, talk with technologist), ICP registration 京ICP备09031017号-30