AU3将BT种子转换为 磁力链的方法,AnalyseTorrentFile.Hash
最近在研究怎样把BT种子转换为磁力链接,发现易语言有个函数〈发布文件信息〉可以直接取得BT种子的HASH值,很容易转换:magnet:?xt=urn:btih: 后面加上 HASH 就可以了。AU3有没有相关函数呢?我不喜欢易语言,也不懂易语言,AU3应该更强大吧?调用格式: 〈发布文件信息〉 分析发布文件 (文本型 发布文件名) - BT下载支持库->全局命令
英文名称:AnalyseTorrentFile
分析发布文件(Torrent文件),取到其中的信息。本命令为初级命令。
参数<1>的名称为“发布文件名”,类型为“文本型(text)”。本参数指定要分析的发布文件(Torrent)的文件名。
种子名称 = 接收到的文件路径
.判断开始 (种子名称 ≠ “”)
磁力链 = 分析发布文件 (种子名称)
.判断开始 (磁力链.效验码 ≠ “”)
编辑框1.内容 = 到文本 (“magnet:?xt=urn:btih:” + 到大写 (磁力链.效验码))
.默认
信息框 (“这不是一个有效的 torrent种子文件,或该文件不完整或已损坏!”, 16, “错误!”)
.判断结束
.默认
编辑框1.内容 = “”
回复 1# user11
这就是传说中的“易语言”啊!果然碉堡了{:face (394):} 看易语言比看c++难多了! 回复 3# haijie1223
呵呵,或许门外汉不懂易语言的“易”的精髓。
“易”,姓易的?
“易”,容易上手的?
“易”,易经相关的? 本帖最后由 user11 于 2012-4-20 20:51 编辑
怎么讨论的都是些无关的内容……我只想知道,AU3如何取得BT种子的HASH,然后转成磁力链接。
下面是 Python 代码——AU3 如何实现呢?
import os.path
import sys
import hashlib
from cStringIO import StringIO
import re
from lixian_encoding import default_encoding
def magnet_to_infohash(magnet):
    """Extract the binary info-hash from a BitTorrent magnet link.

    The ``btih`` value is accepted in two spellings: 40 hexadecimal
    characters (decoded to 20 raw bytes) or, for anything else, Base32.

    Args:
        magnet: magnet URI starting with ``magnet:?xt=urn:btih:``.

    Returns:
        The decoded info-hash as a byte string.

    Raises:
        AssertionError: if *magnet* does not start with the expected prefix.
        Whatever ``base64.b32decode`` raises when the non-hex value is not
        valid Base32.
    """
    import base64
    import binascii
    import re
    m = re.match(r'magnet:\?xt=urn:btih:(\w+)', magnet)
    assert m, magnet
    code = m.group(1)
    # Exactly 40 hex digits means the hash is hex-encoded; every other form
    # is treated as Base32.
    if re.match(r'^[0-9a-fA-F]{40}$', code):
        # unhexlify is used instead of str.decode('hex'), which only exists
        # on Python 2 byte strings.
        return binascii.unhexlify(code)
    return base64.b32decode(code)
