服务器实作
3.1 构建Python Websocket服务器
Websocket是随着HTML5技术出现的一个最新的Web服务类型的协议标准,它实现了客户端和服务器全双工的功能。
3.1.1 Websocket的应用场景
Websocket的基本握手协议: 客户端部分:
GET /chat HTTP/1.1
Host: server.xzy.com
Upgrade: websocket
Connection: Upgrade
Sec-Websocket-Key: xsdouiaDDBub25jZQ==
Origin: http://xyz.com
Sec-Websocket-Protocol: chat,
Sec-Websocket-Version: 13
服务端响应部分:
HTTP/1.1 101 Switching Protocol
Upgrade: websocket
Connection: Upgrade
Sec-Websocket-Accept: s3psauiodASDZRbK+xOo=
Sec-Websocket-Protocol: chat
Upgrade: websocket
:这是一个特殊的HTTP请求,请求的目的是要将客户端和服务器的通信协议从HTTP协议升级到Websocket协议。Sec-Websocket-Key
:是一段浏览器BASE64加密的密钥。Sec-Websocket-Accept
:服务器在接收到Sec-Websocket-Key密钥后追加一段魔法字符串(Magic String),并将结果进行SHA-1散列签名,然后再进行BASE64加密返回客户端。Sec-Websocket-Protocol
:表示客户端请求提供的可供选择的子协议,及服务器选中的支持的子协议。Origin
:服务器用于区分未授权的websocket浏览器。HTTP/1.1 101 Switching Protocol
:其中101为服务器返回的状态码,所有非101的状态码都表示handshake并未完成。
3.1.2 实作Websocket握手协议
当客户端和服务器要进行通信的时候,TCP协议底层要进行三次握手,在握手完毕后,就可以进行一般通信的传输了。
定义一些要使用到的库:
import socket
import threading
import sys
import os
import base64
import hashlib
import struct
import json
最初的定义部分:
HOST = 'localhost'
PORT = 1234
MAGIC_STRING = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
HANDSHAKE_STRING = "HTTP/1.1 101 Switching Protocols\r\n" \
"Upgrade: websocket\r\n" \
"Connection: Upgrade\r\n" \
"Sec-Websocket-Accept: #1\r\n" \
"Websocket-Location: ws://#2/chat\r\n" \
"Websocket-Protocol:chat\r\n\r\n" \
其中,MAGIC_STRING
不能被改变,是RFC6455标准中规定的值。
3.1.3 MAGIC_STRING在Websocket中的作用
握手的具体函数内容:
def handshake(con):
headers = {}
try:
shake = con.recv(1024)
except Exception as e:
print e
return False
if not len(shake):
return False
try:
header, data = shake.split('\r\n\r\n', 1)
for line in header.split('\r\n')[1:]:
key, val = line.split(': ', 1)
headers[key] = val
if 'Sec-Websocket-Key' not in headers:
print ('not websocket.')
con.close()
return False
sec_key = headers['Sec-Websocket-Key']
res_key = base64.b64encode(hashlib.sha1(sec_key + MAGIC_STRING).digest())
str_handshake = HANDSHAKE_STRING.replace('#1', res_key).replace('#2', HOST + ':' + str(PORT))
print str_handshake
con.send(str_handshake)
except Exception as e:
print e
return False
return True
- 在recv到数据流后,将内容分割开,形成一个字典。
- 将Sec-Websocket-Key作为key,加上MAGIC_STRING进行SHA1的散列,再加上BASE64编码,最后将计算结果传送回去。
- 这一整个流程下来,握手过程就结束了。
3.1.4 Websocket启动
Websocket启动:
def start_service():
global HOST
global PORT
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((HOST, PORT))
sock.listen(1)
print "bind" + str(PORT) + ", ready to use"
except Exception, e:
print ("Server is already running, quit")
print e
while True:
time.sleep(1)
try:
connection, address = sock.accept()
except Exception as e:
print e
else:
print "Got connection from ", address
if handshake(connection):
print "handshake success"
try:
# SOME THREAD CODE
print 'new thread for client ...'
except Exception as e:
print e
print 'start new thread error'
connection.close()
3.1.5 Websocket消息拆分与读取
接收从浏览器或客户端传过来的数据包,包裹格式是这样的:固定字节+包的长度信息+掩码+数据。
- 包长度信息:第一位一定是1,其余的7位取0~127的整数,其中1~125则表示长度,如果是126,则表示只有2个字节的长度,127表示有4个字节长度。
- 掩码:在长度信息后面4个字节,之后和数据进行运算才能得到真正的数据。
发送的数据包的格式是:固定字节+包长度信息+原始数据。
接收的Python代码
def recv_data(self, num):
try:
all_data = self.con.recv(num) # 原生recv
if not len(all_data):
return "" # get nothing
except:
return ""
else:
len = ord(all_data[1]) & 127
if len == 126:
masks = all_data[4:8]
data = all_data[8:]
elif len == 127:
masks = all_data[10:14]
data = all_data[14:]
else:
masks = all_data[2:6]
data = all_data[6:]
raw_str = ""
i = 0
for d in data:
raw_str += chr(ord(d) ^ ord(masks[i % 4]))
i += 1
return raw_str
发送的Python代码
def send_data(self, data):
if data:
data = str(data)
else:
return False
token = "\x81" # 固定长度
length = len(data)
if length < 126:
token += struct.pack("B", length) # struct用于处理二进制数据流
elif length <= 0xFFFF:
token += struct.pack("!BQ", 126, length)
else:
token += struct.pack("!BH", 127, length)
data = '%s%s' % (token, data)
self.con.send(data)
return True
3.2 多线程服务
-
进程:进程是一个正在运行的程序对于操作系统的抽象,操作系统给没一个进程分配了独立的处理器资源、内存、磁盘空间(或者网络)资源。
-
线程:线程是指在一个单独的进程中,对于CPU和内存而言的多个工作单位,所有线程在进程中的资源都是共享的(全局数据、执行代码等)
-
并发:并发放在网络服务器上的概念,就是在同一台物理服务器或者逻辑服务器上同时能承载的用户数,可以同时处理多个客户端发来的逻辑数据。
-
超线程:多核CPU的任意一个物理核可以通过超线程技术在极端时间内同时执行两个或多个计算任务。
-
并行:并行不同于并发。并行指的是指令集的并行,即CPU同时执行多条指令的属性。
3.2.1 Python的多线程模式
启动一个线程(使用threading):
#coding=utf-8
import threading
from time import sleep
def foo1(a):
for i in range(2000):
print 'foo1', i, a
sleep(1)
i += 1
def foo2(a):
for i in range(1000):
print 'foo2', i, a
sleep(5)
i += 1
threads = []
t1 = threading.Thread(target=foo1, args=('foo1 arg',))
threads.append(t1)
t2 = threading.Thread(target=foo2, args=('foo2 arg',))
threads.append(t2)
if __name__ == '__main__':
for ts in threads:
ts.setDaemon(True)
ts.start()
扩展:
setDaemon()
:主线程A中,创建了子线程B,并且在主线程A中调用了B.setDaemon(),这个的意思是,把主线程A设置为守护线程,这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出。此外,还有个要特别注意的:必须在start() 方法调用之前设置,如果不设置为守护线程,程序会被无限挂起。join()
:主线程A中,创建了子线程B,并且在主线程A中调用了B.join(),那么,主线程A会在调用的地方等待,直到子线程B完成操作后,才可以接着往下执行,那么在调用这个线程时可以使用被调用线程的join方法(与setDaemon相反)。
在Python中有两个库,一个是thread,一个是threading。
thread更为底层,threading库在thread库上做了封装。
在Python3中,thread库被修改为_thread。
使用thread实现多线程的处理模式:
import string, threading, time
def thread_main(a):
global count, mutex
threadname = threading.currentThread().getName()
for x in xrange(0, int(a)):
mutex.acquire()
count = count + 1
mutex.release()
print threadname, x, count
time.sleep(1)
def main(num):
global count, mutex
threads = []
count = 1
mutex = threading.Lock()
for i in xrange(0, num)
threads.append(threading.Thread(target=thread_main, args=(5,)))
for t in threads:
t.join()
if __name__ == '__main__':
num = 4
main(4)
3.2.2 锁
互斥锁(Mutual exclusion, Mutex)是一种用于多线程编程,防止两条线程同时对同一公共资源(如全局变量)进行读写的机制。
3.2.3 Python GIL
GIL,Global Interpreter Lock,即全局解释锁。
CPU密集型代码(单线程版本):
from threading import Thread
import time
def my_counter():
i = 0
for x in range(10000):
i = i + 1
return True
def run():
thread_array = {}
start_time = time.time()
for tt in range(2):
t = Thread(target=my_counter)
t.start()
t.join()
end_time = time.time()
print("count time: {}".format(end_time - start_time))
if __name__ == '__main__':
run()
CPU密集型代码(多线程版本):
from threading import Thread
import time
def my_counter():
i = 0
for x in range(10000):
i = i + 1
return True
def run():
thread_array = {}
start_time = time.time()
for tt in range(2):
t = Thread(target=my_counter)
t.start()
thread_array[tt] = t
for i in range(2):
thread_array[i].join()
end_time = time.time()
print("count time: {}".format(end_time - start_time))
if __name__ == '__main__':
run()
3.2.4 multiprocess的解决思路
在Python2.6以上版本提供的multiprocess是为了弥补GIL的效率问题而出现的,不同的是它使用了多进程而不是多线程。每个进程有自己独立的GIL。
multiprocess缺点:由于进程之间无法看到对方的数据,只能通过主线程声明一个Queue,put再get或者用共享内存、共享文件、管道等方法,但编码效率会变低。
multiprocess共享内容数据的方案:
from multiprocessing import Process, Queue
def f(q):
q.put([4031, 1024, 'my data'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get()
p.join()
3.2.5 给Websocket加上多线程
加上线程的websocket:
class Th(threading.Thread):
def __init__(self, connection, layout):
threading.Thread.__init__(self)
self.con = connection
self.id = layout
print "init data: ", self.con, ", ", self.id
def run(self):
timeout = 0
self.con.setblocking(0) # settimeout(15)
while True:
try:
time.sleep(1)
buf = self.recv_data(1024)
if len(buf) <= 0:
timeout += 1
else:
timeout = 0
if timeout >= 10:
print 'timeout, close'
break
buf = web_logic.function(buf, self.con, self.id)
self.send_data(buf)
except Exception, e:
print "err found ", e, self.con, "\n"
break
self.con.close()
在start_service()
函数里加上线程代码:
def start_service():
global HOST
global PORT
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.bind((HOST, PORT))
sock.listen(1)
print "bind" + str(PORT) + ", ready to use"
except Exception, e:
print ("Server is already running, quit")
print e
while True:
time.sleep(1)
try:
connection, address = sock.accept()
except Exception as e:
print e
else:
print "Got connection from ", address
if handshake(connection):
print "handshake success"
try:
t = Th(connection, address) # 新增加的线程代码
print 'new thread for client ...'
except Exception as e:
print e
print 'start new thread error'
connection.close()
3.3 线程池
线程池是一种生产者、消费者的模型,和内存池、资源池等类似。
线程池一定有一个生产线程→任务←消费者线程池线程,这样的模型存在,至于线程(进程)中的共享内容,则可以通过Queue或者其他方式取得。
3.3.1 默认线程池和进程池
进程池的使用:
import multiprocessing
def Test(a, b):
print str(a) + '->' + str(b) + '\t'
def MPTest():
pool = multiprocessing.Pool(processes=6)
for i in range(1000):
pool.apply_async(Test, (i, i + 50, ))
pool.close()
pool.join()
if __name__ == '__main__':
MPTest()
multiprocessing.Pool
是一个默认的进程池,同样的,multiprocess也提供了一个线程池,接口是dummy。
# 可以使用类似这样的代码,进行线程与进程的切换
from multiprocessing import Pool # 进程池
from multiprocessing.dummy import Pool as ThreadPool # 线程池
进程池的一些方法:
apply(func[, args[, kwds]])
:使用args和kwds参数调用func函数,结果返回前会一直阻塞。
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
:apply()方法的变体,会返回一个结果对象。若回调函数被指定,那么回调可以接收一个参数然后被调用,当结果准备好回调时会调用回调函数,调用失败时,则用error_callback替换回调。回调里面的代码应被写成立即完成,否则处理结果的线程会被阻塞。
close()
:阻止更多的任务提交到pool,待任务完成后,工作进程会退出。
terminate()
:不管任务是否完成,立即停止工作进程。在对pool对象进程垃圾回收的时候,会立即调用terminate()。
join()
:等待工作线程的退出,所以在调用join()前,必须调用close或者terminate()。
3.3.2 协程
协程(Coroutine)的概念不同于线程,它看上去像是函数调用,然而在内部却能进行中断,待执行完成再回来继续执行其他内容。
创建要一个生成器(Generator)函数:
def foo():
a, b = 0, 1
while True:
yield a
a, b = b, a + b
可以在for循环中这样使用它:
for i in foo():
print i
上述做法的不仅快而且不会给内存带来压力,因为我们所需要的值都是动态生成的,而不是将它们存储在一个列表中。更概括地说,使用yield便可获得一个协程。协程会消费掉发送给它的值。
def gerp(pattern):
print "Searching for ", pattern
while True:
line = (yield)
if pattern in line:
print(line)
search = gerp('coroutine')
next(search)
search.send("I love you")
search.send("Don't you love me?")
search.send("I love coroutine instead!")
通过send()方法向gerp()函数传值,发送的值会被yield接收。
next方法是为了启动一个协程。就像协程中包含的生成器并不是立刻执行,而是通过next()方法来响应send()方法。因此,必须通过next()方法来执行yield表达式。
可以通过close()方法来关闭一个协程:
search = gerp('coroutine')
search.close()