• 欢迎访问web前端中文站,JavaScript,CSS3,HTML5,web前端demo
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏web前端中文站吧

TiDB show processlist命令源码分析

JavaScript web前端中文站 7个月前 (04-17) 352次浏览 已收录 0个评论

背景

因为丰巢自去年年底开始在推送平台上尝试了 TiDB,最近又要将承接丰巢所有交易的支付平台切到 TiDB 上。我本人一直没有抽出时间对 TiDB 的源码进行学习,最近准备开始一系列的学习和分享。由于我本人没有数据库相关的经验,本着学习的心态和大家一起探讨,欢迎高手随时指正。总结一下本次学习分享的目的:

更多精彩内容请看 web 前端中文站
http://www.lisa33xiaoq.net 可按 Ctrl + D 进行收藏

  • 丰巢把最重要的两个基础业务都放到了 TiDB 上,后续应该会有更多的核心系统跑在 TiDB 上,我们丰巢中间件团队作为引入 TiDB 到丰巢的推动人和执行者,对于 TiDB 的稳定性和突发事件的处理,一定要做足功课;
  • 以 TiDB 为代表的 newsql 代表的是现在和未来,作为个人来说,有着充足的动力去学习;
  • 我们不满足于只是作为 TiDB 的使用者,我们需要在 TiDB 上定制开发对于丰巢更有意义的模块,如果能给社区做贡献,那更是非常棒的一件事;

言归正传,说一下本文的产生原因:去年我们在推送平台上使用 TiDB 的过程中,就发现老版本的 TiDB 是无法通过外部手段 kill 调用慢查询的,而慢查询的危害对于数据库来说会有致命的风险,后来 pingcap 公司在 2.1 版本(具体的版本参见 TiDB 的说明)中增加了 show processlist 和 kill tidb 命令,但是因为 TiDB 本身是无状态的,这两个命令属于单机命令,在使用的过程中,大家还是要提前做好准备,要直连到具体的 TiDB 的 server 上才可使用,不要通过 nginx 等服务进行转发请求,到时不但不能解决问题,还有可能带来意外的风险。今天第一章,我们先来看一下 show processlist 这个比较简单的命令的源码,下一章,我们再分析 kill tidb 这个命令。

源码分析

环境信息

  • 软件:TiDB2.1.7、PD2.1.4、TiKV2.1.4;
  • 硬件:为了随时调试,TiDB 跑在本机的 mac 上、PD 和 TiKV 跑在 linux 虚拟机上;

操作过程

  • 打开一个直连 TiDB 的客户端,输入命令:show PROCESSLIST;
  • 客户端会输出下图的列表;

TiDB show processlist 命令源码分析

上面的列表中展示了当前 TiDB 正在处理每个连接的 sql 语句详情。

问题

在我分析源码之前,我问了自己本次分析源码要搞清楚的两个问题,在这里和大家分享一下:

  • show processlist 到底是不是单机的命令,和 TiKV、PD 有没有啥关系?
  • kill tidb 需要使用的 id 字段到底代表的是什么?

接收命令

首先,启动 TiDB server.代码在 tidb-server/main.go 里面,主要方法是:runServer 方法

func runServer() {  err := svr.Run() } 

再来看一下:server/server.go 源码:

func (s *Server) Run() error {  for {   conn, err := s.listener.Accept()   go s.onConn(conn)  } } 

重点代码是监听端口,并创建连接,启动另一协程去服务新来的连接,接下来再看看 server.go 中的 onConn 方法:

func (s *Server) onConn(c net.Conn) {  conn := s.newConn(c)  conn.Run() } 

其中,s.newConn 方法会将 net. Conn 连接包装成 clientConn 连接,并分配在这个 TiDB server 下唯一的 connectionID,此 connectionID 为原子变量,每次新连接自增加 1,我们先记住这个 id,后面分析的时候会用到它。我们来看看 server/conn.go 下的 Run 方法:

func (cc *clientConn) Run() {  for {   data, err := cc.readPacket()   cc.dispatch(data)   } } 

