Merge "updater: Move RangeSinkWrite into RangeSinkState."
This commit is contained in:
+152
-170
@@ -232,125 +232,135 @@ static void allocate(size_t size, std::vector<uint8_t>& buffer) {
|
|||||||
buffer.resize(size);
|
buffer.resize(size);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct RangeSinkState {
|
/**
|
||||||
explicit RangeSinkState(RangeSet& rs) : tgt(rs) { };
|
* RangeSinkWriter reads data from the given FD, and writes them to the destination specified by the
|
||||||
|
* given RangeSet.
|
||||||
|
*/
|
||||||
|
class RangeSinkWriter {
|
||||||
|
public:
|
||||||
|
RangeSinkWriter(int fd, const RangeSet& tgt)
|
||||||
|
: fd_(fd), tgt_(tgt), next_range_(0), current_range_left_(0) {
|
||||||
|
CHECK_NE(tgt.count, static_cast<size_t>(0));
|
||||||
|
};
|
||||||
|
|
||||||
int fd;
|
bool Finished() const {
|
||||||
const RangeSet& tgt;
|
return next_range_ == tgt_.count && current_range_left_ == 0;
|
||||||
size_t p_block;
|
|
||||||
size_t p_remain;
|
|
||||||
};
|
|
||||||
|
|
||||||
static size_t RangeSinkWrite(const uint8_t* data, size_t size, RangeSinkState* rss) {
|
|
||||||
if (rss->p_remain == 0) {
|
|
||||||
LOG(ERROR) << "range sink write overrun";
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t written = 0;
|
size_t Write(const uint8_t* data, size_t size) {
|
||||||
while (size > 0) {
|
if (Finished()) {
|
||||||
size_t write_now = size;
|
LOG(ERROR) << "range sink write overrun; can't write " << size << " bytes";
|
||||||
|
return 0;
|
||||||
if (rss->p_remain < write_now) {
|
|
||||||
write_now = rss->p_remain;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (write_all(rss->fd, data, write_now) == -1) {
|
size_t written = 0;
|
||||||
break;
|
while (size > 0) {
|
||||||
}
|
// Move to the next range as needed.
|
||||||
|
if (current_range_left_ == 0) {
|
||||||
|
if (next_range_ < tgt_.count) {
|
||||||
|
off64_t offset = static_cast<off64_t>(tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
|
||||||
|
current_range_left_ =
|
||||||
|
(tgt_.pos[next_range_ * 2 + 1] - tgt_.pos[next_range_ * 2]) * BLOCKSIZE;
|
||||||
|
next_range_++;
|
||||||
|
if (!discard_blocks(fd_, offset, current_range_left_)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
data += write_now;
|
if (!check_lseek(fd_, offset, SEEK_SET)) {
|
||||||
size -= write_now;
|
break;
|
||||||
|
}
|
||||||
rss->p_remain -= write_now;
|
} else {
|
||||||
written += write_now;
|
// We can't write any more; return how many bytes have been written so far.
|
||||||
|
|
||||||
if (rss->p_remain == 0) {
|
|
||||||
// Move to the next block.
|
|
||||||
++rss->p_block;
|
|
||||||
if (rss->p_block < rss->tgt.count) {
|
|
||||||
rss->p_remain =
|
|
||||||
(rss->tgt.pos[rss->p_block * 2 + 1] - rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
|
|
||||||
|
|
||||||
off64_t offset = static_cast<off64_t>(rss->tgt.pos[rss->p_block * 2]) * BLOCKSIZE;
|
|
||||||
if (!discard_blocks(rss->fd, offset, rss->p_remain)) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!check_lseek(rss->fd, offset, SEEK_SET)) {
|
size_t write_now = size;
|
||||||
break;
|
if (current_range_left_ < write_now) {
|
||||||
}
|
write_now = current_range_left_;
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
if (write_all(fd_, data, write_now) == -1) {
|
||||||
// We can't write any more; return how many bytes have been written so far.
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
data += write_now;
|
||||||
|
size -= write_now;
|
||||||
|
|
||||||
|
current_range_left_ -= write_now;
|
||||||
|
written += write_now;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return written;
|
||||||
}
|
}
|
||||||
|
|
||||||
return written;
|
private:
|
||||||
}
|
// The input data.
|
||||||
|
int fd_;
|
||||||
// All of the data for all the 'new' transfers is contained in one
|
// The destination for the data.
|
||||||
// file in the update package, concatenated together in the order in
|
const RangeSet& tgt_;
|
||||||
// which transfers.list will need it. We want to stream it out of the
|
// The next range that we should write to.
|
||||||
// archive (it's compressed) without writing it to a temp file, but we
|
size_t next_range_;
|
||||||
// can't write each section until it's that transfer's turn to go.
|
// The number of bytes to write before moving to the next range.
|
||||||
//
|
size_t current_range_left_;
|
||||||
// To achieve this, we expand the new data from the archive in a
|
};
|
||||||
// background thread, and block that threads 'receive uncompressed
|
|
||||||
// data' function until the main thread has reached a point where we
|
|
||||||
// want some new data to be written. We signal the background thread
|
|
||||||
// with the destination for the data and block the main thread,
|
|
||||||
// waiting for the background thread to complete writing that section.
|
|
||||||
// Then it signals the main thread to wake up and goes back to
|
|
||||||
// blocking waiting for a transfer.
|
|
||||||
//
|
|
||||||
// NewThreadInfo is the struct used to pass information back and forth
|
|
||||||
// between the two threads. When the main thread wants some data
|
|
||||||
// written, it sets rss to the destination location and signals the
|
|
||||||
// condition. When the background thread is done writing, it clears
|
|
||||||
// rss and signals the condition again.
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* All of the data for all the 'new' transfers is contained in one file in the update package,
|
||||||
|
* concatenated together in the order in which transfers.list will need it. We want to stream it out
|
||||||
|
* of the archive (it's compressed) without writing it to a temp file, but we can't write each
|
||||||
|
* section until it's that transfer's turn to go.
|
||||||
|
*
|
||||||
|
* To achieve this, we expand the new data from the archive in a background thread, and block that
|
||||||
|
* threads 'receive uncompressed data' function until the main thread has reached a point where we
|
||||||
|
* want some new data to be written. We signal the background thread with the destination for the
|
||||||
|
* data and block the main thread, waiting for the background thread to complete writing that
|
||||||
|
* section. Then it signals the main thread to wake up and goes back to blocking waiting for a
|
||||||
|
* transfer.
|
||||||
|
*
|
||||||
|
* NewThreadInfo is the struct used to pass information back and forth between the two threads. When
|
||||||
|
* the main thread wants some data written, it sets writer to the destination location and signals
|
||||||
|
* the condition. When the background thread is done writing, it clears writer and signals the
|
||||||
|
* condition again.
|
||||||
|
*/
|
||||||
struct NewThreadInfo {
|
struct NewThreadInfo {
|
||||||
ZipArchiveHandle za;
|
ZipArchiveHandle za;
|
||||||
ZipEntry entry;
|
ZipEntry entry;
|
||||||
|
|
||||||
RangeSinkState* rss;
|
RangeSinkWriter* writer;
|
||||||
|
|
||||||
pthread_mutex_t mu;
|
pthread_mutex_t mu;
|
||||||
pthread_cond_t cv;
|
pthread_cond_t cv;
|
||||||
};
|
};
|
||||||
|
|
||||||
static bool receive_new_data(const uint8_t* data, size_t size, void* cookie) {
|
static bool receive_new_data(const uint8_t* data, size_t size, void* cookie) {
|
||||||
NewThreadInfo* nti = reinterpret_cast<NewThreadInfo*>(cookie);
|
NewThreadInfo* nti = static_cast<NewThreadInfo*>(cookie);
|
||||||
|
|
||||||
while (size > 0) {
|
while (size > 0) {
|
||||||
// Wait for nti->rss to be non-null, indicating some of this
|
// Wait for nti->writer to be non-null, indicating some of this data is wanted.
|
||||||
// data is wanted.
|
pthread_mutex_lock(&nti->mu);
|
||||||
pthread_mutex_lock(&nti->mu);
|
while (nti->writer == nullptr) {
|
||||||
while (nti->rss == nullptr) {
|
pthread_cond_wait(&nti->cv, &nti->mu);
|
||||||
pthread_cond_wait(&nti->cv, &nti->mu);
|
|
||||||
}
|
|
||||||
pthread_mutex_unlock(&nti->mu);
|
|
||||||
|
|
||||||
// At this point nti->rss is set, and we own it. The main
|
|
||||||
// thread is waiting for it to disappear from nti.
|
|
||||||
size_t written = RangeSinkWrite(data, size, nti->rss);
|
|
||||||
data += written;
|
|
||||||
size -= written;
|
|
||||||
|
|
||||||
if (nti->rss->p_block == nti->rss->tgt.count) {
|
|
||||||
// we have written all the bytes desired by this rss.
|
|
||||||
|
|
||||||
pthread_mutex_lock(&nti->mu);
|
|
||||||
nti->rss = nullptr;
|
|
||||||
pthread_cond_broadcast(&nti->cv);
|
|
||||||
pthread_mutex_unlock(&nti->mu);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
pthread_mutex_unlock(&nti->mu);
|
||||||
|
|
||||||
return true;
|
// At this point nti->writer is set, and we own it. The main thread is waiting for it to
|
||||||
|
// disappear from nti.
|
||||||
|
size_t written = nti->writer->Write(data, size);
|
||||||
|
data += written;
|
||||||
|
size -= written;
|
||||||
|
|
||||||
|
if (nti->writer->Finished()) {
|
||||||
|
// We have written all the bytes desired by this writer.
|
||||||
|
|
||||||
|
pthread_mutex_lock(&nti->mu);
|
||||||
|
nti->writer = nullptr;
|
||||||
|
pthread_cond_broadcast(&nti->cv);
|
||||||
|
pthread_mutex_unlock(&nti->mu);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void* unzip_new_data(void* cookie) {
|
static void* unzip_new_data(void* cookie) {
|
||||||
@@ -381,28 +391,26 @@ static int ReadBlocks(const RangeSet& src, std::vector<uint8_t>& buffer, int fd)
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int WriteBlocks(const RangeSet& tgt, const std::vector<uint8_t>& buffer, int fd) {
|
static int WriteBlocks(const RangeSet& tgt, const std::vector<uint8_t>& buffer, int fd) {
|
||||||
const uint8_t* data = buffer.data();
|
size_t written = 0;
|
||||||
|
for (size_t i = 0; i < tgt.count; ++i) {
|
||||||
size_t p = 0;
|
off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
|
||||||
for (size_t i = 0; i < tgt.count; ++i) {
|
size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
|
||||||
off64_t offset = static_cast<off64_t>(tgt.pos[i * 2]) * BLOCKSIZE;
|
if (!discard_blocks(fd, offset, size)) {
|
||||||
size_t size = (tgt.pos[i * 2 + 1] - tgt.pos[i * 2]) * BLOCKSIZE;
|
return -1;
|
||||||
if (!discard_blocks(fd, offset, size)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!check_lseek(fd, offset, SEEK_SET)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (write_all(fd, data + p, size) == -1) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
p += size;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
if (!check_lseek(fd, offset, SEEK_SET)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (write_all(fd, buffer.data() + written, size) == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
written += size;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Parameters for transfer list command functions
|
// Parameters for transfer list command functions
|
||||||
@@ -1215,45 +1223,31 @@ static int PerformCommandZero(CommandParameters& params) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int PerformCommandNew(CommandParameters& params) {
|
static int PerformCommandNew(CommandParameters& params) {
|
||||||
|
if (params.cpos >= params.tokens.size()) {
|
||||||
|
LOG(ERROR) << "missing target blocks for new";
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (params.cpos >= params.tokens.size()) {
|
RangeSet tgt = parse_range(params.tokens[params.cpos++]);
|
||||||
LOG(ERROR) << "missing target blocks for new";
|
|
||||||
return -1;
|
if (params.canwrite) {
|
||||||
|
LOG(INFO) << " writing " << tgt.size << " blocks of new data";
|
||||||
|
|
||||||
|
RangeSinkWriter writer(params.fd, tgt);
|
||||||
|
pthread_mutex_lock(¶ms.nti.mu);
|
||||||
|
params.nti.writer = &writer;
|
||||||
|
pthread_cond_broadcast(¶ms.nti.cv);
|
||||||
|
|
||||||
|
while (params.nti.writer != nullptr) {
|
||||||
|
pthread_cond_wait(¶ms.nti.cv, ¶ms.nti.mu);
|
||||||
}
|
}
|
||||||
|
|
||||||
RangeSet tgt = parse_range(params.tokens[params.cpos++]);
|
pthread_mutex_unlock(¶ms.nti.mu);
|
||||||
|
}
|
||||||
|
|
||||||
if (params.canwrite) {
|
params.written += tgt.size;
|
||||||
LOG(INFO) << " writing " << tgt.size << " blocks of new data";
|
|
||||||
|
|
||||||
RangeSinkState rss(tgt);
|
return 0;
|
||||||
rss.fd = params.fd;
|
|
||||||
rss.p_block = 0;
|
|
||||||
rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;
|
|
||||||
|
|
||||||
off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
|
|
||||||
if (!discard_blocks(params.fd, offset, tgt.size * BLOCKSIZE)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!check_lseek(params.fd, offset, SEEK_SET)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_lock(¶ms.nti.mu);
|
|
||||||
params.nti.rss = &rss;
|
|
||||||
pthread_cond_broadcast(¶ms.nti.cv);
|
|
||||||
|
|
||||||
while (params.nti.rss) {
|
|
||||||
pthread_cond_wait(¶ms.nti.cv, ¶ms.nti.mu);
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(¶ms.nti.mu);
|
|
||||||
}
|
|
||||||
|
|
||||||
params.written += tgt.size;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int PerformCommandDiff(CommandParameters& params) {
|
static int PerformCommandDiff(CommandParameters& params) {
|
||||||
@@ -1296,40 +1290,28 @@ static int PerformCommandDiff(CommandParameters& params) {
|
|||||||
LOG(INFO) << "patching " << blocks << " blocks to " << tgt.size;
|
LOG(INFO) << "patching " << blocks << " blocks to " << tgt.size;
|
||||||
Value patch_value(
|
Value patch_value(
|
||||||
VAL_BLOB, std::string(reinterpret_cast<const char*>(params.patch_start + offset), len));
|
VAL_BLOB, std::string(reinterpret_cast<const char*>(params.patch_start + offset), len));
|
||||||
RangeSinkState rss(tgt);
|
|
||||||
rss.fd = params.fd;
|
|
||||||
rss.p_block = 0;
|
|
||||||
rss.p_remain = (tgt.pos[1] - tgt.pos[0]) * BLOCKSIZE;
|
|
||||||
|
|
||||||
off64_t offset = static_cast<off64_t>(tgt.pos[0]) * BLOCKSIZE;
|
|
||||||
if (!discard_blocks(params.fd, offset, rss.p_remain)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!check_lseek(params.fd, offset, SEEK_SET)) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
RangeSinkWriter writer(params.fd, tgt);
|
||||||
if (params.cmdname[0] == 'i') { // imgdiff
|
if (params.cmdname[0] == 'i') { // imgdiff
|
||||||
if (ApplyImagePatch(
|
if (ApplyImagePatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
|
||||||
params.buffer.data(), blocks * BLOCKSIZE, &patch_value,
|
std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
|
||||||
std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
|
std::placeholders::_2),
|
||||||
nullptr, nullptr) != 0) {
|
nullptr, nullptr) != 0) {
|
||||||
LOG(ERROR) << "Failed to apply image patch.";
|
LOG(ERROR) << "Failed to apply image patch.";
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (ApplyBSDiffPatch(
|
if (ApplyBSDiffPatch(params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
|
||||||
params.buffer.data(), blocks * BLOCKSIZE, &patch_value, 0,
|
std::bind(&RangeSinkWriter::Write, &writer, std::placeholders::_1,
|
||||||
std::bind(&RangeSinkWrite, std::placeholders::_1, std::placeholders::_2, &rss),
|
std::placeholders::_2),
|
||||||
nullptr) != 0) {
|
nullptr) != 0) {
|
||||||
LOG(ERROR) << "Failed to apply bsdiff patch.";
|
LOG(ERROR) << "Failed to apply bsdiff patch.";
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// We expect the output of the patcher to fill the tgt ranges exactly.
|
// We expect the output of the patcher to fill the tgt ranges exactly.
|
||||||
if (rss.p_block != tgt.count || rss.p_remain != 0) {
|
if (!writer.Finished()) {
|
||||||
LOG(ERROR) << "range sink underrun?";
|
LOG(ERROR) << "range sink underrun?";
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user