class decoder:
    """Cursor-based parser for bencoded data held in a string.

    ``self.bytes`` is the input and ``self.i`` is the index of the next
    unread character; every ``decode_*`` method advances ``self.i`` past the
    value it returns.  Comparisons are made against one-character strings,
    so the input is expected to be a ``str``.
    """

    def __init__(self, bytes):
        self.bytes = bytes
        self.i = 0

    def decode_value(self):
        """Decode the value starting at the cursor.

        Returns:
            A string, int, list or dict depending on the leading character.

        Raises:
            NotImplementedError: if the leading character starts no known
                bencode type.
            IndexError: if the cursor is already past the end of the input.
        """
        x = self.bytes[self.i]
        # A leading digit is the length prefix of a string; the cursor must
        # stay on it so decode_string can read the whole prefix.
        if x.isdigit():
            return self.decode_string()
        # Every other type starts with a one-character marker: skip it.
        self.i += 1
        if x == 'd':
            v = {}
            while self.peek() != 'e':
                k = self.decode_string()
                v[k] = self.decode_value()
            self.i += 1  # skip the closing 'e'
            return v
        elif x == 'l':
            v = []
            while self.peek() != 'e':
                v.append(self.decode_value())
            self.i += 1  # skip the closing 'e'
            return v
        elif x == 'i':
            return self.decode_int()
        else:
            raise NotImplementedError(x)

    def decode_string(self):
        """Decode a ``<length>:<data>`` string at the cursor.

        Raises:
            ValueError: if no ``:`` follows the cursor or the length prefix
                is not an integer.
        """
        i = self.bytes.index(':', self.i)
        n = int(self.bytes[self.i:i])
        s = self.bytes[i + 1:i + 1 + n]
        self.i = i + 1 + n
        return s

    def decode_int(self):
        """Decode the digits of an integer up to its terminating ``e``.

        The leading ``i`` marker must already have been consumed.

        Raises:
            ValueError: if no ``e`` follows the cursor or the digits are not
                an integer.
        """
        e = self.bytes.index('e', self.i)
        n = int(self.bytes[self.i:e])
        self.i = e + 1
        return n

    def peek(self):
        """Return the character at the cursor without consuming it."""
        return self.bytes[self.i]
class encoder:
    """Serializer that writes Python values to a stream in bencode form."""

    # ``long`` only exists on Python 2; fall back to plain ``int`` where it
    # is undefined so integer values stay encodable.
    try:
        _integer_types = (int, long)
    except NameError:
        _integer_types = (int,)

    def __init__(self, stream):
        self.stream = stream

    def encode(self, v):
        """Write the bencoded form of *v* to ``self.stream``.

        Supported values are ``str``, ``dict``, ``list`` and integers.
        Dictionary entries are written in sorted key order.

        Raises:
            NotImplementedError: if *v* (or a nested value) has any other
                type.
        """
        # Exact type checks are kept on purpose: with isinstance, bool would
        # pass as an int and be written as 'iTruee'.
        if type(v) == str:
            self.stream.write(str(len(v)))
            self.stream.write(':')
            self.stream.write(v)
        elif type(v) == dict:
            self.stream.write('d')
            for k in sorted(v):
                self.encode(k)
                # Encode the value stored under k, not the dict itself.
                self.encode(v[k])
            self.stream.write('e')
        elif type(v) == list:
            self.stream.write('l')
            for x in v:
                self.encode(x)
            self.stream.write('e')
        elif type(v) in self._integer_types:
            self.stream.write('i')
            self.stream.write(str(v))
            self.stream.write('e')
        else:
            raise NotImplementedError(type(v))
def bdecode(bytes):
    """Parse the bencoded string *bytes* and return the first decoded value."""
    parser = decoder(bytes)
    return parser.decode_value()
def bencode(v):
    """Return the bencoded string form of *v*.

    The value is serialized through ``encoder`` into an in-memory buffer.
    """
    from cStringIO import StringIO
    buf = StringIO()
    writer = encoder(buf)
    writer.encode(v)
    return buf.getvalue()
def info_hash_from_content(content):
    """Return the hex SHA-1 digest of the ``info`` entry of a torrent.

    *content* is the raw bencoded torrent data; its ``info`` value is
    decoded, re-encoded with ``bencode`` and hashed.
    """
    info = bdecode(content)['info']
    digest = hashlib.sha1(bencode(info))
    return digest.hexdigest()
def info_hash(path):
    """Return the hex info-hash of the torrent file at *path*.

    A warning line is printed when the name does not end in ``.torrent``
    (case-insensitive); the file is still processed.

    Raises:
        NotImplementedError: if the file is larger than 1000*1000 bytes.
    """
    size_limit = 1000*1000
    has_torrent_suffix = path.lower().endswith('.torrent')
    if not has_torrent_suffix:
        print(' Is it really a .torrent file? '+path)
    if os.path.getsize(path) > size_limit:
        raise NotImplementedError('Torrent file too big')
    with open(path, 'rb') as torrent:
        content = torrent.read()
        return info_hash_from_content(content)