Run 方法主要就是不断的轮训读取 clientConn 中的内容,并将它交给 dispatch 方法进行下面的分析及返回结果操作,至此关于接收 show processlist 命令部分已经分析完毕,当然其它的 sql 语句也是经过这个过程进入到 dispatch 方法中的。

show processlist 的构建 Executor

接着分析 dispatch 方法在处理 show processlist 命令的流程:

func (cc *clientConn) dispatch(data []byte) error {  switch cmd {  case mysql.ComQuery:    return cc.handleQuery(ctx1, hack.String(data))  } } 

show processlist 命令属于 mysql.ComQuery,因此流程会走到 handleQuery 方法里面,我们来看一下:

func (cc *clientConn) handleQuery(ctx context.Context, sql string) (err error) {  rs, err := cc.ctx.Execute(ctx, sql)  err = cc.writeResultset(ctx, rs[0], false, 0, 0) } 

handleQuery 中处理 show processlist 命令的重点代码就是上面的两行,我们先来看一下 server/driver_tidb.go 中的 Execute 方法:

rsList, err := tc.session.Execute(ctx, sql) 

Execute 中的重点就是调用 session/session.go 中的 Execute 方法:

func (s *session) execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) {  s.PrepareTxnCtx(ctx)  stmtNodes, warns, err := s.ParseSQL(ctx, sql, charsetInfo, collation)  compiler := executor.Compiler{Ctx: s}  for _, stmtNode := range stmtNodes {   recordSets, err = s.executeStatement(ctx, connID, stmtNode, stmt, recordSets);   } } 

上面的 execute 方法中会对 sql 语句进行处理及制定执行计划,处理完成后调用 executeStatement 方法,executeStatement 中的重点方法是 runStmt:

recordSet, err := runStmt(ctx, s, stmt) 

我们再来看看 session/tidb.go 中的 runStmt 方法:

func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) (sqlexec.RecordSet, error) {  rs, err = s.Exec(ctx)  err = finishStmt(ctx, sctx, se, sessVars, err) } 

继续来分析 executor/adapter 中的(a *ExecStmt) Exec 方法,一样采取划重点的方式:

func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {     e, err := a.buildExecutor(sctx)     e.Open(ctx)  var pi processinfoSetter  if raw, ok := sctx.(processinfoSetter); ok {   pi = raw   sql := a.OriginText()   if simple, ok := a.Plan.(*plannercore.Simple); ok && simple.Statement != nil {    if ss, ok := simple.Statement.(ast.SensitiveStmtNode); ok {     // Use SecureText to avoid leak password information.     sql = ss.SecureText()    }   }   // Update processinfo, ShowProcess() will use it.   pi.SetProcessInfo(sql)   //fmt.Println(sql)   a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)  }       return &recordSet{   executor:    e,   stmt:        a,   processinfo: pi,   txnStartTS:  txnStartTS,  }, nil } 

(a *ExecStmt) Exec 方法中 raw, ok := sctx.(processinfoSetter)这段逻辑就是把当前连接正在执行的语句存储到 processinfo 里面取,关于这部分细节比较简单,在这里不展开来分析。我们先来看看 buildExecutor 中做了什么事情?

 b := newExecutorBuilder(ctx, a.InfoSchema)  e := b.build(a.Plan) 

重点要来了,在 executor/builder.go 中的 build 方法做了啥事?

 case *plannercore.Show:   return b.buildShow(v) 

build 方法会根据不同的语句类型来构建不同的 Executor 并返回,show processlist 命令会匹配到 plannercore.Show 类型,我们看看 buildShow 方法的实现:

 e := &ShowExec{   baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),   Tp:           v.Tp,   DBName:       model.NewCIStr(v.DBName),   Table:        v.Table,   Column:       v.Column,   User:         v.User,   Flag:         v.Flag,   Full:         v.Full,   GlobalScope:  v.GlobalScope,   is:           b.is,  }  if len(v.Conditions) == 0 {   return e  }  sel := &SelectionExec{   baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), e),   filters:      v.Conditions,  }  return sel 

因为 v.Conditions 为 0,所以返回类型为 ShowExec 的 Executor,我们接下来再刚才的 Exec 方法中的 e.Open 方法,其实就是 ShowExec 的 Open 方法,ShowExec 位于 executor/show.go 文件中,我们查找后发现 ShowExec 中没有 Open 方法,我当时是被搞蒙了,后来发现这是 go 的一个语言特性,它使用的是 baseExecutor 的 Open 方法:

func (e *baseExecutor) Open(ctx context.Context) error {  for _, child := range e.children {   err := child.Open(ctx)   if err != nil {    return errors.Trace(err)   }  }  return nil } 

上面的方法会遍历 baseExecutor 中的 children 的 Executor,然后调用它们的 Open 方法,但是因为 ShowExec 在创建它的 baseExecutor 的时候,没有任何的 children,所以在 show processlist 这个操作过程中,Open 方法相当于啥也没干,但是大家在分析其它语句时,这个 Open 方法是一个很重要的方法。我们再来看刚才 Exec 中的最后的 return 块里面,返回了包装 executor、processinfo 等信息的 recordSet 类型。至此关于 show processlist 命令如何包装成 Executor 并和 processinfo 等信息作为 recordSet 类型的返回值返回给上层函数分析完毕。

show processlist 的获取各个连接的 processinfo 信息

接下来我们再来看 handleQuery 中的 writeResultset 方法:

err = cc.writeResultset(ctx, rs[0], false, 0, 0) 

在 server/conn.go 中的 writeResultset 主要的逻辑就是下面的逻辑:

err = cc.writeChunks(ctx, rs, binary, serverStatus) 

我们继续来分析 writeChunks 中的重要部分:

func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool, serverStatus uint16) error {  for {   err := rs.Next(ctx, chk)  } } 

