App服务器一直都是在后台默默的接受,处理那些乱七八糟的请求。一般开发很少会接触到这些方面的使用,除非是为了优化服务器才会去配置Puma。使用的时候也只是大概了解Puma接受的一些请求转化到Rails App所做的一些工作。但是对于Puma中的请求转化和处理一直比较迷惑,现在有时间,看了一下源码,学习总结了下Puma的实现过程,原理和之前产生的一些疑问。其中单进程模式介绍的比价详细,这种模式是集群模式的特例,只不过集群模式创建了多几个woker而已,其中对应请求的处理其实是一样的。下面的Puma版本是3.12.0。
启动过程
- 合并传入的参数和设置默认参数
- 初始化
Launcher
类,其中会根据workers的数量去决定初始化集群模式还是单进程模式 - 执行
@launcher.run
,其中会执行@runner.run
方法,这个方法是Single类中的run方法 - 接着调用
load_and_bind
=》ConfigMiddleware.new(self, found)
然后创建一个TCPServer监听端口 - write pid到.pid文件中
- 调用
start_server
方法,初始化一个Server server.run.join
=>run_lopez_mode
其中new一个线程池,线程池的概念是在进程中创建一定数量的线程,然后不断的去领取任务执行,其中的任务就是一个请求。=> 然后到调用handle_servers_lopez_mode
这个方法是为了调用IO.select sockets
去监听socket是否写入数据了,如果有数据写入socket了就把其中的数据放到client中,然后加入到线程池的任务中@todo << work
- 上面调用
run_lopez_mode
是tcp的一种方式,还有一种是http模式,这种需要解析头部,body和返回一些信息,如果body那些没准备好,会加入到Reactor中去,但是上面的tcp模式不需要这步处理,直接 执行@app.call env, client.to_io
了。接着会调用process_client client, buffer
去执行一些请求信息。直到status, headers, res_body = @app.call(env)
这种情况去调用到Rails App
,基本就是这个过程了。其实到block.call(env)
这里只是执行了新建线程池的时候的块,其实在块里处理的是另外一件事,需要检查那个请求的数据解析还有其它资源是否准备好才可以继续执行,这里涉及到了Reactor - 如果是queue_requests 的方式, 接着会new一个Reactor实例,然后新开一个线程,通过select(2)系统调用去监听pipe中的ready是否有值写入,或者监听其中的sockets中的client变化,如果是ready值发生了变化,则通过信号值分别执行对应的地方,如果是”*“信号,则是添加client到@sockets中;如果是解析出现timeout的client有变化了,而且请求那些已经准备好了,就加入到threadpool中去
- 然后如果
process_now
可以了就执行handle_request
直到@app.call(env)
,这才是整个过程
Reactor类详解
启动的时候会调用 @reactor.run_in_thread
,接着调用 run_internal
,下面是 run_internal
方法的解析:
sockets = @sockets
while true
begin
ready = IO.select sockets, nil, nil, @sleep_for
rescue IOError => e
Thread.current.purge_interrupt_queue if Thread.current.respond_to? :purge_interrupt_queue
if sockets.any? { |socket| socket.closed? }
STDERR.puts "Error in select: #{e.message} (#{e.class})"
STDERR.puts e.backtrace
sockets = sockets.reject { |socket| socket.closed? }
retry
else
raise
end
end
@sockets初始化时只有pipe的一个读端@read一个元素,这个的作用是在线程的无限循环中阻塞时可以在@trigger写入值时读到这个值,从而唤醒线程继续执行。这些是通过系统级的调用 IO.select
来实现的
if ready and reads = ready[0]
reads.each do |c|
if c == @ready
@mutex.synchronize do
case @ready.read(1)
when "*"
sockets += @input
@input.clear
when "c"
sockets.delete_if do |s|
if s == @ready
false
else
s.close
true
end
end
when "!"
return
end
end
当调用 add
方法时,其中会把 Puma::Client
实例添加到 @input中,然后往 @trigger中写入”*“值,然后上面的 ready 值的第一个元素就是那个值,然后就把 Puma::Client
实例添加到sockets中去,这是sockets中就会有两个元素了,由于 Puma::Client
实例是第一个实例,而且这个实例可以通过 to_io
实例化为 IO 对象,所以可以作为 select(2) 的监控对象。由于主线程从监听的socket那里得到了写入的io流,而且是通过 accept_nonblock
的方式去接受数据的,就是非阻塞IO的方式,通过不断的去轮询。如果系统缓存区数据准备好了,就拷贝到进程内存中。 而且还是未读的状态,所以select(2)会从中取出字符流,如下所示:
else
# We have to be sure to remove it from the timeout
# list or we'll accidentally close the socket when
# it's in use!
if c.timeout_at
@mutex.synchronize do
@timeouts.delete c
end
end
begin
if c.try_to_finish
@app_pool << c
sockets.delete c
end
# Don't report these to the lowlevel_error handler, otherwise
# will be flooding them with errors when persistent connections
# are closed.
rescue ConnectionError
c.write_500
c.close
sockets.delete c
# SSL handshake failure
rescue MiniSSL::SSLError => e
@server.lowlevel_error(e, c.env)
ssl_socket = c.io
addr = ssl_socket.peeraddr.last
cert = ssl_socket.peercert
c.close
sockets.delete c
@events.ssl_error @server, addr, cert, e
# The client doesn't know HTTP well
rescue HttpParserError => e
@server.lowlevel_error(e, c.env)
c.write_400
c.close
sockets.delete c
@events.parse_error @server, c.env, e
rescue StandardError => e
@server.lowlevel_error(e, c.env)
c.write_500
c.close
sockets.delete c
end
end
end
end
检测client中的header和body中的数据是否解析准备好,如果准备好就添加到线程池中去执行,如果没有就需要处理错误了。
unless @timeouts.empty?
@mutex.synchronize do
now = Time.now
while @timeouts.first.timeout_at < now
c = @timeouts.shift
c.write_408 if c.in_data_phase
c.close
sockets.delete c
break if @timeouts.empty?
end
calculate_sleep
end
end
end
这步是处理timeout的情况的。
Cluster Mode(这种方式间的Pipe不是很明白)
- 这一步的每个woker的执行步骤和上面单进程模式差不多,多出来的是创建了多个woker,而且master进程和那些woker间的通信。
- 创建三组pipe
read, @wakeup = Puma::Util.pipe (1)
@check_pipe, @suicide_pipe = Puma::Util.pipe (2)
@master_read, @worker_write = read, @wakeup (3)
- 在每个worker中起一个线程去监听管道
IO.select [@check_pipe]
,监听@check_pipe是否有值写入 - 在主进程中会有exception或有总段的时候,会有个ensure的执行,有一个
@check_pipe.close
的操作,这时woker中的进程接收到这个消息时就会退出进程。这是(2)中值的用法。 @master_read, @worker_write
这一组进程是在woker执行start_server
之后,然后向@woker_write
写了内容@worker_write << "b#{Process.pid}\n"
。还有另外一个地方写入了值,每个worker会另外开一个线程每五秒钟向主进程报告一次进程的状态。- 而
read和@wakeup
这组只有 @wakeup是在主线程中写入!
指令去用于wakeup - 其实主进程没有处理请求,只有多个子进程监控了socket,然后系统分配给进程执行了,cluster和single模式的区别就是在这里。
worker 有多个时有preload和没有的区别
由于每次fork 一个worker都会调用一次start_server的操作,start_server主要是为了启动线程池并且监听socket的变化,方便把任务加入线程池。但是启动每次start_server的时候都会去执行一次 @launcher.config.app
去加载app的操作,这个操作每次fork进程的时候都会去执行,这样就增加很多内存。也就是说每次开始fork代码的时候,其实@app都是nil,所以在fork之后执行start_server的时候都会去分配内存,执行一次 @launcher.config.app
并赋值的操作,这样会很花内存。所以如果有了preload_app!
这个执行,在没有fork其它进程的时候都会去初始化app并赋值到@app,每个fork主进程后的子进程都用的同一个@app,然后在后面@app中的内容有变化的时候再利用系统的 copy-on-write 的功能去复制出来再改变,这样就可以达到节省很多内存的目的。
def app
@app ||= @launcher.config.app
end
线程池的理解
线程池就是在一个进程中创建多个线程,然后把任务添加到线程池的@todo中的,让线程池来分配线程去处理任务,一般这些任务是指网络请求,主要流程如下:
- 初始化线程池的时候会用最小的线程配置数量去生成线程,通过调用
spawn_thread
去生成线程。
def spawn_thread
@spawned += 1
th = Thread.new(@spawned) do |spawned|
# Thread name is new in Ruby 2.3
Thread.current.name = 'puma %03i' % spawned if Thread.current.respond_to?(:name=)
todo = @todo
block = @block
mutex = @mutex
not_empty = @not_empty
not_full = @not_full
extra = @extra.map { |i| i.new }
while true
work = nil
continue = true
mutex.synchronize do
while todo.empty?
if @trim_requested > 0
@trim_requested -= 1
continue = false
not_full.signal
break
end
if @shutdown
continue = false
break
end
@waiting += 1
not_full.signal
not_empty.wait mutex
@waiting -= 1
end
work = todo.shift if continue
end
break unless continue
if @clean_thread_locals
ThreadPool.clean_thread_locals
end
begin
block.call(work, *extra)
rescue Exception => e
STDERR.puts "Error reached top of thread-pool: #{e.message} (#{e.class})"
end
end
mutex.synchronize do
@spawned -= 1
@workers.delete th
end
end
@workers << th
th
end
设置线程的名字,在块中初始化一些线程中才可以用的变量,防止在多个线程中共享同一个实例变量。但是在这里会有个疑问,就是如果每个线程在初始化的时候都会把任务@todo复制到本地变量,那如果@todo中有任务了,怎么知道并且去执行呢?其实是下面的 ,上面的理解错误,其实在这里用局域变量并不是为了防止实例变量被多个线程共享,因为每个线程的创建都是在要求在@mutex中使用,不会出现rate condition的情况。在这里这样用是为了提高性能。其实下面的代码中不会出现死循环,因为todo只是指向@todo的一个指针,当@todo变化了,not_empty.wait mutex
执行了等待的操作,在等not_empty这个变量发送signal信号,所以not_empty其实是@todo的信号发送器,在往threadpool中添加work的时候执行 @not_empty.signal
,这时线程就不会继续等待,线程继续执行。todo.empty?
就为false了。跳出循环后执行 work = todo.shift
从todo中取出一个任务来执行。最后调用 block.call(work, *extra)
执行ThreadPool.new中接的块的代码
while todo.empty?
if @trim_requested > 0
@trim_requested -= 1
continue = false
not_full.signal
break
end
if @shutdown
continue = false
break
end
@waiting += 1
not_full.signal
not_empty.wait mutex
@waiting -= 1
end
涉及到的多个循环等待
- 主线程在
Puma::Server
类中启动一个服务时,会在handle_servers
方法中用无限循环的方式,通过IO.select
的方式去监控socket的变化,如果有数据进来,会通过accept_nonblock
无阻塞IO的方式来进行接受数据,从而创建一个client的处理对象,然后把client加入到线程池的@todo中去。 - 在初始化线程池时,会通过
spawn_thread
方法去创建线程,每个线程里都会有个无限循环块,块里主要是处理@todo中的变量的,当@todo中的任务分配给线程时,线程就会去检查任务的头部和body是否准备好,如果准备好就执行,如果没有就放入到Reactor中去等待。 - Reactor里也有一个无限循环,这个循环是在puma启动时判断是否用
queue_requests
这种方式,如果是这种方式,则新建一个Reactor实例,然后调用run_in_thread
新建了一个线程,然后用无限循环的方式去监听看是否有任务加入到Reactor中,或者Reactor中的client是否有需要读的,如果有就加入到线程池中去。
有 queue_requests
和没有的区别
在 Puma::Server
中新建ThreadPool时,其中的block中有如下的判断:
if queue_requests
process_now = client.eagerly_finish
else
client.finish
process_now = true
end
eagerly_finish 和 finish的区别如下:
def eagerly_finish
return true if @ready
return false unless IO.select([@to_io], nil, nil, 0)
try_to_finish
end
def finish
return true if @ready
until try_to_finish
IO.select([@to_io], nil, nil)
end
true
end
eagerly_finish 方法是在@to_io没有新值的时候返回false,因为如果没有新值,表示这个请求是空的,或者连接了,但是没有请求过来,然后就直接返回fallse了,这时这个请求就会放到Reactor中去,如果有值进来,则会去执行 try_to_finish
方法。但是如果 queue_requests
为false时,由于没有像上面讨论那样,不会创建Reactor实例,如果请求体那些没有准备好,自然也就不能添加到Reactor中去,所以主线程会一直在循环等待解析完请求再继续执行。这样可能会导致线程一直等待阻塞的情况出现。
def try_to_finish
return read_body unless @read_header
begin
data = @io.read_nonblock(CHUNK_SIZE)
rescue Errno::EAGAIN
return false
rescue SystemCallError, IOError
raise ConnectionError, "Connection error detected during read"
end
# No data means a closed socket
unless data
@buffer = nil
@requests_served += 1
@ready = true
raise EOFError
end
if @buffer
@buffer << data
else
@buffer = data
end
@parsed_bytes = @parser.execute(@env, @buffer, @parsed_bytes)
if @parser.finished?
return setup_body
elsif @parsed_bytes >= MAX_HEADER
raise HttpParserError,
"HEADER is longer than allowed, aborting client early."
end
false
end
如果@read_header是false,则会直接去return read_body,否则主要是去 setup_body
,这个方法的主要作用是,解析设置body的内容,如果body没有内容,这设置@body为EmptyBody,然后返回true,默认情况下会把body中内容作为StringIO类型,设置为@body的值,然后返回true,还有一种可能是body的长度大于112kb,这时就会吧body设置为Tempfile。如果setup_body执行过了,表示头部已经加载分析好了,不用再去@io中读取内容了,可以直接执行read_body操作。
一个请求过来执行的过程
所以整个过程可以总结如下:
- 创建好基本的配置,包括线程的数量,端口的配置等。
- 初始化线程池,每个线程都循环等待任务的执行,等待
@todo
数量的变化,如果有变化就执行ThreadPool.new中的块参数。 - 上面的
@todo
是在初始化时主进程中创建了个线程用select(2) 去监听socket的变化,即是监听是否有请求过来,如果有就初始化为一个client,然后把client添加到@todo中去。 - 2步骤中执行的块中会去检测请求中的网络传输是否传完并且解析好内容,如果解析好就去执行
handle_request
方法,其中执行了@app.call(env)
,如果没解析好就放到Reactor中去。
总结:其实上面的过程是如果接到请求,就先放到线程池的@todo
中去,然后线程会去执行初始化的那个线程池块,块会看一下那个请求是否准备好了,如果准备好了就直接执行 @app.call
如果没准备好就放到 Reactor
中去,所以reactor其实是一个任务client的中转站。