def encode_path(path):
    """Convert a UTF-8 encoded path into ``default_encoding``."""
    decoded = path.decode('utf-8')
    return decoded.encode(default_encoding)
class sha1_reader:
    """Hands out consecutive 20-byte digests from a concatenated string.

    Attributes set by the constructor:
        total: number of 20-byte digests contained in *pieces*.
        processed: how many times ``next_sha1`` has been called.
        stream: in-memory stream over *pieces*.
        progress_callback: optional callable receiving ``processed/total``
            as a float on every ``next_sha1`` call.
    """

    def __init__(self, pieces, progress_callback=None):
        assert pieces
        digest_count, leftover = divmod(len(pieces), 20)
        assert leftover == 0
        self.progress_callback = progress_callback
        self.stream = StringIO(pieces)
        self.processed = 0
        self.total = digest_count

    def next_sha1(self):
        """Report progress, then return the next 20 bytes of the stream.

        Once the stream is exhausted the read returns an empty string.
        """
        self.processed += 1
        notify = self.progress_callback
        if notify:
            notify(float(self.processed)/self.total)
        return self.stream.read(20)
def sha1_update_stream(sha1, stream, n):
    """Feed exactly *n* bytes read from *stream* into the hash object *sha1*.

    Data is read in chunks of at most 1024*1024 bytes.

    Raises:
        AssertionError: if a read returns fewer bytes than requested, or if
            *n* is negative.
    """
    chunk_limit = 1024*1024
    remaining = n
    while remaining > 0:
        wanted = min(remaining, chunk_limit)
        chunk = stream.read(wanted)
        assert len(chunk) == wanted
        remaining -= wanted
        sha1.update(chunk)
    assert remaining == 0
def verify_bt_single_file(path, info, progress_callback=None):
    """Check a single downloaded file against the piece hashes in *info*.

    *info* must provide ``length``, ``piece length`` and ``pieces``.  The
    file is read piece by piece and each SHA-1 digest is compared with the
    next 20-byte digest from ``pieces``.

    Returns:
        False if the file size differs from ``info['length']`` or any piece
        digest does not match; True otherwise.
    """
    # TODO: check md5sum if available
    if os.path.getsize(path) != info['length']:
        return False
    piece_length = info['piece length']
    assert piece_length > 0
    expected_digests = sha1_reader(info['pieces'], progress_callback=progress_callback)
    remaining = info['length']
    with open(path, 'rb') as stream:
        while remaining > 0:
            chunk_size = min(remaining, piece_length)
            remaining -= chunk_size
            piece_hash = hashlib.sha1()
            sha1_update_stream(piece_hash, stream, chunk_size)
            if piece_hash.digest() != expected_digests.next_sha1():
                return False
        assert remaining == 0
        # The whole file must have been consumed ...
        assert stream.read(1) == ''
    # ... and so must every expected digest.
    assert expected_digests.next_sha1() == ''
    return True
