1
0
mirror of https://github.com/danog/ton.git synced 2024-11-27 04:24:52 +01:00
ton/crypto/block/Binlog.cpp
2019-09-07 14:33:36 +04:00

494 lines
15 KiB
C++

/*
This file is part of TON Blockchain Library.
TON Blockchain Library is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 2 of the License, or
(at your option) any later version.
TON Blockchain Library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with TON Blockchain Library. If not, see <http://www.gnu.org/licenses/>.
Copyright 2017-2019 Telegram Systems LLP
*/
#include "crypto/block/Binlog.h"
#include "td/utils/as.h"
#include "td/utils/misc.h"
#include "td/utils/port/path.h"
#include <sstream>
namespace block {
/*
*
* GENERIC BINLOG (move to separate file)
*
*/
BinlogBuffer::BinlogBuffer(std::unique_ptr<BinlogCallback> cb, std::size_t _max_size, td::FileFd fd)
: cb(std::move(cb))
, need_more_bytes(0)
, eptr(nullptr)
, log_rpos(0)
, log_cpos(0)
, log_wpos(0)
, fd(std::move(fd))
, replica(false)
, writing(false)
, dirty(false)
, created(false)
, ok(false) {
max_size = _max_size;
start = static_cast<unsigned char*>(std::malloc(max_size));
DCHECK(start);
rptr = wptr = cptr = start;
end = start + max_size;
}
unsigned char* BinlogBuffer::alloc_log_event_force(std::size_t size) {
unsigned char* res = alloc_log_event(size);
if (!res) {
throw LevAllocError{size};
}
return res;
}
unsigned char* BinlogBuffer::try_alloc_log_event(std::size_t size) {
if (!eptr) {
if (end - wptr >= (long)size) {
unsigned char* res = wptr;
wptr += size;
log_wpos += size;
return res;
}
eptr = wptr;
wptr = start;
if (rptr == eptr) {
rptr = start;
}
if (cptr == eptr) {
cptr = start;
}
}
if (rptr - wptr > (long)size) {
unsigned char* res = wptr;
wptr += size;
log_wpos += size;
return res;
}
return nullptr;
}
bool BinlogBuffer::flush(int mode) {
auto r_res = try_flush(mode);
if (r_res.is_ok()) {
return r_res.ok();
}
std::string msg = PSTRING() << "cannot flush binlog file " << binlog_name << " at position " << log_rpos << " "
<< r_res.error();
LOG(ERROR) << msg;
throw BinlogError{msg};
}
td::Result<bool> BinlogBuffer::try_flush(int mode) {
LOG(DEBUG) << "in flush: writing=" << writing << " r=" << rptr - start << " c=" << cptr - start
<< " w=" << wptr - start << "; rp=" << log_rpos << " cp=" << log_cpos << " wp=" << log_wpos;
if (!writing || rptr == cptr) {
return false; // nothing to flush
}
DCHECK(!fd.empty()); // must have an open binlog file
while (rptr != cptr) {
unsigned char* tptr = (cptr >= rptr ? cptr : eptr);
DCHECK(rptr <= tptr);
auto sz = tptr - rptr;
if (sz) {
LOG(INFO) << "writing " << sz << " bytes to binlog " << binlog_name << " at position " << log_rpos;
TRY_RESULT(res, fd.pwrite(td::Slice(rptr, sz), log_rpos));
if (static_cast<td::int64>(res) != sz) {
return td::Status::Error(PSLICE() << "written " << res << " bytes instead of " << sz);
}
log_rpos += sz;
rptr += sz;
}
if (rptr == eptr) {
rptr = start;
eptr = nullptr;
}
}
if (mode >= 3) {
LOG(INFO) << "syncing binlog " << binlog_name << " (position " << log_rpos << ")";
TRY_STATUS(fd.sync());
}
return true;
}
unsigned char* BinlogBuffer::alloc_log_event(std::size_t size) {
if (!writing) {
throw BinlogError{"cannot create new binlog event: binlog not open for writing"};
}
if (size >= max_size || size > max_event_size) {
return nullptr;
}
size = (size + 3) & -4;
unsigned char* res = try_alloc_log_event(size);
if (!res) {
flush();
return try_alloc_log_event(size);
} else {
return res;
}
}
bool BinlogBuffer::commit_range(unsigned long long pos_start, unsigned long long pos_end) {
// TODO: make something more clever, with partially committed/uncommitted segments in [cpos..wpos] range
if (pos_start != log_cpos || pos_end < pos_start || pos_end > log_wpos) {
return false;
}
if (!pos_start && pos_end >= pos_start + 4 && td::as<unsigned>(cptr) != 0x0442446b) {
throw BinlogError{"incorrect magic"};
}
long long size = pos_end - pos_start;
replay_range(cptr, pos_start, pos_end);
log_cpos = pos_end;
cptr += size;
if (eptr && cptr >= eptr) {
cptr -= eptr - start;
}
return true;
}
bool BinlogBuffer::rollback_range(unsigned long long pos_start, unsigned long long pos_end) {
if (pos_start < log_cpos || pos_end < pos_start || pos_end != log_wpos) {
return false;
}
long long size = pos_end - pos_start;
log_wpos = pos_end;
if (size >= wptr - start) {
wptr -= size;
} else {
DCHECK(eptr);
wptr += eptr - start - size;
}
return true;
}
void BinlogBuffer::NewBinlogEvent::commit() {
//LOG(DEBUG) << "in NewBinlogEvent::commit (status = " << status << ")";
if (!(status & 4)) {
throw BinlogError{"cannot commit new binlog event: already committed or rolled back"};
}
if (!bb.commit_range(pos, pos + size)) {
throw BinlogError{"cannot commit new binlog event: possibly some earlier log events are not committed yet"};
}
status = 1;
//LOG(DEBUG) << "after NewBinlogEvent::commit (status = " << status << ")";
}
void BinlogBuffer::NewBinlogEvent::rollback() {
if (!(status & 4)) {
throw BinlogError{"cannot roll back new binlog event: already committed or rolled back"};
}
if (!bb.rollback_range(pos, pos + size)) {
throw BinlogError{"cannot roll back new binlog event: possibly some later log event are already committed"};
}
status = 2;
}
BinlogBuffer::NewBinlogEvent::~NewBinlogEvent() {
if (status & 4) {
if (status == 5) {
status = 4;
commit();
} else if (status == 6) {
status = 4;
rollback();
} else {
LOG(ERROR) << "newly-allocated binlog event is neither committed nor rolled back (automatically rolling back)";
rollback();
}
}
}
void BinlogBuffer::replay_range(unsigned char* ptr, unsigned long long pos_start, unsigned long long pos_end) {
unsigned char* tptr = (ptr <= wptr ? wptr : eptr);
long long avail = tptr - ptr;
while (pos_start < pos_end) {
if (ptr == eptr) {
ptr = start;
tptr = wptr;
avail = tptr - ptr;
if (avail > (long long)(pos_end - pos_start)) {
avail = pos_end - pos_start;
}
}
int res = (avail >= 4 ? cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(ptr),
td::narrow_cast<size_t>(avail), pos_start)
: -0x7ffffffc);
if (res <= 0 || res > avail) {
std::ostringstream ss;
ss << "cannot interpret newly-committed binlog event 0x" << std::hex
<< (avail >= 4 ? (unsigned)td::as<unsigned>(ptr) : 0u) << std::dec << ": error " << res;
throw BinlogError{ss.str()};
}
ptr += res;
pos_start += res;
avail -= res;
}
}
int BinlogBuffer::replay_pending(bool allow_partial) {
if (rptr == cptr) {
return 0;
}
unsigned char* tptr = (rptr <= cptr ? cptr : eptr);
long long avail = tptr - rptr;
DCHECK(tptr && avail >= 0);
while (rptr != cptr) {
int res = (avail >= 4 ? cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(rptr),
td::narrow_cast<size_t>(avail), log_rpos)
: -0x7ffffffc);
if (res > 0) {
if (res > avail) {
throw BinlogError{"binlog event used more bytes than available"};
}
avail -= res;
log_rpos += res;
rptr += res;
if (rptr != eptr) {
continue;
}
rptr = start;
tptr = cptr;
avail = tptr - rptr;
continue;
}
long long prev_need = 0;
while (res < -0x40000000) {
long long need = res - 0x80000000;
need = (need + 3) & -4;
if (need > (long long)max_event_size) {
throw BinlogError{"binlog event requires too many bytes"};
}
if (need <= avail) {
throw BinlogError{"binlog event requires more bytes, but we already had them"};
}
if (need <= prev_need) {
throw BinlogError{"binlog event requires more bytes, but we already had them"};
}
prev_need = need;
long long total_avail = avail + (rptr > cptr ? cptr - start : 0);
if (need > total_avail) {
if (allow_partial) {
need_more_bytes = td::narrow_cast<size_t>(need - total_avail);
return 2;
} else {
throw BinlogError{"binlog event extends past end of buffer"};
}
}
if (need <= 1024) {
unsigned char tmp[1024];
std::memcpy(tmp, rptr, td::narrow_cast<size_t>(avail));
std::memcpy(tmp + avail, start, td::narrow_cast<size_t>(need - avail));
res = cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(tmp), td::narrow_cast<size_t>(need),
log_rpos);
} else {
unsigned char* tmp = static_cast<unsigned char*>(std::malloc(td::narrow_cast<size_t>(need)));
std::memcpy(tmp, rptr, td::narrow_cast<size_t>(avail));
std::memcpy(tmp + avail, start, td::narrow_cast<size_t>(need - avail));
res = cb->replay_log_event(*this, reinterpret_cast<const unsigned*>(tmp), td::narrow_cast<size_t>(need),
log_rpos);
std::free(tmp);
}
if (res > need) {
throw BinlogError{"binlog event used more bytes than available"};
}
}
if (res < 0) {
return res;
}
if (!res) {
throw BinlogError{"unknown error while interpreting binlog event"};
}
if (res < avail) {
avail -= res;
log_rpos += res;
rptr += res;
continue;
}
DCHECK(eptr);
log_rpos += res;
rptr += res;
rptr = start + (rptr - eptr);
eptr = nullptr;
DCHECK(start <= rptr && rptr <= cptr && cptr <= wptr && wptr <= end);
}
return 1;
}
BinlogBuffer::~BinlogBuffer() {
if (start) {
if (writing) {
flush(2);
}
std::free(start);
}
}
td::Status BinlogBuffer::set_binlog(std::string new_binlog_name, int mode) {
if (!binlog_name.empty() || !fd.empty()) {
return td::Status::Error("binlog buffer already attached to a file");
}
td::int32 flags = td::FileFd::Read;
if ((mode & 1) != 0) {
flags |= td::FileFd::Write;
}
auto r_fd = td::FileFd::open(new_binlog_name, flags, 0640);
if (r_fd.is_error()) {
if (!(~mode & 3)) {
TRY_RESULT(new_fd, td::FileFd::open(new_binlog_name, flags | td::FileFd::CreateNew, 0640));
fd = std::move(new_fd);
created = true;
} else {
return r_fd.move_as_error();
}
} else {
fd = r_fd.move_as_ok();
}
replica = !(mode & 1);
if (!replica) {
TRY_STATUS(fd.lock(td::FileFd::LockFlags::Write, new_binlog_name, 100));
}
if (created) {
writing = true;
td::Status res;
try {
res = cb->init_new_binlog(*this);
} catch (BinlogBuffer::BinlogError& err) {
res = td::Status::Error(err.msg);
}
if (res.is_error()) {
fd.close();
td::unlink(new_binlog_name).ignore();
writing = false;
return res;
}
binlog_name = new_binlog_name;
ok = true;
return td::Status::OK();
}
binlog_name = new_binlog_name;
auto res = replay_binlog(replica);
if (res.is_error()) {
return res.move_as_error();
}
if (!replica) {
if (log_rpos != log_wpos || log_rpos != log_cpos || rptr != wptr || rptr != cptr) {
std::string msg = (PSLICE() << "error while interpreting binlog `" << binlog_name << "`: " << log_wpos - log_rpos
<< " bytes left uninterpreted at position " << log_rpos << ", truncated binlog?")
.c_str();
LOG(ERROR) << msg;
return td::Status::Error(msg);
}
//rptr = wptr = cptr = start;
//eptr = nullptr;
LOG(INFO) << "read and interpreted " << res.move_as_ok() << " bytes from binlog `" << binlog_name
<< "`, final position " << log_rpos << ", reopening in write mode";
writing = true;
if (!log_rpos) {
td::Status status;
try {
status = cb->init_new_binlog(*this);
} catch (BinlogBuffer::BinlogError& err) {
status = td::Status::Error(err.msg);
}
if (status.is_error()) {
fd.close();
td::unlink(new_binlog_name).ignore();
writing = false;
return status;
}
}
}
ok = true;
return td::Status::OK();
}
td::Result<long long> BinlogBuffer::replay_binlog(bool allow_partial) {
if (writing) {
return 0;
}
long long total = 0;
while (true) {
auto res = read_file();
if (res.is_error()) {
return res.move_as_error();
}
long long sz = res.move_as_ok();
total += sz;
try {
cptr = wptr;
log_cpos = log_wpos;
if (!log_rpos && rptr == start && wptr >= rptr + 4 && td::as<unsigned>(rptr) != 0x0442446b) {
throw BinlogError{"incorrect magic"};
}
int r = replay_pending(allow_partial || sz != 0);
if (r < 0 && r >= -0x40000000) {
throw InterpretError{(PSLICE() << "binlog error " << r).c_str()};
}
} catch (BinlogError err) {
LOG(ERROR) << "error reading binlog " << binlog_name << ": " << err.msg << " at position " << log_rpos;
return td::Status::Error(PSLICE() << "error reading binlog " << binlog_name << ": " << err.msg << " at position "
<< log_rpos);
} catch (InterpretError err) {
LOG(ERROR) << "error interpreting binlog " << binlog_name << ": " << err.msg << " at position " << log_rpos;
return td::Status::Error(PSLICE() << "error interpreting binlog " << binlog_name << ": " << err.msg
<< " at position " << log_rpos);
}
if (!sz) {
break;
}
};
return total;
}
td::Result<int> BinlogBuffer::read_file() {
unsigned char* ptr = wptr;
std::size_t sz = end - wptr;
if (rptr > wptr) {
DCHECK(eptr);
sz = rptr - wptr;
if (sz <= 4) {
return 0; // buffer full
}
sz -= 4;
} else if (!sz) {
DCHECK(!eptr);
if (rptr <= start + 4) {
return 0; // buffer full
}
eptr = end;
ptr = wptr = start;
sz = rptr - start - 4;
}
auto r_res = fd.pread(td::MutableSlice(ptr, sz), log_wpos);
if (r_res.is_error()) {
std::string msg = PSTRING() << "error reading binlog file `" << binlog_name << "` at position " << log_wpos << " : "
<< r_res.error();
LOG(ERROR) << msg;
return td::Status::Error(msg);
}
auto res = r_res.move_as_ok();
DCHECK(std::size_t(res) <= sz);
LOG(INFO) << "read " << res << " bytes from binlog `" << binlog_name << "` at position " << log_wpos;
log_wpos += res;
wptr += res;
return (int)res;
}
} // namespace block