Learning Python by Using It -- A BitTorrent Client, Part 3: Peer Protocol Design
duandetao
After the TCP connection to a peer is established, a handshake message is sent first.
The handshake message format is:
one byte 0x13 (decimal 19, the length of the protocol string) + the string 'BitTorrent protocol' + 8 reserved bytes, all zero by default (some draft BEPs assign meanings to the reserved bits)
+ the 20-byte SHA1 hash of the torrent file's info section + a 20-byte peer id (peer entries returned by the tracker mostly carry no peer id, so the local peer id can be used here).
If the handshake cannot be negotiated, the TCP connection is closed.
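The wire format above can be sketched as follows (a minimal illustration using Python 3 bytes, not the client's actual code; build_handshake and parse_handshake are hypothetical helper names):

```python
import struct

PSTR = b'BitTorrent protocol'

def build_handshake(info_hash, peer_id):
    # <1 byte: 19> <19 bytes: protocol string> <8 reserved bytes, zero>
    # <20 bytes: SHA1 of the info dict> <20 bytes: peer id>
    assert len(info_hash) == 20 and len(peer_id) == 20
    return struct.pack('>B', len(PSTR)) + PSTR + b'\x00' * 8 + info_hash + peer_id

def parse_handshake(data):
    # returns (info_hash, peer_id), or None if negotiation fails
    if len(data) < 68 or data[0:20] != b'\x13' + PSTR:
        return None
    return data[28:48], data[48:68]
```

A handshake is always exactly 68 bytes, so the receive side can read a fixed amount before switching to length-prefixed messages.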
The BitTorrent standard BEP-3 defines nine peer message types. The message format is msg_len (4 bytes) + msg_type (1 byte) + payload; a msg_len of 0 is a keep-alive.
0 - choke --- tells the other side it is choked; its requests will not be served for now. No payload.
1 - unchoke --- tells the other side it is unchoked and may request pieces. No payload.
2 - interested --- tells the other side this side wants pieces it has. No payload.
3 - not interested --- tells the other side this side no longer wants its pieces. No payload.
4 - have --- announces a piece this side has; the payload is a 4-byte piece index.
5 - bitfield --- announces all the pieces this side has as a bitmap, one bit per piece index; a bit set to 1 means the piece is present.
This must be the first message after a successful handshake.
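The bitfield layout (high bit of byte 0 is piece 0) can be sketched like this; make_bitfield and parse_bitfield are illustrative names, not the client's actual functions:

```python
def make_bitfield(have_pieces, pieces_num):
    # piece index i lives at bit (7 - i % 8) of byte i // 8, high bit first
    field = bytearray((pieces_num + 7) // 8)
    for i in have_pieces:
        field[i // 8] |= 0x80 >> (i % 8)
    return bytes(field)

def parse_bitfield(field):
    # recover the list of piece indexes whose bit is set
    return [i for i in range(len(field) * 8) if field[i // 8] & (0x80 >> (i % 8))]
```

Spare bits at the end of the last byte must be zero; a receiver should also check that indexes past pieces_num are ignored.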
6 - request --- requests a block of a piece; the payload is index, begin, and length, each 4 bytes. length is commonly implemented as 0x4000 (16 KiB) and must not exceed 0x20000.
7 - piece --- carries block data; the payload is index, begin, data.
8 - cancel --- withdraws a pending request; the payload is index, begin, and length.
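The length-prefixed framing shared by all these messages can be sketched as follows (Python 3 bytes; encode_msg, encode_request, and decode_msg are hypothetical helper names):

```python
import struct

MSG_REQUEST = 6

def encode_msg(msg_type, payload=b''):
    # msg_len counts the type byte plus the payload, big-endian
    return struct.pack('>IB', 1 + len(payload), msg_type) + payload

def encode_request(index, begin, length):
    # index, begin, and length are each 4 bytes
    return encode_msg(MSG_REQUEST, struct.pack('>III', index, begin, length))

def decode_msg(buf):
    # returns (msg_type, payload, remaining) or None if the buffer is
    # incomplete; msg_len == 0 is a keep-alive, reported here as type None
    if len(buf) < 4:
        return None
    (msg_len,) = struct.unpack('>I', buf[:4])
    if len(buf) < 4 + msg_len:
        return None
    if msg_len == 0:
        return None, b'', buf[4:]
    return buf[4], buf[5:4 + msg_len], buf[4 + msg_len:]
```

Because TCP is a byte stream, decode_msg deliberately returns None on a short buffer so the receive loop can accumulate more data and retry.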
The client is implemented with Python's synchronous socket API. To keep processing from blocking on sockets, each peer is served by multiple threads.
Each peer uses three threads: a request-timeout timer, a socket send thread, and a socket receive thread.
An RLock protects shared data. Establishing peer connections and assigning piece requests are handled by the download-task thread.
To send data, the caller first writes to a queue and then sets a threading.Event to wake the socket send thread, so the calling thread never blocks on the socket.
Since Python has no API to terminate a thread, the socket send and receive threads watch a special variable and exit on their own when it is set.
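The queue-plus-Event pattern described above might look like this sketch (SendLoop and its member names are illustrative, not the actual classes; Python 3's queue module shown):

```python
import queue
import threading

class SendLoop(object):
    '''Decouples callers from the blocking socket: send() only enqueues and
    sets an Event; the worker thread performs the actual sendall().'''

    def __init__(self, sock):
        self.__sock = sock
        self.__queue = queue.Queue()
        self.__wakeup = threading.Event()
        self.__running = True          # the "special variable" that ends the thread
        self.__thread = threading.Thread(target=self.__run)
        self.__thread.start()

    def send(self, data):
        self.__queue.put(data)
        self.__wakeup.set()            # never blocks the caller

    def stop(self):
        self.__running = False
        self.__wakeup.set()            # wake the worker so it can see the flag
        self.__thread.join()

    def __run(self):
        while self.__running:
            self.__wakeup.wait()
            self.__wakeup.clear()
            while not self.__queue.empty():
                self.__sock.sendall(self.__queue.get())
```

Clearing the Event before draining the queue avoids lost wakeups: a send() that races with the drain either lands in the queue in time or sets the Event again for the next iteration.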
The download pipeline is driven by synchronous calls, avoiding timer polling wherever possible; this lowers CPU usage and speeds up downloading.
However, synchronous calls across threads take locks, which costs performance and can introduce deadlocks; the threads' interactions must be designed carefully to avoid them.
Features defined in draft BEPs are not implemented yet; neither are upload flow control for peers nor peer quality ranking.
'''
Created on 2012-9-29

@author: ddt
'''
import torrent_file
import bcodec
import random
import threading
from trackers_info import *
import socket
from peer_connect import PeerConnect
import os
import time
import log_info

class DownLoadTask(object):
    '''
    TODO: UPNP support
    TODO: local listening support
    TODO: threading safety
    TODO: DHT support
    TODO: flow control support
    '''
    def __init__(self):
        '''
        Constructor
        '''
        self.__host_ip, self.__host_port = _get_listening_addr()
        self.__bt_file = torrent_file.TorrentFile()
        self.__peer_id = ''
        self.__download_state = {}
        self.__files = []
        self.__info_hash = None
        self.__piece_length = 0

        self.__trackers_info = None

        self.__pieces_refresh_intvl = 30  # seconds
        self.__pieces_refresh_timer = None
        self.__piece_max_unread_time = 20

        self.__unchoked_peers = {}
        self.__choked_peers = {}
        self.__max_pieces_per_peer = 50

        self.__rlock_common = threading.RLock()

        self.__base_path = '.\\'

    def start_task(self, bt_file_name):
        with self.__rlock_common:
            try:
                self.__bt_file.read_file(bt_file_name)
            except (IOError, torrent_file.BTFormatError), reason:
                self.__write_log("Read bittorrent file error! Error:%s" % str(reason))
                return False

            self.__peer_id = _generate_peerid()
            self.__files = self.__bt_file.get_files()
            self.__info_hash = self.__bt_file.get_info_hash()

            self.__piece_length = self.__bt_file.get_piece_length()

            self.__download_state['downloaded'] = 0
            self.__download_state['uploaded'] = 0
            self.__download_state['left'] = self.__get_total_length()
            self.__download_state['event'] = 'started'
            self.__download_state['pieces-num'] = self.__bt_file.get_pieces_num()
            self.__download_state['downloading-pieces'] = {}
            self.__download_state['waiting-pieces'] = {}
            self.__download_state['completed-pieces'] = {}

            self.__writingfile_pieces = []

            self.__trackers_info = TrackersInfo(self)

            self.__init_files()

            self.__init_pieces_state()

            self.__trackers_info.start_trackers_refresh()

            self.__start_pieces_refresh()

        return True

    def tacker_get_peers(self, peers):
        with self.__rlock_common:
            self.__try_new_peers(peers)
            self.__dispatch_pieces()

    def get_announce_list(self):
        return self.__bt_file.get_announces()

    def get_download_stat(self):
        with self.__rlock_common:
            downloaded = self.__download_state['downloaded']
            uploaded = self.__download_state['uploaded']
            left = self.__download_state['left']
            event = self.__download_state['event']
            return (downloaded, uploaded, left, event)

    def get_host_info(self):
        host_info = (self.__peer_id, self.__host_ip, self.__host_port)
        return host_info

    def get_info_hash(self):
        return self.__info_hash

    def get_local_id(self):
        return self.__peer_id

    def get_local_have_pieces(self):
        with self.__rlock_common:
            return self.__download_state['completed-pieces'].keys()

    def get_pieces_num(self):
        return self.__bt_file.get_pieces_num()

    def peer_pieces_canceled(self, pieces):
        print 'peer_pieces_canceled:%s' % str(pieces)

        with self.__rlock_common:
            for piece_index in pieces:
                if piece_index in self.__download_state['downloading-pieces'].keys():
                    self.__undownload_piece(piece_index)

            self.__dispatch_pieces()

    def peer_have_pieces(self, peer_con, pieces):
        self.__write_log('peer_have_pieces:%s' % str(pieces))
        with self.__rlock_common:
            self.__dispatch_peer_pieces(peer_con, pieces)

    def peer_pieces_completed(self, peer_con):
        self.__write_log('peer_pieces_completed:%s' % str(peer_con.get_peer_addr()))
        with self.__rlock_common:
            self.__dispatch_peer_pieces(peer_con)

    def peer_choked(self, peer_con):
        with self.__rlock_common:
            peer_addr = peer_con.get_peer_addr()
            if peer_addr in self.__unchoked_peers.keys():
                self.__unchoked_peers.pop(peer_addr)
                self.__choked_peers[peer_addr] = peer_con

            # peeding_pieces = peer_con.get_local_pending_pieces()
            # peer_con.cancel_pieces(peeding_pieces)
            # self.peer_pieces_canceled(peeding_pieces)

    def peer_unchoked(self, peer_con):
        with self.__rlock_common:
            peer_addr = peer_con.get_peer_addr()
            if peer_addr in self.__choked_peers.keys():
                self.__choked_peers.pop(peer_addr)
                self.__unchoked_peers[peer_addr] = peer_con
            self.__dispatch_peer_pieces(peer_con)

    def peer_dead(self, peer_con):
        with self.__rlock_common:
            peer_addr = peer_con.get_peer_addr()
            if peer_addr in self.__choked_peers.keys():
                self.__choked_peers.pop(peer_addr)

            if peer_addr in self.__unchoked_peers.keys():
                self.__unchoked_peers.pop(peer_addr)

    def write_piecedata(self, piece_index, offset, data):
        with self.__rlock_common:
            if piece_index in self.__download_state['downloading-pieces'].keys():
                piece_state = self.__download_state['downloading-pieces'][piece_index]

                if offset == piece_state['waiting-offset']:
                    self.__piece_install_data(piece_state, data)

                    while len(piece_state['disorder-data']) != 0:
                        curr_offset = piece_state['waiting-offset']
                        if curr_offset in piece_state['disorder-data'].keys():
                            # pop the block we just became ready for, so it is not kept around
                            self.__piece_install_data(piece_state, piece_state['disorder-data'].pop(curr_offset))
                        else:
                            break
                else:
                    piece_state['disorder-data'][offset] = data

                return

            self.__write_log('unexpected piece data: %s' % str((piece_index, offset, data)))

    def __piece_install_data(self, piece_state, data):
        piece_state['waiting-offset'] += len(data)
        piece_state['piece-data'] += data

        if piece_state['waiting-offset'] >= self.__piece_length:
            piece_state['disorder-data'].clear()
            self.__complete_piece(piece_state['piece-index'])

        if piece_state['waiting-offset'] > self.__piece_length:
            piece_state['piece-data'] = piece_state['piece-data'][0:self.__piece_length]
            piece_state['waiting-offset'] = self.__piece_length
            self.__write_log('piece data is too much:%s' % str(piece_state['piece-index']))

        # account only for the newly arrived block, not the cumulative offset
        self.__download_state['downloaded'] += len(data)
        self.__download_state['left'] -= len(data)

    def __read_from_file(self, piece_state):
        piece_index = piece_state['piece-index']
        files = piece_state['file-info']
        one_file = files[0]
        first_piece = one_file['first-piece']
        last_piece = one_file['last-piece']
        if first_piece[0] <= piece_index <= last_piece[0]:
            file_offset = (piece_index - first_piece[0]) * self.__piece_length - first_piece[1]
            file_handle = open(self.__base_path + '\\'.join(one_file['name']), 'rb')
            file_handle.seek(file_offset)
            data = file_handle.read(self.__piece_length)
            file_handle.close()
            piece_state['piece-data'] = data
            piece_state['waiting-offset'] = self.__piece_length
            piece_state['unread-time'] = 0

    def __write_piece_to_file(self, piece_state):
        self.__write_log('__write_piece_to_file')

        piece_index = piece_state['piece-index']
        data = piece_state['piece-data']

        for one_file in piece_state['file-info']:
            first_piece = one_file['first-piece']
            last_piece = one_file['last-piece']
            if first_piece[0] <= piece_index <= last_piece[0]:
                file_offset = (piece_index - first_piece[0]) * self.__piece_length - first_piece[1]
            else:
                continue
            piece_offset = 0
            if first_piece[0] == piece_index:
                piece_offset = first_piece[1]

            write_file_len = self.__piece_length - piece_offset
            if last_piece[0] == piece_index:
                write_file_len = last_piece[1] - piece_offset

            file_path = self.__base_path + '\\'.join(one_file['name'])
            file_handle = open(file_path, 'rb+')
            file_handle.seek(file_offset)
            # file.write() returns None; do not assign it over data, which the next file still needs
            file_handle.write(data[piece_offset:piece_offset + write_file_len])
            file_handle.close()
            self.__write_log('write data to file:%s' % str((file_path, file_offset, write_file_len)))

        self.__write_log('__write_piece_to_file end')

    def __get_total_length(self):
        total_length = 0
        for one_file in self.__files:
            total_length += one_file['length']
        return total_length

    def __init_files(self):
        self.__write_log('__init_files')
        for one_file in self.__files:

            path = self.__base_path
            for dir_name in one_file['name'][:-1]:
                path += dir_name + '\\'
                if not os.path.exists(path):
                    os.mkdir(path)

            file_path = self.__base_path + '\\'.join(one_file['name'])

            if os.path.exists(file_path):
                continue  # file already created; keep initializing the rest

            file_length = one_file['length']
            file_handle = open(file_path, 'ab+')
            dumy_data = chr(0) * 0x4000
            for i in range(0, file_length, 0x4000):
                write_len = 0x4000
                if i + write_len > file_length:
                    write_len = file_length - i
                file_handle.write(dumy_data[0:write_len])
            file_handle.close()

    def __init_pieces_state(self):
        self.__write_log('__init_pieces_state')
        pieces_state = {}
        for one_file in self.__files:
            for piece_index in range(one_file['first-piece'][0], one_file['last-piece'][0] + 1):
                if piece_index not in pieces_state.keys():
                    piece_state = {}
                    piece_state['connect-info'] = None
                    piece_state['file-info'] = [one_file]

                    piece_state['waiting-offset'] = 0
                    piece_state['piece-data'] = ''
                    piece_state['disorder-data'] = {}

                    piece_state['piece-index'] = piece_index

                    piece_state['unread-time'] = 0
                    pieces_state[piece_index] = piece_state
                else:
                    # append the dict itself; += would extend the list with the dict's keys
                    pieces_state[piece_index]['file-info'].append(one_file)
        self.__download_state['waiting-pieces'] = pieces_state

    def __start_pieces_refresh(self):
        if self.__pieces_refresh_timer != None:
            self.__pieces_refresh_timer.cancel()

        refresh_thread = threading.Thread(target=DownLoadTask.__refresh_pieces_state, args=(self,))
        refresh_thread.start()

    def __refresh_pieces_state(self):
        self.__write_log('__refresh_pieces_state')

        with self.__rlock_common:
            for piece_state in self.__download_state['completed-pieces'].values():
                if piece_state['waiting-offset'] != 0:
                    piece_state['unread-time'] += 1

                if len(piece_state['file-info']) >= 2 and piece_state['unread-time'] > self.__piece_max_unread_time:
                    piece_state['waiting-offset'] = 0
                    piece_state['piece-data'] = ''
                    piece_state['unread-time'] = 0

            self.__check_files_completed()

            if self.__pieces_refresh_timer != None:
                self.__pieces_refresh_timer.cancel()

            self.__pieces_refresh_timer = threading.Timer(self.__pieces_refresh_intvl,
                                                          DownLoadTask.__refresh_pieces_state, [self,])
            self.__pieces_refresh_timer.start()

            self.__dispatch_pieces()
        self.__write_log('__refresh_pieces_state end')

    def __notify_peers_have(self, have_pieces):
        self.__write_log('__notify_peers_have')
        with self.__rlock_common:
            for peer_con in self.__unchoked_peers.values():
                peer_con.notify_local_have_pieces(have_pieces)

            for peer_con in self.__choked_peers.values():
                peer_con.notify_local_have_pieces(have_pieces)

    def __try_new_peers(self, peers):
        with self.__rlock_common:
            for peer in peers:
                if peer not in self.__unchoked_peers.keys() and peer not in self.__choked_peers.keys():
                    peer_con = PeerConnect(peer[0], peer[1], self)
                    self.__choked_peers[peer] = peer_con
                    peer_con.start()

    def __check_files_completed(self):
        self.__write_log('__check_files_completed')
        completed_pieces = self.__download_state['completed-pieces'].keys()
        completed_pieces = set(completed_pieces)
        writingfile_pieces = set(self.__writingfile_pieces)

        for one_file in self.__files:
            file_path = self.__base_path + '\\'.join(one_file['name'])

            first_piece = one_file['first-piece']
            last_piece = one_file['last-piece']

            file_pieces = range(first_piece[0], last_piece[0] + 1)
            file_pieces = set(file_pieces)

            need_writefile = file_pieces & writingfile_pieces
            if len(need_writefile) != 0:
                file_handle = open(file_path, 'ab+')
                for piece_index in need_writefile:

                    piece_state = self.__download_state['completed-pieces'][piece_index]
                    if len(piece_state['file-info']) > 1:
                        self.__write_piece_to_file(piece_state)
                        writingfile_pieces.remove(piece_index)
                        continue

                    data = piece_state['piece-data']
                    file_offset = (piece_index - first_piece[0]) * self.__piece_length - first_piece[1]
                    file_handle.seek(file_offset)
                    file_handle.write(data)
                    self.__write_log('writing data to file:%s,%s' % (piece_index, file_path))
                file_handle.close()

            if file_pieces <= completed_pieces:
                one_file['completed'] = True
                self.__write_log('file:%s completed!' % file_path)

        self.__writingfile_pieces = []

    def __dispatch_pieces(self):
        self.__write_log('__dispatch_pieces')
        for peer_con in self.__unchoked_peers.values():
            if len(self.__download_state['waiting-pieces'].keys()) == 0:
                break

            self.__dispatch_peer_pieces(peer_con)

        for peer_con in self.__choked_peers.values():
            if len(self.__download_state['waiting-pieces'].keys()) == 0:
                break

            self.__dispatch_peer_pieces(peer_con)

    def __dispatch_peer_pieces(self, peer_con, peer_have_pieces=None):
        self.__write_log('__dispatch_peer_pieces')

        peer_pending_pieces = peer_con.get_local_pending_pieces()
        dispatching_num = self.__max_pieces_per_peer - len(peer_pending_pieces)
        if dispatching_num <= 0:
            return

        if peer_have_pieces == None:
            peer_have_pieces = peer_con.get_peer_have_pieces()

        peer_have_pieces = set(peer_have_pieces)

        if len(peer_have_pieces) == 0:
            return

        waiting_pieces = set(self.__download_state['waiting-pieces'].keys())

        if len(waiting_pieces) == 0:
            return

        dispatching_pieces = peer_have_pieces & waiting_pieces

        dispatching_pieces = list(dispatching_pieces)
        dispatching_pieces = dispatching_pieces[0:dispatching_num]
        if len(dispatching_pieces) == 0:
            return

        if peer_con.dispatch_pieces(dispatching_pieces, self.__piece_length):
            for piece_index in dispatching_pieces:
                self.__download_piece(piece_index, peer_con)
            self.__write_log('dispatch_pieces:%s,%s' % (str(dispatching_pieces), str(peer_con.get_peer_addr())))

    def __complete_piece(self, piece_index):
        piece_state = self.__download_state['downloading-pieces'].pop(piece_index, None)
        if piece_state == None:
            return
        self.__download_state['completed-pieces'][piece_index] = piece_state
        piece_state['connect-info'] = None
        self.__notify_peers_have([piece_index,])
        self.__writingfile_pieces.append(piece_index)

    def __undownload_piece(self, piece_index):
        piece_state = self.__download_state['downloading-pieces'].pop(piece_index, None)
        if piece_state == None:
            return
        self.__download_state['waiting-pieces'][piece_index] = piece_state

        self.__download_state['left'] += piece_state['waiting-offset']
        self.__download_state['downloaded'] -= piece_state['waiting-offset']
        piece_state['waiting-offset'] = 0
        piece_state['piece-data'] = ''
        piece_state['disorder-data'].clear()  # drop stale out-of-order blocks before a retry
        piece_state['connect-info'] = None

    def __download_piece(self, piece_index, peer_con):
        piece_state = self.__download_state['waiting-pieces'].pop(piece_index, None)
        if piece_state == None:
            return

        self.__download_state['downloading-pieces'][piece_index] = piece_state
        piece_state['waiting-offset'] = 0
        piece_state['piece-data'] = ''
        piece_state['connect-info'] = peer_con

    def __write_log(self, log):
        log_info.write_log(log)

def _generate_peerid():
    peerid = ''
    for i in range(0, 20):
        # randrange excludes the upper bound, so add 1 to include 'z'
        peerid += chr(random.randrange(ord('a'), ord('z') + 1))
    return peerid

def _get_listening_addr():
    host_ip = socket.gethostbyname(socket.gethostname())
    host_port = 6881
    return host_ip, host_port

if __name__ == '__main__':
    new_task = DownLoadTask()
    new_task.start_task(r'.\narodo.torrent')
    time.sleep(100000)