def verify_bt_multiple(folder, info, file_set=None, progress_callback=None):
    """Check the files of a multi-file torrent under *folder*.

    The entries of ``info['files']`` are walked in order as one continuous
    byte sequence cut into pieces of ``info['piece length']`` bytes.  Files
    that are missing on disk, or excluded by a non-empty *file_set*, are
    skipped: their bytes still count towards piece boundaries, but a piece
    that ends after skipped bytes is not compared.

    Args:
        folder: directory the per-file paths are joined onto.
        info: dict providing ``piece length``, ``pieces`` and ``files``
            (each file entry providing ``path`` and ``length``).
        file_set: optional container tested against each entry's ``path``
            value; when empty or None every existing file is checked.
        progress_callback: passed through to ``sha1_reader``.

    Returns:
        False if a checked file has the wrong size or a compared piece
        digest differs; True otherwise.
    """
    # TODO: check md5sum if available
    piece_length = info['piece length']
    assert piece_length > 0
    # One record per file: joined on-disk path, expected length, and the
    # original 'path' value used for the file_set membership test.
    files = [{'path':os.path.join(folder, apply(os.path.join, x['path'])), 'length':x['length'], 'file':x['path']} for x in info['files']]
    sha1_stream = sha1_reader(info['pieces'], progress_callback=progress_callback)
    sha1sum = hashlib.sha1()
    # Bytes still needed to fill the piece currently being accumulated.
    piece_left = piece_length
    # Cleared when skipped bytes fall into the current piece; such a piece's
    # digest is consumed but its mismatch is ignored.
    complete_piece = True
    while files:
        f = files.pop(0)
        path = f['path']
        size = f['length']
        if os.path.exists(path) and ((not file_set) or (f['file'] in file_set)):
            if os.path.getsize(path) != size:
                return False
            if size <= piece_left:
                # The whole file fits into the current piece.
                with open(path, 'rb') as stream:
                    sha1_update_stream(sha1sum, stream, size)
                    assert stream.read(1) == ''
                piece_left -= size
                if not piece_left:
                    # next_sha1() is evaluated first, so the digest is always
                    # consumed even when the comparison result is ignored.
                    if sha1sum.digest() != sha1_stream.next_sha1() and complete_piece:
                        return False
                    complete_piece = True
                    sha1sum = hashlib.sha1()
                    piece_left = piece_length
            else:
                # The file spans at least one piece boundary.
                with open(path, 'rb') as stream:
                    while size >= piece_left:
                        size -= piece_left
                        sha1_update_stream(sha1sum, stream, piece_left)
                        if sha1sum.digest() != sha1_stream.next_sha1() and complete_piece:
                            return False
                        complete_piece = True
                        sha1sum = hashlib.sha1()
                        piece_left = piece_length
                    if size:
                        # Tail of the file starts the next piece.
                        sha1_update_stream(sha1sum, stream, size)
                        piece_left -= size
        else:
            # Skipped file: advance the piece accounting without hashing,
            # discarding the digest of every piece that ends inside it.
            while size >= piece_left:
                size -= piece_left
                sha1_stream.next_sha1()
                sha1sum = hashlib.sha1()
                piece_left = piece_length
            if size:
                # Skipped bytes spill into the next piece, so it cannot be
                # compared.
                complete_piece = False
                piece_left -= size
            else:
                complete_piece = True
    if piece_left < piece_length:
        # A partially filled final piece remains.
        if complete_piece:
            if sha1sum.digest() != sha1_stream.next_sha1():
                return False
        else:
            sha1_stream.next_sha1()
    # Every expected digest must have been consumed.
    assert sha1_stream.next_sha1() == ''
    return True
def verify_bt(path, info, file_set=None, progress_callback=None):
    """Verify downloaded data at *path* against the torrent ``info`` dict.

    When *info* has a ``files`` entry, *path* is handed to
    ``verify_bt_multiple``.  Otherwise the single file is checked: *path*
    itself if it is a file, else the entry named ``info['name']`` inside it.

    Raises:
        Exception: if *path* does not exist.
    """
    if not os.path.exists(path):
        raise Exception("File doesn't exist: %s" % path)
    if 'files' in info:
        return verify_bt_multiple(path, info, file_set=file_set, progress_callback=progress_callback)
    if not os.path.isfile(path):
        # A directory was given: the payload lives inside it under the
        # torrent's name.
        path = os.path.join(path, encode_path(info['name']))
    return verify_bt_single_file(path, info, progress_callback=progress_callback)
def verify_bt_file(path, torrent_path, file_set=None, progress_callback=None):
    """Verify the data at *path* against the torrent file at *torrent_path*.

    The torrent is read and decoded, and its ``info`` entry is passed to
    ``verify_bt`` together with the remaining arguments.
    """
    with open(torrent_path, 'rb') as torrent:
        raw = torrent.read()
        info = bdecode(raw)['info']
        return verify_bt(path, info, file_set, progress_callback)
很高深,学习了!{:face (394):}
页:
[1]