Files
2025-12-10 14:38:26 -08:00

466 lines
20 KiB
C++

// *****************************************************************************
// * This file is part of the FreeFileSync project. It is distributed under *
// * GNU General Public License: https://www.gnu.org/licenses/gpl-3.0 *
// * Copyright (C) Zenju (zenju AT freefilesync DOT org) - All Rights Reserved *
// *****************************************************************************
#include "parallel_scan.h"
#include <chrono>
#include <zen/thread.h>
#include <zen/scope_guard.h>
using namespace zen;
using namespace fff;
namespace
{
const int FOLDER_TRAVERSAL_LEVEL_MAX = 100;
/* PERF NOTE
---------------------------------------------
|Test case: Reading from two different disks|
---------------------------------------------
Windows 7:
1st(unbuffered) |2nd (OS buffered)
----------------------------------
1 Thread: 57s | 8s
2 Threads: 39s | 7s
---------------------------------------------------
|Test case: Reading two directories from same disk|
---------------------------------------------------
Windows 7: Windows XP:
1st(unbuffered) |2nd (OS buffered) 1st(unbuffered) |2nd (OS buffered)
---------------------------------- ----------------------------------
1 Thread: 41s | 13s 1 Thread: 45s | 13s
2 Threads: 42s | 11s 2 Threads: 38s | 8s
=> Traversing does not take any advantage of file locality so that multiple threads operating on the same disk impose no performance overhead! (even faster on XP) */
class AsyncCallback
{
public:
AsyncCallback(size_t threadsToFinish, std::chrono::milliseconds cbInterval) : threadsToFinish_(threadsToFinish), cbInterval_(cbInterval) {}
//blocking call: context of worker thread
AFS::TraverserCallback::HandleError reportError(const AFS::TraverserCallback::ErrorInfo& errorInfo) //throw ThreadStopRequest
{
assert(!runningOnMainThread());
std::unique_lock dummy(lockRequest_);
interruptibleWait(conditionReadyForNewRequest_, dummy, [this] { return !errorRequest_ && !errorResponse_; }); //throw ThreadStopRequest
errorRequest_ = errorInfo;
conditionNewRequest.notify_all();
interruptibleWait(conditionHaveResponse_, dummy, [this] { return static_cast<bool>(errorResponse_); }); //throw ThreadStopRequest
AFS::TraverserCallback::HandleError rv = *errorResponse_;
errorRequest_ = std::nullopt;
errorResponse_ = std::nullopt;
dummy.unlock(); //optimization for condition_variable::notify_all()
conditionReadyForNewRequest_.notify_all(); //instead of notify_one(); work around bug: https://svn.boost.org/trac/boost/ticket/7796
return rv;
}
//context of main thread
void waitUntilDone(const TravErrorCb& onError, const TravStatusCb& onStatusUpdate) //throw X
{
assert(runningOnMainThread());
for (;;)
{
const std::chrono::steady_clock::time_point callbackTime = std::chrono::steady_clock::now() + cbInterval_;
for (std::unique_lock dummy(lockRequest_) ;;) //process all errors without delay
{
const bool rv = conditionNewRequest.wait_until(dummy, callbackTime, [this] { return (errorRequest_ && !errorResponse_) || (threadsToFinish_ == 0); });
if (!rv) //time-out + condition not met
break;
if (errorRequest_ && !errorResponse_)
{
assert(threadsToFinish_ != 0);
switch (onError({errorRequest_->msg, errorRequest_->failTime, errorRequest_->retryNumber})) //throw X
{
case PhaseCallback::ignore:
errorResponse_ = AFS::TraverserCallback::HandleError::ignore;
break;
case PhaseCallback::retry:
errorResponse_ = AFS::TraverserCallback::HandleError::retry;
break;
}
conditionHaveResponse_.notify_all(); //instead of notify_one(); work around bug: https://svn.boost.org/trac/boost/ticket/7796
}
if (threadsToFinish_ == 0)
{
dummy.unlock();
onStatusUpdate(getStatusLine(), itemsScanned_); //throw X; one last call for accurate stat-reporting!
return;
}
}
//call member functions outside of mutex scope:
onStatusUpdate(getStatusLine(), itemsScanned_); //throw X
}
}
//perf optimization: comparison phase is 7% faster by avoiding needless std::wstring construction for reportCurrentFile()
bool mayReportCurrentFile(int threadIdx, std::chrono::steady_clock::time_point& lastReportTime) const
{
if (threadIdx != notifyingThreadIdx_) //only one thread at a time may report status: the first in sequential order
return false;
const auto now = std::chrono::steady_clock::now();
if (now > lastReportTime + cbInterval_) //perform ui updates not more often than necessary
{
lastReportTime = now; //keep "lastReportTime" at worker thread level to avoid locking!
return true;
}
return false;
}
void reportCurrentFile(const std::wstring& filePath) //context of worker thread
{
assert(!runningOnMainThread());
std::lock_guard dummy(lockCurrentStatus_);
currentFile_ = filePath;
}
void incItemsScanned() { ++itemsScanned_; }
//perf: scanning is almost entirely file I/O bound, not CPU bound! => no prob having multiple threads poking at the same variable!
void notifyTaskBegin(int threadIdx, size_t parallelOps)
{
assert(!zen::runningOnMainThread());
std::lock_guard dummy(lockCurrentStatus_);
[[maybe_unused]] const auto [it, inserted] = activeThreadIdxs_.emplace(threadIdx, parallelOps);
assert(inserted);
notifyingThreadIdx_ = activeThreadIdxs_.begin()->first;
}
void notifyTaskEnd(int threadIdx)
{
assert(!zen::runningOnMainThread());
{
std::lock_guard dummy(lockCurrentStatus_);
[[maybe_unused]] const size_t no = activeThreadIdxs_.erase(threadIdx);
assert(no == 1);
notifyingThreadIdx_ = activeThreadIdxs_.empty() ? 0 : activeThreadIdxs_.begin()->first;
}
{
std::lock_guard dummy(lockRequest_);
assert(threadsToFinish_ > 0);
if (--threadsToFinish_ == 0)
conditionNewRequest.notify_all(); //perf: should unlock mutex before notify!? (insignificant)
}
}
private:
std::wstring getStatusLine() //context of main thread, call repreatedly
{
assert(runningOnMainThread());
size_t parallelOpsTotal = 0;
std::wstring filePath;
{
std::lock_guard dummy(lockCurrentStatus_);
parallelOpsTotal = activeThreadIdxs_.size();
filePath = currentFile_;
}
if (parallelOpsTotal >= 2)
return L'[' + _P("1 thread", "%x threads", parallelOpsTotal) + L"] " + filePath;
else
return filePath;
}
//---- main <-> worker communication channel ----
std::mutex lockRequest_;
std::condition_variable conditionReadyForNewRequest_;
std::condition_variable conditionNewRequest;
std::condition_variable conditionHaveResponse_;
std::optional<AFS::TraverserCallback::ErrorInfo > errorRequest_;
std::optional<AFS::TraverserCallback::HandleError> errorResponse_;
size_t threadsToFinish_; //can't use activeThreadIdxs_.size() which is locked by different mutex!
//also note: activeThreadIdxs_.size() may be 0 during worker thread construction!
//---- status updates ----
std::mutex lockCurrentStatus_; //different lock for status updates so that we're not blocked by other threads reporting errors
std::wstring currentFile_;
std::map<int /*threadIdx*/, size_t /*parallelOps*/> activeThreadIdxs_;
std::atomic<int> notifyingThreadIdx_{0}; //CAVEAT: do NOT use boost::thread::id: https://svn.boost.org/trac/boost/ticket/5754
const std::chrono::milliseconds cbInterval_;
//---- status updates II (lock-free) ----
std::atomic<int> itemsScanned_{0}; //std:atomic is uninitialized by default!
};
//-------------------------------------------------------------------------------------------------
struct TraverserConfig
{
const AbstractPath baseFolderPath; //thread-safe like an int! :)
const FilterRef filter;
const SymLinkHandling handleSymlinks;
std::unordered_map<Zstring, Zstringc>& failedDirReads;
std::unordered_map<Zstring, Zstringc>& failedItemReads;
AsyncCallback& acb;
const int threadIdx;
std::chrono::steady_clock::time_point& lastReportTime; //thread-level
};
class DirCallback : public AFS::TraverserCallback
{
public:
DirCallback(TraverserConfig& cfg,
Zstring&& parentRelPathPf, //postfixed with FILE_NAME_SEPARATOR (or empty!)
FolderContainer& output,
int level) :
cfg_(cfg),
parentRelPathPf_(std::move(parentRelPathPf)),
output_(output),
level_(level) {} //MUST NOT use cfg_ during construction! see BaseDirCallback()
virtual void onFile (const AFS::FileInfo& fi) override; //
virtual std::shared_ptr<TraverserCallback> onFolder (const AFS::FolderInfo& fi) override; //throw ThreadStopRequest
virtual HandleLink onSymlink(const AFS::SymlinkInfo& li) override; //
HandleError reportDirError (const ErrorInfo& errorInfo) override { return reportError(errorInfo, Zstring()); } //throw ThreadStopRequest
HandleError reportItemError(const ErrorInfo& errorInfo, const Zstring& itemName) override { return reportError(errorInfo, itemName); } //
private:
HandleError reportError(const ErrorInfo& errorInfo, const Zstring& itemName /*optional*/); //throw ThreadStopRequest
TraverserConfig& cfg_;
const Zstring parentRelPathPf_;
FolderContainer& output_;
const int level_;
};
class BaseDirCallback : public DirCallback
{
public:
BaseDirCallback(const DirectoryKey& baseFolderKey, DirectoryValue& output,
AsyncCallback& acb, int threadIdx, std::chrono::steady_clock::time_point& lastReportTime) :
DirCallback(travCfg_ /*not yet constructed!!!*/, Zstring(), output.folderCont, 0 /*level*/),
travCfg_
{
baseFolderKey.folderPath,
baseFolderKey.filter,
baseFolderKey.handleSymlinks,
output.failedFolderReads,
output.failedItemReads,
acb,
threadIdx,
lastReportTime,
}
{
if (acb.mayReportCurrentFile(threadIdx, lastReportTime))
acb.reportCurrentFile(AFS::getDisplayPath(baseFolderKey.folderPath)); //just in case first directory access is blocking
}
private:
TraverserConfig travCfg_;
};
void DirCallback::onFile(const AFS::FileInfo& fi) //throw ThreadStopRequest
{
interruptionPoint(); //throw ThreadStopRequest
const Zstring& relPath = parentRelPathPf_ + fi.itemName;
//update status information no matter if item is excluded or not!
if (cfg_.acb.mayReportCurrentFile(cfg_.threadIdx, cfg_.lastReportTime))
cfg_.acb.reportCurrentFile(AFS::getDisplayPath(AFS::appendRelPath(cfg_.baseFolderPath, relPath)));
//------------------------------------------------------------------------------------
//apply filter before processing (use relative name!)
if (!cfg_.filter.ref().passFileFilter(relPath))
return;
//note: sync.ffs_db database and lock files are excluded via path filter!
output_.addFile(fi.itemName,
{
.modTime = fi.modTime,
.fileSize = fi.fileSize,
.filePrint = fi.filePrint,
.isFollowedSymlink = fi.isFollowedSymlink,
});
cfg_.acb.incItemsScanned(); //add 1 element to the progress indicator
}
std::shared_ptr<AFS::TraverserCallback> DirCallback::onFolder(const AFS::FolderInfo& fi) //throw ThreadStopRequest
{
interruptionPoint(); //throw ThreadStopRequest
Zstring relPath = parentRelPathPf_ + fi.itemName;
//update status information no matter if item is excluded or not!
if (cfg_.acb.mayReportCurrentFile(cfg_.threadIdx, cfg_.lastReportTime))
cfg_.acb.reportCurrentFile(AFS::getDisplayPath(AFS::appendRelPath(cfg_.baseFolderPath, relPath)));
//------------------------------------------------------------------------------------
//apply filter before processing (use relative name!)
bool childItemMightMatch = true;
const bool passFilter = cfg_.filter.ref().passDirFilter(relPath, &childItemMightMatch);
if (!passFilter && !childItemMightMatch)
return nullptr; //do NOT traverse subdirs
//else: ensure directory filtering is applied later to exclude actually filtered directories!!!
FolderContainer& subFolder = output_.addFolder(fi.itemName, {.isFollowedSymlink = fi.isFollowedSymlink});
if (passFilter)
cfg_.acb.incItemsScanned(); //add 1 element to the progress indicator
//------------------------------------------------------------------------------------
if (level_ > FOLDER_TRAVERSAL_LEVEL_MAX) //Win32 traverser: stack overflow approximately at level 1000
//check after FolderContainer::addFolder()
for (size_t retryNumber = 0;; ++retryNumber)
switch (reportItemError({replaceCpy(_("Cannot read directory %x."), L"%x", AFS::getDisplayPath(AFS::appendRelPath(cfg_.baseFolderPath, relPath))) +
L"\n\n" L"Endless recursion.", std::chrono::steady_clock::now(), retryNumber}, fi.itemName)) //throw ThreadStopRequest
{
case AFS::TraverserCallback::HandleError::retry:
break;
case AFS::TraverserCallback::HandleError::ignore:
return nullptr;
}
return std::make_shared<DirCallback>(cfg_, std::move(relPath += FILE_NAME_SEPARATOR), subFolder, level_ + 1);
}
DirCallback::HandleLink DirCallback::onSymlink(const AFS::SymlinkInfo& si) //throw ThreadStopRequest
{
interruptionPoint(); //throw ThreadStopRequest
const Zstring& relPath = parentRelPathPf_ + si.itemName;
//update status information no matter if item is excluded or not!
if (cfg_.acb.mayReportCurrentFile(cfg_.threadIdx, cfg_.lastReportTime))
cfg_.acb.reportCurrentFile(AFS::getDisplayPath(AFS::appendRelPath(cfg_.baseFolderPath, relPath)));
switch (cfg_.handleSymlinks)
{
case SymLinkHandling::exclude:
return HandleLink::skip;
case SymLinkHandling::asLink:
if (cfg_.filter.ref().passFileFilter(relPath)) //always use file filter: Link type may not be "stable" on Linux!
{
output_.addSymlink(si.itemName, {.modTime = si.modTime});
cfg_.acb.incItemsScanned(); //add 1 element to the progress indicator
}
return HandleLink::skip;
case SymLinkHandling::follow:
//filter symlinks before trying to follow them: handle user-excluded broken symlinks!
//since we don't know yet what type the symlink will resolve to, only do this when both filter variants agree:
if (!cfg_.filter.ref().passFileFilter(relPath))
{
bool childItemMightMatch = true;
if (!cfg_.filter.ref().passDirFilter(relPath, &childItemMightMatch))
if (!childItemMightMatch)
return HandleLink::skip;
}
return HandleLink::follow;
}
assert(false);
return HandleLink::skip;
}
DirCallback::HandleError DirCallback::reportError(const ErrorInfo& errorInfo, const Zstring& itemName /*optional*/) //throw ThreadStopRequest
{
const HandleError handleErr = cfg_.acb.reportError(errorInfo); //throw ThreadStopRequest
switch (handleErr)
{
case HandleError::ignore:
if (itemName.empty())
cfg_.failedDirReads.emplace(beforeLast(parentRelPathPf_, FILE_NAME_SEPARATOR, IfNotFoundReturn::none), utfTo<Zstringc>(errorInfo.msg));
else
cfg_.failedItemReads.emplace(parentRelPathPf_ + itemName, utfTo<Zstringc>(errorInfo.msg));
break;
case HandleError::retry:
break;
}
return handleErr;
}
}
std::map<DirectoryKey, DirectoryValue> fff::parallelFolderScan(const std::set<DirectoryKey>& foldersToRead,
const TravErrorCb& onError, const TravStatusCb& onStatusUpdate,
std::chrono::milliseconds cbInterval)
{
std::map<DirectoryKey, DirectoryValue> output;
//aggregate folder paths that are on the same root device:
// => one worker thread *per device*: avoid excessive parallelism
// => parallel folder traversal considers "parallel file operations" as specified by user
// => (S)FTP: avoid hitting connection limits inadvertently
std::map<AfsDevice, std::set<DirectoryKey>> perDeviceFolders;
for (const DirectoryKey& key : foldersToRead)
perDeviceFolders[key.folderPath.afsDevice].insert(key);
//communication channel used by threads
AsyncCallback acb(perDeviceFolders.size() /*threadsToFinish*/, cbInterval); //manage life time: enclose InterruptibleThread's!!!
std::vector<InterruptibleThread> worker;
ZEN_ON_SCOPE_SUCCESS( for (InterruptibleThread& wt : worker) wt.join(); ); //no stop needed in success case => preempt ~InterruptibleThread()
ZEN_ON_SCOPE_FAIL( for (InterruptibleThread& wt : worker) wt.requestStop(); ); //stop *all* at the same time before join!
//init worker threads
for (const auto& [afsDevice, dirKeys] : perDeviceFolders)
{
const int threadIdx = static_cast<int>(worker.size());
Zstring threadName = Zstr("Compare[") + numberTo<Zstring>(threadIdx + 1) + Zstr('/') + numberTo<Zstring>(perDeviceFolders.size()) + Zstr("] ") +
utfTo<Zstring>(AFS::getDisplayPath({afsDevice, AfsPath()}));
const size_t parallelOps = 1;
std::map<DirectoryKey, DirectoryValue*> workload;
for (const DirectoryKey& key : dirKeys)
workload.emplace(key, &output[key]); //=> DirectoryValue* unshared for lock-free worker-thread access
worker.emplace_back([afsDevice /*clang bug*/= afsDevice, workload, threadIdx, &acb, parallelOps, threadName = std::move(threadName)]() mutable
{
setCurrentThreadName(threadName);
acb.notifyTaskBegin(threadIdx, parallelOps);
ZEN_ON_SCOPE_EXIT(acb.notifyTaskEnd(threadIdx));
std::chrono::steady_clock::time_point lastReportTime; //keep thread-local!
AFS::TraverserWorkload travWorkload;
for (auto& [folderKey, folderVal] : workload)
{
assert(folderKey.folderPath.afsDevice == afsDevice);
travWorkload.emplace_back(folderKey.folderPath.afsPath, std::make_shared<BaseDirCallback>(folderKey, *folderVal, acb, threadIdx, lastReportTime));
}
AFS::traverseFolderRecursive(afsDevice, travWorkload, parallelOps); //throw ThreadStopRequest
});
}
acb.waitUntilDone(onError, onStatusUpdate); //throw X
return output;
}