ceph message and osd process

OSD message queue

Pipe读到的能走fast dispatch的直接走fast dispatch, 不能走fast的queue起来 交给dispatch_thread处理

void Pipe::start_reader()
-> void Pipe::reader()
        if (in_q->can_fast_dispatch(m)) {
          in_q->fast_dispatch(m);
        } else {
          in_q->enqueue(m, m->get_priority(), conn_id);
	}

创建的两个thread一个用来处理queue里的请求,一个用来处理local请求

void DispatchQueue::start()
->dispatch_thread.create("ms_dispatch");
  local_delivery_thread.create("ms_local");
dispatch_thread.create("ms_dispatch");
-> void DispatchQueue::entry()
      msgr->ms_deliver_dispatch(m);
-> Messenger:: void ms_deliver_dispatch(Message *m)
-> bool OSD::ms_dispatch(Message *m)
-> void OSD::_dispatch(Message *m)
    handle_osd_map(static_cast<MOSDMap*>(m));
    handle_command(static_cast<MMonCommand*>(m));
    handle_scrub(static_cast<MOSDScrub*>(m));
    ...
    dispatch_op(op);
-> void OSD::dispatch_op(OpRequestRef op)

local_delivery_thread用来处理本地请求,同样能走fast的直接走fast,不能走fast 的queue起来,dispatch_thread来处理. local_delivery_thread处理的请求是在submit_message的时候,如果判断是local请求 通过local_delivery放到local_messages里

local_delivery_thread.create("ms_local");
-> void DispatchQueue::run_local_delivery()
-> void DispatchQueue::fast_dispatch(Message *m)
-> Messenger::  void ms_fast_dispatch(Message *m) {
-> void OSD::ms_fast_dispatch(Message *m)
-> void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
-> bool OSD::dispatch_op_fast(OpRequestRef& op, OSDMapRef& osdmap)

OSD r/w message process


=> void ShardedThreadPool::start()
-> void ShardedThreadPool::start_threads()
-> ShardedThreadPool::WorkThreadSharded::entry()
-> void ShardedThreadPool::shardedthreadpool_worker(uint32_t thread_index)
-> void OSD::ShardedOpWQ::_process(uint32_t thread_index, heartbeat_handle_d *hb )
-> void PGQueueable::RunVis::operator()(const OpRequestRef &op) {
-> void OSD::dequeue_op
-> void ReplicatedPG::do_request
-> void ReplicatedPG::do_op(OpRequestRef& op)
-> void ReplicatedPG::execute_ctx(OpContext *ctx)