1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-27 05:34:42 +01:00

Merge remote-tracking branch 'origin/master'

Conflicts:
	tests/ige.py
This commit is contained in:
Anton Grigoryev 2015-03-16 17:24:08 +03:00
commit 9379dd968d
4 changed files with 405 additions and 91 deletions

View File

@ -6,6 +6,10 @@ Created on Tue Sep 2 19:26:15 2014
@author: Sammy Pfeiffer
"""
from binascii import crc32 as originalcrc32
MIN_SUPPORTED_PY3_VERSION = (3, 2, 0)
from sys import version_info
if version_info >= MIN_SUPPORTED_PY3_VERSION: #py3 has no long
long = int
def crc32(data):
return originalcrc32(data) & 0xffffffff
from datetime import datetime
@ -102,11 +106,12 @@ def serialize_method(bytes_io, type_, **kwargs):
def serialize_param(bytes_io, type_, value):
print("type(value): " + str(type(value)))
if type_ == "int":
assert isinstance(value, int)
bytes_io.write(struct.pack('<i', value))
elif type_ == "long":
assert isinstance(value, int)
assert (isinstance(value, long) or isinstance(value, int)) # Py2 can be both
bytes_io.write(struct.pack('<q', value))
elif type_ in ["int128", "int256"]:
assert isinstance(value, bytes)

389
telepy.py
View File

@ -1,6 +1,6 @@
#CLI like interface
import argparse, getopt, cmd
import argparse, getopt, cmd, os
parser = argparse.ArgumentParser('telepy',description='Python implementation of telegram API.')
parser.add_argument('command', nargs='?', choices=['cmd', 'dialog_list', 'contact_list'] + ['chat_' + sub for sub in ['info', 'add_user', 'add_user_to_chat', 'del_user', 'set_photo', 'rename']])
@ -16,114 +16,333 @@ print(args)
class telepyShell(cmd.Cmd):
intro='Welcome to telepy interactive shell. Type help or ? for help.\n'
prompt='>'
def precmd(self, line):
# if len(line) < 1 : return None
# lines = line.split()
# cmd_name = lines[0].lower()
return line
# convert first word(command name) to lower and return it as line
line = line.lstrip()
blank_pos = line.find(' ')
if blank_pos < 0: return line.lower()
return line[:blank_pos].lower() + ' ' + line[blank_pos+1:]
def completedefault(self, *ignored):
print(ignored)
def complete(self, text, state):
super().complete(text, state)
print('completing')
def do_shell(self, line):
'''
shell <command-line>
lets you use external shell. !<command-line> for short-hand.
'''
print(os.popen(line).read())
#detailed commands
def do_msg(self, arg): pass
def do_fwd(self, arg): pass
def do_chat_with_peer(self, arg): pass
def do_add_contact(self, arg): pass
def do_rename_contact(self, arg): pass
def do_mark_read(self, arg): pass
def do_delete_msg(self, arg): pass
def do_restore_msg(self, arg): pass
def do_msg(self, arg):
'''
msg <peer>
sends message to this peer
'''
pass
def do_fwd(self, arg):
'''
fwd <user> <msg-no>
forward message to user. You can see message numbers starting client with -N
'''
pass
def do_chat_with_peer(self, arg):
'''
chat_with_peer <peer>
starts one on one chat session with this peer. /exit or /quit to end this mode.
'''
pass
def do_add_contact(self, arg):
'''
add_contact <phone-number> <first-name> <last-name>
tries to add contact to contact-list by phone
'''
pass
def do_rename_contact(self, arg):
'''
rename_contact <user> <first-name> <last-name>
tries to rename contact. If you have another device it will be a fight
'''
pass
def do_mark_read(self, arg):
'''
mark_read <peer>
mark read all received messages with peer
'''
pass
def do_delete_msg(self, arg):
'''
delete_msg <msg-no>
deletes message (not completly, though)
'''
pass
def do_restore_msg(self, arg):
'''
restore_msg <msg-no>
restores delete message. Impossible for secret chats. Only possible short time (one hour, I think) after deletion
'''
pass
def do_send_photo(self, arg): pass
def do_send_video(self, arg): pass
def do_send_text(self, arg): pass
def do_send_photo(self, arg):
'''
send_photo <peer> <photo-file-name>
sends photo to peer
'''
pass
def do_load_photo(self, arg): pass
def do_load_video(self, arg): pass
def do_load_video_thumb(self, arg): pass
def do_load_audio(self, arg): pass
def do_load_document(self, arg): pass
def do_load_document_thumb(self, arg): pass
def do_send_video(self, arg):
'''
send_video <peer> <video-file-name>
sends video to peer
'''
pass
def do_send_text(self, arg):
'''
send_text <peer> <text-file-name>
sends text file as plain messages
'''
pass
def do_view_photo(self, arg): pass
def do_view_video(self, arg): pass
def do_view_video_thumb(self, arg): pass
def do_view_audio(self, arg): pass
def do_view_document(self, arg): pass
def do_view_document_thumb(self, arg): pass
def do_load_photo(self, arg):
'''
load_photo <msg-no>
loads photo to download dir
'''
pass
def do_fwd_media(self, arg): pass
def do_set_profile_photo(self, arg): pass
def do_load_video(self, arg):
'''
load_video <msg-no>
loads video to download dir
'''
pass
def do_load_video_thumb(self, arg):
'''
load_video_thumb <msg-no>
loads video thumbnail to download dir
'''
pass
def do_load_audio(self, arg):
'''
load_audio <msg-no>
loads audio to download dir
'''
pass
def do_load_document(self, arg):
'''
load_document <msg-no>
loads document to download dir
'''
pass
def do_load_document_thumb(self, arg):
'''
load_document_thumb <msg-no>
loads document thumbnail to download dir
'''
pass
def do_view_photo(self, arg):
'''
view_photo <msg-no>
loads photo/video to download dir and starts system default viewer
'''
pass
def do_view_video(self, arg):
'''
view_video <msg-no>
'''
pass
def do_view_video_thumb(self, arg):
'''
view_video_thumb <msg-no>
'''
pass
def do_view_audio(self, arg):
'''
view_audio <msg-no>
'''
pass
def do_view_document(self, arg):
'''
view_document <msg-no>
'''
pass
def do_view_document_thumb(self, arg):
'''
view_document_thumb <msg-no>
'''
pass
def do_fwd_media(self, arg):
'''
fwd_media <msg-no>
send media in your message. Use this to prevent sharing info about author of media (though, it is possible to determine user_id from media itself, it is not possible get access_hash of this user)
'''
pass
def do_set_profile_photo(self, arg):
'''
set_profile_photo <photo-file-name>
sets userpic. Photo should be square, or server will cut biggest central square part
'''
pass
def do_chat_info(self, arg):
'''
chat_info <chat>
prints info about chat
'''
arg=arg.split()
if len(arg) is 1:
print ('chat_info called with ', arg[0])
def do_chat_add_user(self,arg):
'''
chat_add_user <chat> <user>
add user to chat
'''
print(arg)
def do_chat_del_user(self,arg): pass
def do_chat_del_user(self,arg):
'''
chat_del_user <chat> <user>
remove user from chat
'''
pass
def do_chat_rename(self,arg):
'''
chat_rename <chat> <new-name>
rename chat room
'''
arg=arg.split()
def do_create_group_chat(self,arg): pass
def do_chat_set_photo(self,arg): pass
def do_create_group_chat(self, chat_topic, user1, user2, user3):
'''
create_group_chat <chat topic> <user1> <user2> <user3> ...
creates a groupchat with users, use chat_add_user to add more users
'''
print(chat_topic)
print(user1,user2,user3)
def do_search(self,arg): pass
def do_global_search(self,arg): pass
pass
def do_chat_set_photo(self, chat, photo):
'''
chat_set_photo <chat> <photo-file-name>
sets group chat photo. Same limits as for profile photos.
'''
pass
def do_create_secret_chat(self,arg): pass
def do_visualize_key(self,arg): pass
def do_set_ttl(self,arg): pass
def do_accept_secret_chat(self,arg): pass
def do_search(self, pattern):
'''
search <peer> <pattern>
searches pattern in messages with peer
'''
pass
def do_global_search(self, pattern):
'''
global_search <pattern>
searches pattern in all messages
'''
pass
def do_user_info(self,arg): pass
def do_history(self,arg): pass
def do_dialog_list(self,arg): pass
def do_contact_list(self,arg): pass
def do_suggested_contacts(self,arg): pass
def do_stats(self,arg): pass
def do_create_secret_chat(self, user):
'''
create_secret_chat <user>
creates secret chat with this user
'''
pass
def do_visualize_key(self, secret_chat):
'''
visualize_key <secret_chat>
prints visualization of encryption key. You should compare it to your partner's one
'''
pass
def do_set_ttl(self, secret_chat, ttl):
'''
set_ttl <secret_chat> <ttl>
sets ttl to secret chat. Though client does ignore it, client on other end can make use of it
'''
pass
def do_accept_secret_chat(self, secret_chat):
'''
accept_secret_chat <secret_chat>
manually accept secret chat (only useful when starting with -E key)
'''
pass
def do_export_card(self,arg): pass
def do_import_card(self,arg): pass
def do_user_info(self, user):
'''
user_info <user>
prints info about user
'''
pass
def do_history(self, peer, limit=40):
'''
history <peer> [limit]
prints history (and marks it as read). Default limit = 40
'''
if peer is '':
print('no peer have specified')
return
args = peer.split()
if len(args) not in (1,2) :
print('not appropriate number of arguments : ', peer)
return
if len(args) is 2:
if not args[1].isdecimal() or int(args[1]) < 1:
print('not a valid limit:', args[1])
limit = int(args[1])
print(peer)
print(limit)
def do_dialog_list(self, ignored):
'''
dialog_list
prints info about your dialogs
'''
pass
def do_contact_list(self, ignored):
'''
contact_list
prints info about users in your contact list
'''
pass
def do_suggested_contacts(self, ignored):
'''
suggested_contacts
print info about contacts, you have max common friends
'''
pass
def do_stats(self, ignored):
'''
stats
just for debugging
'''
pass
def do_quit_force(self,arg):
def do_export_card(self, card):
'''
export_card
print your 'card' that anyone can later use to import your contact
'''
pass
def do_import_card(self, card):
'''
import_card <card>
gets user by card. You can write messages to him after that.
'''
pass
def do_quit_force(self, ignored):
'''
quit_force
quit without waiting for query ends
'''
return True
def do_quit(self, arg):
def do_quit(self, ignored):
'''
quit
wait for all queries to end then quit
'''
#TODO:safely end queries
return True
if args.command is None:
telepyShell().cmdloop()
# chat_info <chat> -
# chat_add_user <chat> <user> -
# chat_del_user <chat> <user> -
# chat_set_photo <chat> <photo-file-name> -
# rename_chat <chat> <new-name>
# create_group_chat <chat topic> <user1> <user2> <user3> ... - creates a groupchat with users, use chat_add_user to add more users
# Search
#
# search <peer> pattern - searches pattern in messages with peer
# global_search pattern - searches pattern in all messages
# Secret chat
#
# create_secret_chat <user> - creates secret chat with this user
# visualize_key <secret_chat> - prints visualization of encryption key. You should compare it to your partner's one
# set_ttl <secret_chat> <ttl> - sets ttl to secret chat. Though client does ignore it, client on other end can make use of it
# accept_secret_chat <secret_chat> - manually accept secret chat (only useful when starting with -E key)
# Stats and various info
#
# user_info <user> - prints info about user
# history <peer> [limit] - prints history (and marks it as read). Default limit = 40
# dialog_list - prints info about your dialogs
# contact_list - prints info about users in your contact list
# suggested_contacts - print info about contacts, you have max common friends
# stats - just for debugging
# show_license - prints contents of GPLv2
# help - prints this help
# Card
#
# export_card - print your 'card' that anyone can later use to import your contact
# import_card <card> - gets user by card. You can write messages to him after that.
# Other
#
# quit - quit
# safe_quit - wait for all queries to end then quit

View File

@ -3,7 +3,7 @@ import os
import io
import struct
# Deal with py2 and py3 differences
try:
try: # this only works in py2.7
import configparser
except ImportError:
import ConfigParser as configparser

View File

@ -1,4 +1,15 @@
if version_info >= (3, 4, 0):
# -*- coding: utf-8 -*-
# Author: Sammy Pfeiffer
# This file implements the AES 256 IGE cipher
# working in Python 2.7 and Python 3.4 (other versions untested)
# as it's needed for the implementation of Telegram API
# It's based on PyCryto
from __future__ import print_function
from Crypto.Util import number
from Crypto.Cipher import AES
MIN_SUPPORTED_PY3_VERSION = (3, 2, 0)
from sys import version_info
if version_info >= MIN_SUPPORTED_PY3_VERSION:
from binascii import hexlify
long = int
@ -21,7 +32,7 @@ def str_bytes_to_hex_string(val):
tmp_aes_key_hex = '\xf0\x11(\x08\x87\xc7\xbb\x01\xdf\x0f\xc4\xe1x0\xe0\xb9\x1f\xbb\x8b\xe4\xb2&|\xb9\x85\xae%\xf3;RrS'
Convert it back to it's uppercase string representation, like:
tmp_aes_key_str = "F011280887C7BB01DF0FC4E17830E0B91FBB8BE4B2267CB985AE25F33B527253" """
if version_info >= (3, 4, 0):
if version_info >= MIN_SUPPORTED_PY3_VERSION:
return str(hexlify(val).upper())
return val.encode("hex").upper()
@ -31,6 +42,85 @@ def hex_string_to_long(val):
Convert it to int, which is actually long"""
return int(val, 16)
def xor_stuff(a, b):
"""XOR applied to every element of a with every element of b.
Depending on python version and depeding on input some arrangements need to be done."""
if version_info < MIN_SUPPORTED_PY3_VERSION:
if len(a) > len(b):
return "".join([chr(ord(x) ^ ord(y)) for (x, y) in zip(a[:len(b)], b)])
else:
return "".join([chr(ord(x) ^ ord(y)) for (x, y) in zip(a, b[:len(a)])])
else:
if type(a) == str and type(b) == bytes:# cipher.encrypt returns string
return bytes(ord(x) ^ y for x, y in zip(a, b))
elif type(a) == bytes and type(b) == str:
return bytes(x ^ ord(y) for x, y in zip(a, b))
else:
return bytes(x ^ y for x, y in zip(a, b))
def ige(message, key, iv, operation="decrypt"):
"""Given a key, given an iv, and message
do whatever operation asked in the operation field.
Operation will be checked for: "decrypt" and "encrypt" strings.
Returns the message encrypted/decrypted.
message must be a multiple by 16 bytes (for division in 16 byte blocks)
key must be 32 byte
iv must be 32 byte (it's not internally used in AES 256 ECB, but it's
needed for IGE)"""
if type(message) == long:
message = number.long_to_bytes(message)
if type(key) == long:
key = number.long_to_bytes(key)
if type(iv) == long:
iv = number.long_to_bytes(iv)
if len(key) != 32:
raise ValueError("key must be 32 bytes long (was " +
str(len(key)) + " bytes)")
if len(iv) != 32:
raise ValueError("iv must be 32 bytes long (was " +
str(len(iv)) + " bytes)")
cipher = AES.new(key, AES.MODE_ECB, iv)
blocksize = cipher.block_size
if len(message) % blocksize != 0:
raise ValueError("message must be a multiple of 16 bytes (try adding " +
str(16 - len(message) % 16) + " bytes of padding)")
ivp = iv[0:blocksize]
ivp2 = iv[blocksize:]
ciphered = None
for i in range(0, len(message), blocksize):
indata = message[i:i+blocksize]
if operation == "decrypt":
xored = xor_stuff(indata, ivp2)
decrypt_xored = cipher.decrypt(xored)
outdata = xor_stuff(decrypt_xored, ivp)
ivp = indata
ivp2 = outdata
elif operation == "encrypt":
xored = xor_stuff(indata, ivp)
encrypt_xored = cipher.encrypt(xored)
outdata = xor_stuff(encrypt_xored, ivp2)
ivp = outdata
ivp2 = indata
else:
raise ValueError("operation must be either 'decrypt' or 'encrypt'")
if ciphered is None:
ciphered = outdata
else:
ciphered_ba = bytearray(ciphered)
ciphered_ba.extend(outdata)
if version_info >= MIN_SUPPORTED_PY3_VERSION:
ciphered = bytes(ciphered_ba)
else:
ciphered = str(ciphered_ba)
return ciphered
if __name__ == "__main__":
# Example data from https://core.telegram.org/mtproto/samples-auth_key#conversion-of-encrypted-answer-into-answer
@ -39,7 +129,7 @@ if __name__ == "__main__":
tmp_aes_iv_str = "3212D579EE35452ED23E0D0C92841AA7D31B2E9BDEF2151E80D15860311C85DB"
answer_str = "BA0D89B53E0549828CCA27E966B301A48FECE2FCA5CF4D33F4A11EA877BA4AA57390733002000000FE000100C71CAEB9C6B1C9048E6C522F70F13F73980D40238E3E21C14934D037563D930F48198A0AA7C14058229493D22530F4DBFA336F6E0AC925139543AED44CCE7C3720FD51F69458705AC68CD4FE6B6B13ABDC9746512969328454F18FAF8C595F642477FE96BB2A941D5BCD1D4AC8CC49880708FA9B378E3C4F3A9060BEE67CF9A4A4A695811051907E162753B56B0F6B410DBA74D8A84B2A14B3144E0EF1284754FD17ED950D5965B4B9DD46582DB1178D169C6BC465B0D6FF9CA3928FEF5B9AE4E418FC15E83EBEA0F87FA9FF5EED70050DED2849F47BF959D956850CE929851F0D8115F635B105EE2E4E15D04B2454BF6F4FADF034B10403119CD8E3B92FCC5BFE000100262AABA621CC4DF587DC94CF8252258C0B9337DFB47545A49CDD5C9B8EAE7236C6CADC40B24E88590F1CC2CC762EBF1CF11DCC0B393CAAD6CEE4EE5848001C73ACBB1D127E4CB93072AA3D1C8151B6FB6AA6124B7CD782EAF981BDCFCE9D7A00E423BD9D194E8AF78EF6501F415522E44522281C79D906DDB79C72E9C63D83FB2A940FF779DFB5F2FD786FB4AD71C9F08CF48758E534E9815F634F1E3A80A5E1C2AF210C5AB762755AD4B2126DFA61A77FA9DA967D65DFD0AFB5CDF26C4D4E1A88B180F4E0D0B45BA1484F95CB2712B50BF3F5968D9D55C99C0FB9FB67BFF56D7D4481B634514FBA3488C4CDA2FC0659990E8E868B28632875A9AA703BCDCE8FCB7AE551"
if version_info < (3, 4, 0):
if version_info < MIN_SUPPORTED_PY3_VERSION:
# Crypto.Cipher.AES needs it's parameters to be 32byte str
# So we can either give 'str' type like this ONLY WORKS ON PYTHON2.7
encrypted_answer_hex = encrypted_answer_str.decode("hex")
@ -59,7 +149,7 @@ if __name__ == "__main__":
print("decrypted_answer using int version of input: ")
print(decrypted_answer_in_int)
if version_info < (3, 4, 0):
if version_info < MIN_SUPPORTED_PY3_VERSION:
if decrypted_answer_in_str == decrypted_answer_in_int:
print("\nBoth str input and int input give the same result")
else: