1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-30 10:38:59 +01:00

crypt module refactor. need to be tested in py2

This commit is contained in:
Anton Grigoryev 2015-03-16 17:59:59 +03:00
parent 9379dd968d
commit e21614b1b2
5 changed files with 69 additions and 62 deletions

View File

@ -10,50 +10,62 @@ from __future__ import print_function
from Crypto.Util.strxor import strxor
from Crypto.Cipher import AES
def ige(message, key, iv, operation="decrypt"):
"""Given a key, given an iv, and message
do whatever operation asked in the operation field.
Operation will be checked for: "decrypt" and "encrypt" strings.
Returns the message encrypted/decrypted.
message must be a multiple by 16 bytes (for division in 16 byte blocks)
key must be 32 byte
iv must be 32 byte (it's not internally used in AES 256 ECB, but it's
needed for IGE)"""
if len(key) != 32:
raise ValueError("key must be 32 bytes long (was " +
str(len(key)) + " bytes)")
if len(iv) != 32:
raise ValueError("iv must be 32 bytes long (was " +
str(len(iv)) + " bytes)")
class IGE:
def __init__(self, key, iv):
if len(key) != 32:
raise ValueError("key must be 32 bytes long (was " +
str(len(key)) + " bytes)")
if len(iv) != 32:
raise ValueError("iv must be 32 bytes long (was " +
str(len(iv)) + " bytes)")
cipher = AES.new(key, AES.MODE_ECB, iv)
blocksize = cipher.block_size
if len(message) % blocksize != 0:
raise ValueError("message must be a multiple of 16 bytes (try adding " +
str(16 - len(message) % 16) + " bytes of padding)")
self.key = key
self.iv = iv
ivp = iv[0:blocksize]
ivp2 = iv[blocksize:]
self.cipher = AES.new(key, AES.MODE_ECB, iv)
ciphered = bytearray()
def encrypt(self, message):
return self._ige(message, operation="encrypt")
for i in range(0, len(message), blocksize):
indata = message[i:i+blocksize]
if operation == "decrypt":
xored = strxor(indata, ivp2)
decrypt_xored = cipher.decrypt(xored)
outdata = strxor(decrypt_xored, ivp)
ivp = indata
ivp2 = outdata
elif operation == "encrypt":
xored = strxor(indata, ivp)
encrypt_xored = cipher.encrypt(xored)
outdata = strxor(encrypt_xored, ivp2)
ivp = outdata
ivp2 = indata
else:
raise ValueError("operation must be either 'decrypt' or 'encrypt'")
def decrypt(self, message):
return self._ige(message, operation="decrypt")
ciphered.extend(outdata)
return ciphered
def _ige(self, message, operation="decrypt"):
"""Given a key, given an iv, and message
do whatever operation asked in the operation field.
Operation will be checked for: "decrypt" and "encrypt" strings.
Returns the message encrypted/decrypted.
message must be a multiple by 16 bytes (for division in 16 byte blocks)
key must be 32 byte
iv must be 32 byte (it's not internally used in AES 256 ECB, but it's
needed for IGE)"""
if len(message) % blocksize != 0:
raise ValueError("message must be a multiple of 16 bytes (try adding " +
str(16 - len(message) % 16) + " bytes of padding)")
blocksize = self.cipher.block_size
ivp = self.iv[0:blocksize]
ivp2 = self.iv[blocksize:]
ciphered = bytearray()
for i in range(0, len(message), blocksize):
indata = message[i:i+blocksize]
if operation == "decrypt":
xored = strxor(indata, ivp2)
decrypt_xored = self.cipher.decrypt(xored)
outdata = strxor(decrypt_xored, ivp)
ivp = indata
ivp2 = outdata
elif operation == "encrypt":
xored = strxor(indata, ivp)
encrypt_xored = self.cipher.encrypt(xored)
outdata = strxor(encrypt_xored, ivp2)
ivp = outdata
ivp2 = indata
else:
raise ValueError("operation must be either 'decrypt' or 'encrypt'")
ciphered.extend(outdata)
return ciphered

View File