writeChunks 里面主要就是循环调用 rs.Next 的方法,直到满足条件为止,rs 的类型实际上是 server/driver_tidb.go 下的 tidbResultSet 类型,我们来看一下它的 Next 方法:

func (trs *tidbResultSet) Next(ctx context.Context, chk *chunk.Chunk) error {  return trs.recordSet.Next(ctx, chk) } 

tidbResultSet 的 Next 方法主要是调用了 executor/adapter.go 中的 recordSet 类型的 Next 方法,我们来看看这个 Next 方法:

func (a *recordSet) Next(ctx context.Context, chk *chunk.Chunk) error {  err := a.executor.Next(ctx, chk) } 

recordSet 方法的重点就是调用它的 executor 的 Next 方法,我们在上一个小节 结尾处分析出 recordSet 的 executor 就是之前生成的 ShowExec(可算是找到它了,我已经累晕)。那么,我们接着分析它的 Next 方法:

e.fetchAll() 

ShowExec 中的 Next 方法的主要逻辑就是调用它的 fetchAll 方法,接着往下看:

case ast.ShowProcessList:   return e.fetchShowProcessList() 

因为匹配到了这个 case,所以会调用它的 fetchShowProcessList 方法:

func (e *ShowExec) fetchShowProcessList() error {  sm := e.ctx.GetSessionManager()  pl := sm.ShowProcessList() } 

上面的 sm 类型的 server/server.go 中的 Server 类型,我们来看看它的 ShowProcessList 方法:

func (s *Server) ShowProcessList() map[uint64]util.ProcessInfo {  s.rwlock.RLock()  rs := make(map[uint64]util.ProcessInfo, len(s.clients))  for _, client := range s.clients {   if atomic.LoadInt32(&client.status) == connStatusWaitShutdown {    continue   }   pi := client.ctx.ShowProcess()   rs[pi.ID] = pi  }  s.rwlock.RUnlock()  return rs } 

它主要是遍历当前所有的客户端,并获取到所有客户端的 ShowProcess,其中的 client.ctx 类型为 server.TiDBContext,我们来看看它的 ShowProcess:

func (tc *TiDBContext) ShowProcess() util.ProcessInfo {  return tc.session.ShowProcess() } 

逻辑比较简单,就是调用类型为 session.session 的 ShowProcess 方法,接着往下看:

func (s *session) ShowProcess() util.ProcessInfo {  var pi util.ProcessInfo  tmp := s.processInfo.Load()  if tmp != nil {   pi = tmp.(util.ProcessInfo)   pi.Mem = s.GetSessionVars().StmtCtx.MemTracker.BytesConsumed()  }  return pi } 

session 的 ShowProcess 方法会从内存中加载当前 session 的 processInfo 信息。至此我们分析 show processlist 命令的源码分析完毕,关于每个连接如何设置自身的 processinfo 信息,逻辑也比较简单,大家有兴趣可以自己去研究一下。

总结

我们可以回答一下开头提出的两个问题:

  • show processlist 到底是不是单机的命令,和 TiKV、PD 有没有啥关系?答案是 show processlist 确实是一个单机命令,和 TiKV、PD 没有任何关系。
  • kill tidb 需要使用的 id 字段到底代表的是什么?id 字段就是在创建连接时,分配的 connectionId,它在单个 TiDB 服务内唯一。

通过上面的分析,我们还可以总结以下的特点:

  • TiDB 的连接在客户端不能够复用,因为它处理请求时,主流程是在单协程中处理的,处理完一个再处理下一个;
  • show processlist 命令的处理中关于 ShowExec 的 Open 方法调用,其实是它内部的 baseExecutor 的 Open 方法;
  • 每个连接的 session 负责独立管理此连接的 processinfo 信息;
  • TiDB 的 Executor 机制靠 next 的方式不断在它的链式处理结构上传递;
  • show processlist 因为没有其它条件,所以它在处理时的 Executor 类型为 ShowExec,没有再包装 SelectionExec 类型;
  • 真正的语句执行(获取 show processlist 的信息)其实是在 write 的时候,我在分析这点的时候,花了不少时间;

源码阅读方法

最后,我想和大家分享一下,我自己在源码阅读里面用到的一些方法和技巧,大的方面会有两种方法:

  • 由因导果:就是由某一行代码,开始自顶向下的正向阅读;
  • 执果索因:就是从结果处出发,开始自底向上的反向阅读和推导;

上面的两种方法,会伴随大家在源码阅读的各个阶段,但是有了这两种方法还是远远不够的,我再分享一下我的相关技巧:

【注:本文源自网络文章资源,由站长整理发布】

  • 编译运行:当我们在下定决心要阅读某个框架的源码时,第一步要做的就是,将这个框架的源码从源码库拉下来后,用我们的 IDE 工具编译运行起来,对于有些框架运行的难度会比较高,就比如说我这次选择的 TiDB,在编译过程中花费了好多的时间。建议大家在这个过程中,不要放弃,第一步是一定要把它编译运行起来;
  • 资料参考:一般来讲只要不是太冷门的组件,一般网上都会有比较多的源码分享,我们需要甄别出写的好的源码分析资料,然后参考验证我们的源码阅读;
  • 重要类的结构关系图整理:我们都知道,java 体系的组件(golang 的也一样),在设计时都会有各种复杂接口和抽象类继承关系,在阅读源码时,我们很容易便陷入到这种复杂的继承关系中去,所以利用 IDE 工具绘出类的结构关系图,会在我们阅读源码时,有很大的帮助;
  • 掌握调试技巧:有较好的调试技巧可以便于我们分析代码流程和上下游关系;
  • 修改源码:在我们不能完全确定流程分支等情况下,可以靠修改源码去理解;
  • 提问题:在本文分析 show processlist 源码的过程中,提问题一直都伴随着我们的源码阅读过程,提问题能让我们更好的理解背后的含义,便于深入到源码的架构设计中去;
  • 聚焦:对于类似于 TiDB 如此复杂的组件,我们在一开始分析的过程中,一定要先选定分析的主线路,比如:本章的“show processlist”,在这个过程中,有意的忽略我们本次分析主线路之外的逻辑分支,目标明确,才能不会陷入到框架各种复杂的设计中去;
  • 总结分享:这一点是最重要的,源码阅读完后,如果不进行总结,过一段时间,我们便很容易遗忘了,同时分享也很重要,开源软件本身就是一种众包思想,我们既然是受益方,同时也要通过知识分享回馈他人;

web 前端中文站 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:TiDB show processlist 命令源码分析
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址