@ -7,9 +7,7 @@ Created on Tue Sep 2 19:26:15 2014
"""
from binascii import crc32 as originalcrc32
MIN_SUPPORTED_PY3_VERSION = (3, 2, 0)
from sys import version_info
if version_info >= MIN_SUPPORTED_PY3_VERSION: #py3 has no long
long = int
import numbers
def crc32(data):
return originalcrc32(data) & 0xffffffff
from datetime import datetime
@ -111,7 +109,7 @@ def serialize_param(bytes_io, type_, value):
assert isinstance(value, int)
bytes_io.write(struct.pack('<i', value))
elif type_ == "long":
assert (isinstance(value, long) or isinstance(value, int)) # Py2 can be both
assert isinstance(value, numbers.Real)
bytes_io.write(struct.pack('<q', value))
elif type_ in ["int128", "int256"]:
assert isinstance(value, bytes)
@ -120,13 +118,11 @@ def serialize_param(bytes_io, type_, value):
l = len(value)
if l < 254: # short string format
bytes_io.write(struct.pack('<b', l)) # 1 byte of string
#bytes_io.write(int.to_bytes(l, 1, 'little')) # 1 byte of string
bytes_io.write(value) # string
bytes_io.write(b'\x00'*((-l-1) % 4)) # padding bytes
else:
bytes_io.write(b'\xfe') # byte 254
bytes_io.write(struct.pack('<i', l)[:3]) # 3 bytes of string
#bytes_io.write(int.to_bytes(l, 3, 'little')) # 3 bytes of string
bytes_io.write(value) # string
bytes_io.write(b'\x00'*(-l % 4)) # padding bytes
@ -177,6 +173,7 @@ def deserialize(bytes_io, type_=None, subtype=None):
x[arg['name']] = deserialize(bytes_io, type_=arg['type'], subtype=arg['subtype'])
return x
class Session:
""" Manages TCP Transport. encryption and message frames """
def __init__(self, ip, port):

View File

@ -18,6 +18,7 @@ def primesbelow(N):
smallprimeset = set(primesbelow(100000))
_smallprimeset = 100000
def isprime(n, precision=7):
# http://en.wikipedia.org/wiki/Miller-Rabin_primality_test#Algorithm_and_running_time
if n == 1 or n % 2 == 0:

View File

@ -43,10 +43,6 @@ print("PQ = %d\np = %d, q = %d" % (PQ, p, q))
P_bytes = struct.pack('>i', p)
Q_bytes = struct.pack('>i', q)
# print("p.bit_length()//8+1: " + str(p.bit_length()//8+1)) # 4
# print("q.bit_length()//8+1: " + str(q.bit_length()//8+1)) # 4
# P_bytes = int.to_bytes(p, p.bit_length()//8+1, 'big')
# Q_bytes = int.to_bytes(q, q.bit_length()//8+1, 'big')
f = open('rsa.pub', 'r')
key = RSA.importKey(f.read())
@ -78,8 +74,6 @@ z = Session.method_call('req_DH_params',
encrypted_data=encrypted_data)
encrypted_answer = z['encrypted_answer']
print("encrypted_answer:")
print(encrypted_answer.__repr__())
tmp_aes_key = SHA.new(new_nonce + server_nonce).digest() + SHA.new(server_nonce + new_nonce).digest()[0:12]
tmp_aes_iv = SHA.new(server_nonce + new_nonce).digest()[12:20] + SHA.new(new_nonce + new_nonce).digest() + new_nonce[0:4]
@ -91,8 +85,11 @@ print("\ntmp_aes_iv:")
mtproto.vis(tmp_aes_iv)
print(tmp_aes_iv.__repr__())
decrypted_answer = crypt.ige(encrypted_answer, tmp_aes_key, tmp_aes_iv)
print("decrypted_answer is:")
print(decrypted_answer.__repr__())
mtproto.vis(decrypted_answer[20:]) # To start off BA0D89 ...
crypter = crypt.IGE(tmp_aes_key, tmp_aes_iv)
answer_with_hash = crypter.decrypt(encrypted_answer)
answer_hash = answer_with_hash[:20]
answer = answer_with_hash[20:]
mtproto.vis(answer) # To start off BA0D89 ...

View File

@ -27,10 +27,10 @@ print("With key for AES 256 ECB: '" + str(aes_key) + "'")
# Initialization Vector must be 32 bytes
aes_iv = b'01234567890123456789012345678901'
print("And initialization vector: '" + str(aes_iv) + "'")
encrypted_msg = ige(msg_to_encrypt_padded, aes_key, aes_iv, operation="encrypt")
encrypted_msg = _ige(msg_to_encrypt_padded, aes_key, aes_iv, operation="encrypt")
print("\nEncrypted msg: '" + str(encrypted_msg) + "'")
print("In hex: " + encrypted_msg.__repr__())
decrypted_msg = ige(encrypted_msg, aes_key, aes_iv, operation="decrypt")
decrypted_msg = _ige(encrypted_msg, aes_key, aes_iv, operation="decrypt")
print("\nDecrypted msg: '" + str(decrypted_msg) + "'")
print("In hex: " + decrypted_msg.__repr__())
@ -44,7 +44,7 @@ msg_not_multiple_of_16 = "6bytes"
print("Trying to encrypt: '" + msg_not_multiple_of_16 +
"' of size: " + str(len(msg_not_multiple_of_16)))
try:
encrypted_msg = ige(msg_not_multiple_of_16, aes_key, aes_iv, operation="encrypt")
encrypted_msg = _ige(msg_not_multiple_of_16, aes_key, aes_iv, operation="encrypt")
except ValueError as ve:
print(" Correctly got ValueError: '" + str(ve) + "'")
@ -52,7 +52,7 @@ except ValueError as ve:
aes_key_not_32_bytes = b'0123456789'
print("Trying to use key: '" + str(aes_key_not_32_bytes) + "'")
try:
encrypted_msg = ige(msg_to_encrypt_padded, aes_key_not_32_bytes, aes_iv, operation="encrypt")
encrypted_msg = _ige(msg_to_encrypt_padded, aes_key_not_32_bytes, aes_iv, operation="encrypt")
except ValueError as ve:
print(" Correctly got ValueError: '" + str(ve) + "'")
@ -60,7 +60,7 @@ except ValueError as ve:
iv_key_not_32_bytes = b'0123456789'
print("Trying to use iv: '" + str(iv_key_not_32_bytes) + "'")
try:
encrypted_msg = ige(msg_to_encrypt_padded, aes_key, iv_key_not_32_bytes, operation="encrypt")
encrypted_msg = _ige(msg_to_encrypt_padded, aes_key, iv_key_not_32_bytes, operation="encrypt")
except ValueError as ve:
print(" Correctly got ValueError: '" + str(ve) + "'")