Understanding the nydusd Source Code (Part 1)

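This article tries to understand how nydus[1] works by reading its source code. Because the code keeps changing, the current code may differ from what is recorded here.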

1. Environment Setup

```shell
git clone https://github.com/dragonflyoss/image-service.git
cd image-service
make
```

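The build output is placed in the target directory; a debug build is produced by default.

The project builds three binaries: nydusctl (the command-line tool), nydusd (the main nydus program, which runs as a daemon) and nydus-image (the tool for processing nydus image files).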

```makefile
all: build

# Targets that are exposed to developers and users.
build: .format
	${CARGO} build $(CARGO_COMMON) $(CARGO_BUILD_FLAGS)
	# Cargo will skip checking if it is already checked
	${CARGO} clippy $(CARGO_COMMON) --workspace $(EXCLUDE_PACKAGES) --bins --tests -- -Dwarnings

.format:
	${CARGO} fmt -- --check
```

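When the project is built with make, the code format is checked first with cargo fmt -- --check (the .format prerequisite), and after the build cargo clippy is run with -Dwarnings.

The nydus version used in this article can be printed with:

```shell
./target/debug/nydusd --version
```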

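2. Code Flow

The entry functions of the project are located in the src/bin directory and correspond to the generated binaries nydusctl, nydusd and nydus-image. We start with the most important one, nydusd.

Nydusd is a daemon running in user space that can be managed by nydus-snapshotter. Its main job is to handle the I/O requests issued by FUSE and, when the data is not in the local cache, to fetch it from a backend (registry, OSS, localfs).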


For example, nydusd can be started in FUSE mode like this:

```shell
mkdir /rafs_mnt
./target/debug/nydusd fuse --thread-num 4 --mountpoint /rafs_mnt --apisock api_sock
```

2.1 Entry Function




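Next, the subcommand is parsed. nydusd has three subcommands: singleton, fuse and virtiofs.

For each subcommand, its own arguments are fetched again, i.e. the arguments of that subcommand inside args. fuse runs nydusd as a server dedicated to FUSE, virtiofs runs nydusd as a server dedicated to virtiofs, and singleton runs nydusd as a global daemon that can serve blobcache/fscache/fuse/virtio-fs at the same time.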

2.2 Startup Flow of the FUSE Subcommand

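For the fuse subcommand, process_default_fs_service is called with is_fuse set to true:

```rust
process_default_fs_service(subargs, bti, apisock, true)?;

// Function signature
fn process_default_fs_service(
    args: SubCmdArgs,      // extracted subcommand arguments
    bti: BuildTimeInfo,    // build-time information
    apisock: Option<&str>, // API socket path
    is_fuse: bool,         // whether this is a FUSE file system
) -> Result<()> {
    // body omitted (too long)
}
```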



virtual_mnt is the directory at which the backend file system is mounted. Depending on the arguments, a different mount command is built.

(1) When shared_dir is not empty, a PassthroughFs mount command is built:

```rust
let cmd = FsBackendMountCmd {
    fs_type: nydus::FsBackendType::PassthroughFs,
    source: shared_dir.to_string(),
    config: "".to_string(),
    mountpoint: virtual_mnt.to_string(),
    prefetch_files: None,
};
```

(2) When bootstrap is not empty (only the RAFS file system is used), nydusd checks whether the localfs-dir argument is given. If it is, the configuration is generated from it; otherwise the config argument is mandatory. The prefetch_files list passed in is parsed as well:

```rust
let config = match args.value_of("localfs-dir") {
    Some(v) => {
        format!(
            r###"
{{
    "device": {{
        "backend": {{
            "type": "localfs",
            "config": {{
                "dir": {:?},
                "readahead": true
            }}
        }},
        "cache": {{
            "type": "blobcache",
            "config": {{
                "compressed": false,
                "work_dir": {:?}
            }}
        }}
    }},
    "mode": "direct",
    "digest_validate": false,
    "iostats_files": false
}}
"###,
            v, v
        )
    }
    None => match args.value_of("config") {
        Some(v) => std::fs::read_to_string(v)?,
        None => {
            let e = DaemonError::InvalidArguments(
                "both --config and --localfs-dir are missing".to_string(),
            );
            return Err(e.into());
        }
    },
};

let prefetch_files: Option<Vec<String>> = args
    .values_of("prefetch-files")
    .map(|files| files.map(|s| s.to_string()).collect());
```

```rust
let cmd = FsBackendMountCmd {
    fs_type: nydus::FsBackendType::Rafs,
    source: b.to_string(),
    // `config` already holds the JSON content (generated above or read from --config),
    // so it is passed directly instead of being read again as a file path.
    config,
    mountpoint: virtual_mnt.to_string(),
    prefetch_files,
};
```
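The localfs-dir branch relies on two format! details: {{ and }} produce literal braces, and {:?} prints the directory wrapped in double quotes. The sketch below reproduces only the shape of the generated JSON; the function name localfs_config is hypothetical and not part of nydusd. Note that {:?} yields a valid JSON string only for ordinary paths, since Rust's Debug escaping is not identical to JSON escaping.

```rust
/// Hypothetical helper: builds the same kind of localfs + blobcache JSON as the
/// `localfs-dir` branch. `{{`/`}}` are literal braces; `{:?}` quotes the path.
fn localfs_config(dir: &str) -> String {
    format!(
        r#"{{"device": {{"backend": {{"type": "localfs", "config": {{"dir": {:?}, "readahead": true}}}},
"cache": {{"type": "blobcache", "config": {{"compressed": false, "work_dir": {:?}}}}}}},
"mode": "direct", "digest_validate": false, "iostats_files": false}}"#,
        dir, dir
    )
}
```

For example, localfs_config("/var/lib/nydus") contains "dir": "/var/lib/nydus" and "work_dir": "/var/lib/nydus", with balanced braces.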

Once the mount command cmd has been generated, a Vfs instance is created from the opts argument:

```rust
let vfs = fuse_backend_rs::api::Vfs::new(opts);
let vfs = Arc::new(vfs);
```

2.3 Analysis of the Vfs Struct

```rust
/// A union fs that combines multiple backend file systems.
pub struct Vfs {
    next_super: AtomicU8,
    root: PseudoFs,
    // mountpoints maps from pseudo fs inode to mounted fs mountpoint data
    mountpoints: ArcSwap<HashMap<u64, Arc<MountPointData>>>,
    // superblocks keeps track of all mounted file systems
    superblocks: ArcSuperBlock,
    opts: ArcSwap<VfsOptions>,
    initialized: AtomicBool,
    lock: Mutex<()>,
}
```

When a new Vfs instance is created:

```rust
impl Vfs {
    /// Create a new vfs instance
    pub fn new(opts: VfsOptions) -> Self {
        Vfs {
            // next available fs index (0 is the pseudo fs)
            next_super: AtomicU8::new((VFS_PSEUDO_FS_IDX + 1) as u8),
            // mount points, a HashMap
            mountpoints: ArcSwap::new(Arc::new(HashMap::new())),
            // super blocks, an array
            superblocks: ArcSwap::new(Arc::new(vec![None; MAX_VFS_INDEX])),
            // root, a PseudoFs instance
            root: PseudoFs::new(),
            // options passed in
            opts: ArcSwap::new(Arc::new(opts)),
            // lock
            lock: Mutex::new(()),
            // whether the vfs has been initialized
            initialized: AtomicBool::new(false),
        }
    }
    ...
}
```

next_super is initialized to 1 (VFS_PSEUDO_FS_IDX + 1). The 64-bit inode number is split into two parts: the high 8 bits hold the index that identifies the mounted backend file system, and the remaining 56 bits are left to the backend file system, with a maximum value of VFS_MAX_INO:

```rust
/// Maximum inode number supported by the VFS for backend file system
pub const VFS_MAX_INO: u64 = 0xff_ffff_ffff_ffff;

// The 64bit inode number for VFS is divided into two parts:
// 1. an 8-bit file-system index, to identify mounted backend file systems.
// 2. the left bits are reserved for backend file systems, and it's limited to VFS_MAX_INO.
const VFS_INDEX_SHIFT: u8 = 56;
const VFS_PSEUDO_FS_IDX: VfsIndex = 0;
```
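To make the split concrete, here is a minimal sketch of packing and unpacking a VFS inode number using the two constants above. The helper names to_vfs_ino and from_vfs_ino are hypothetical and are not the fuse-backend-rs API; index 0 would correspond to the pseudo fs, so the first backend gets index 1.

```rust
// Hypothetical helpers (not the fuse-backend-rs API) mirroring the VFS inode layout:
// the top 8 bits hold the file-system index, the low 56 bits the backend inode.
const VFS_INDEX_SHIFT: u8 = 56;
const VFS_MAX_INO: u64 = 0xff_ffff_ffff_ffff;

/// Combine a backend fs index and a backend inode into one VFS inode number.
fn to_vfs_ino(fs_idx: u8, backend_ino: u64) -> Option<u64> {
    if backend_ino > VFS_MAX_INO {
        return None; // the backend inode does not fit in 56 bits
    }
    Some(((fs_idx as u64) << VFS_INDEX_SHIFT) | backend_ino)
}

/// Split a VFS inode number back into (fs index, backend inode).
fn from_vfs_ino(vfs_ino: u64) -> (u8, u64) {
    ((vfs_ino >> VFS_INDEX_SHIFT) as u8, vfs_ino & VFS_MAX_INO)
}
```

For instance, backend inode 2 of the file system with index 1 becomes 0x0100_0000_0000_0002.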


The root field is a PseudoFs:

```rust
pub struct PseudoFs {
    // next available inode number
    next_inode: AtomicU64,
    // root inode, a pointer to a PseudoInode
    root_inode: Arc<PseudoInode>,
    // inodes, a HashMap
    inodes: ArcSwap<HashMap<u64, Arc<PseudoInode>>>,
    lock: Mutex<()>, // Write protect PseudoFs.inodes and PseudoInode.children
}
```

Each node of the pseudo fs is a PseudoInode:

```rust
struct PseudoInode {
    // inode number of this node
    ino: u64,
    // inode number of the parent
    parent: u64,
    // list of children (pointers to PseudoInode)
    children: ArcSwap<Vec<Arc<PseudoInode>>>,
    name: String,
}
```
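The pseudo fs is essentially a small in-memory directory tree used to hold mountpoints. The following simplified, single-threaded model (no ArcSwap or Mutex, children stored as inode numbers) illustrates creating path components and walking a path; the method names get_or_create and mount_path are hypothetical, and using 1 as the root inode number is an assumption borrowed from the FUSE convention.

```rust
use std::collections::HashMap;

// Simplified, hypothetical model of PseudoFs; not the fuse-backend-rs implementation.
struct PseudoInode {
    ino: u64,
    parent: u64,
    children: Vec<u64>,
    name: String,
}

struct PseudoFs {
    next_inode: u64,
    inodes: HashMap<u64, PseudoInode>,
}

const ROOT_ID: u64 = 1; // assumption: root inode number 1, as in FUSE

impl PseudoFs {
    fn new() -> Self {
        let root = PseudoInode { ino: ROOT_ID, parent: ROOT_ID, children: Vec::new(), name: "/".to_string() };
        let mut inodes = HashMap::new();
        inodes.insert(ROOT_ID, root);
        PseudoFs { next_inode: ROOT_ID + 1, inodes }
    }

    /// Find the child called `name` under `parent`.
    fn lookup(&self, parent: u64, name: &str) -> Option<u64> {
        self.inodes[&parent].children.iter().copied().find(|c| self.inodes[c].name == name)
    }

    /// Find the child called `name` under `parent`, creating it if missing.
    fn get_or_create(&mut self, parent: u64, name: &str) -> u64 {
        if let Some(ino) = self.lookup(parent, name) {
            return ino;
        }
        let ino = self.next_inode;
        self.next_inode += 1;
        self.inodes.insert(ino, PseudoInode { ino, parent, children: Vec::new(), name: name.to_string() });
        self.inodes.get_mut(&parent).unwrap().children.push(ino);
        ino
    }

    /// Create every component of `path` (e.g. "/a/b") and return the last inode.
    fn mount_path(&mut self, path: &str) -> u64 {
        path.split('/').filter(|s| !s.is_empty()).fold(ROOT_ID, |p, n| self.get_or_create(p, n))
    }

    /// Walk an existing path without creating anything.
    fn path_walk(&self, path: &str) -> Option<u64> {
        path.split('/').filter(|s| !s.is_empty()).try_fold(ROOT_ID, |p, n| self.lookup(p, n))
    }
}
```

The real path_walk returns the pseudo inode that Vfs then looks up in its mountpoints map.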

[Figure: composition of the Vfs struct in nydus]

Back to the flow after the vfs instance is created: next, the daemon_id and supervisor arguments are read (they are needed for live-upgrade/failover).

Then a NydusDaemon is created with the mount command.

2.4 NydusDaemon for FUSE

is_fusetrue 时,开始创建 daemon:

(1)获取 fuse server 的线程数量值;

(2)获取 mountpoint 参数的值;

(3)创建 daemon

```rust
let daemon = {
    fusedev::create_fuse_daemon(
        mountpoint, // mount point path
        vfs,        // the vfs instance created above
        supervisor,
        daemon_id,
        threads,    // number of threads
        apisock,    // API socket path
        args.is_present("upgrade"),
        !args.is_present("writable"),
        p,          // failover-policy
        mount_cmd,  // mount command
        bti,
    )
    .map(|d| {
        info!("Fuse daemon started!");
        d
    })
    .map_err(|e| {
        error!("Failed in starting daemon: {}", e);
        e
    })?
};
DAEMON_CONTROLLER.set_daemon(daemon);
```

The main logic of fusedev::create_fuse_daemon is as follows.

(1) Create two channels:

```rust
let (trigger, events_rx) = channel::<DaemonStateMachineInput>();
let (result_sender, result_receiver) = channel::<DaemonResult<()>>();
```

A channel is used for communication between threads and returns a (sender, receiver) pair. For example, in (trigger, events_rx), trigger is the sender and events_rx is the receiver.
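Assuming channel here is std::sync::mpsc::channel (the sender/receiver usage matches it), the basic pattern looks like the sketch below. The function relay is hypothetical: one thread sends through the sender, the receiving side collects the messages, and dropping the last sender ends the receiver's iteration.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Send `events` from a worker thread and collect them on the receiving side.
fn relay(events: Vec<&'static str>) -> Vec<&'static str> {
    // Same shape as `let (trigger, events_rx) = channel::<...>()`: sender first, receiver second.
    let (trigger, events_rx) = channel::<&'static str>();
    let producer = thread::spawn(move || {
        for e in events {
            trigger.send(e).expect("receiver dropped");
        }
        // `trigger` is dropped here, which ends the iteration below.
    });
    let received: Vec<_> = events_rx.iter().collect();
    producer.join().unwrap();
    received
}
```

Messages arrive in the order they were sent, e.g. relay(vec!["Mount", "Start"]) returns ["Mount", "Start"].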

(2) Create the Service instance:

```rust
let service = FusedevFsService::new(vfs, &mnt, supervisor.as_ref(), fp, readonly)?;

impl FusedevFsService {
    fn new(
        vfs: Arc<Vfs>,
        mnt: &Path,
        supervisor: Option<&String>,
        fp: FailoverPolicy,
        readonly: bool,
    ) -> Result<Self> {
        // create the session with FUSE
        let session = FuseSession::new(mnt, "rafs", "", readonly).map_err(|e| eother!(e))?;
        let upgrade_mgr = supervisor
            .as_ref()
            .map(|s| Mutex::new(UpgradeManager::new(s.to_string().into())));

        Ok(FusedevFsService {
            vfs: vfs.clone(),
            conn: AtomicU64::new(0),
            failover_policy: fp,
            session: Mutex::new(session),
            server: Arc::new(Server::new(vfs)),
            upgrade_mgr,

            backend_collection: Default::default(),
            inflight_ops: Mutex::new(Vec::new()),
        })
    }
    ...
}
```

(3) Create the Daemon instance:

```rust
let daemon = Arc::new(FusedevDaemon {
    bti,
    id,
    supervisor,
    threads_cnt, // number of threads

    state: AtomicI32::new(DaemonState::INIT as i32),
    result_receiver: Mutex::new(result_receiver),
    request_sender: Arc::new(Mutex::new(trigger)),
    service: Arc::new(service),
    state_machine_thread: Mutex::new(None),
    fuse_service_threads: Mutex::new(Vec::new()),
});
```

FusedevFsService::new() calls FuseSession::new() to create the session that communicates with the kernel FUSE module; at this point nothing has been mounted and no connection has been established yet.

FuseSession::new() is provided by the external crate fuse-backend-rs[2].

The session is stored in the session field of FusedevFsService, wrapped in a Mutex so that it can only be accessed exclusively.

The service becomes the value of the service field of FusedevDaemon, wrapped in an Arc so that it can be shared across threads.

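2.5 The nydusd State Machine

machine is an instance of the DaemonStateMachineContext struct. It stores the daemon's PID, a pointer to the daemon instance, and the channels for receiving requests and returning results, which are used for communication between threads.

```rust
let machine = DaemonStateMachineContext::new(daemon.clone(), events_rx, result_sender);
```

The nydusd state machine maintains the state of nydusd. The transitions are:

```rust
state_machine! {
    derive(Debug, Clone)
    pub DaemonStateMachine(Init)

    // Init means nydusd has just started and may already be configured,
    // but it has neither negotiated capabilities with the kernel nor tried
    // to set up the fuse session by mounting /dev/fuse (for the fusedev backend).
    Init => {
        Mount => Ready,
        Takeover => Ready[Restore],
        Stop => Die[StopStateMachine],
    },
    // Ready means nydusd is ready for work and the fuse session has been created.
    // The state can transition to Running or Die.
    Ready => {
        Start => Running[StartService],
        Stop => Die[Umount],
        Exit => Die[StopStateMachine],
    },
    // Running means nydusd has successfully prepared everything it needs to work
    // as a user-space fuse file system; however, the essential capability
    // negotiation may not be done yet, which is determined via fuse-rs.
    Running => {
        Stop => Ready[TerminateService],
    },
}
```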
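Read as a table, the macro above maps (current state, input) to (next state, optional action). The sketch below restates that table as a plain function; the enums and the function transition are hypothetical stand-ins for the macro-generated types, and returning None for an input the table does not allow is a simplification of how the real state machine reports errors.

```rust
// Hypothetical re-statement of the state_machine! table as a plain function.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State { Init, Ready, Running, Die }

#[derive(Debug, Clone, Copy, PartialEq)]
enum Input { Mount, Takeover, Start, Stop, Exit }

#[derive(Debug, Clone, Copy, PartialEq)]
enum Action { Restore, StopStateMachine, StartService, Umount, TerminateService }

/// Returns the next state and the optional action, or None if the input is not allowed.
fn transition(s: State, i: Input) -> Option<(State, Option<Action>)> {
    match (s, i) {
        (State::Init, Input::Mount) => Some((State::Ready, None)),
        (State::Init, Input::Takeover) => Some((State::Ready, Some(Action::Restore))),
        (State::Init, Input::Stop) => Some((State::Die, Some(Action::StopStateMachine))),
        (State::Ready, Input::Start) => Some((State::Running, Some(Action::StartService))),
        (State::Ready, Input::Stop) => Some((State::Die, Some(Action::Umount))),
        (State::Ready, Input::Exit) => Some((State::Die, Some(Action::StopStateMachine))),
        (State::Running, Input::Stop) => Some((State::Ready, Some(Action::TerminateService))),
        _ => None,
    }
}
```

The normal startup path is therefore Init --Mount--> Ready --Start--> Running, with StartService executed on the second step.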

machine.kick_state_machine() starts the state machine thread:

```rust
let machine_thread = machine.kick_state_machine()?;
```

This thread is named state_machine and can be seen with top -Hp NYDUSD_PID.


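This thread is an infinite loop that receives messages from a channel. (Where are the messages sent from?)

Inside the loop, recv() blocks until a message of type DaemonStateMachineInput arrives and stores it in event; self.sm.consume(&event) handles each event, performs the corresponding action and moves the state machine to the new state.

After processing, the result is sent back through the result_sender channel. (Who receives it?)

[Screenshot: state machine log messages printed when nydusd starts]

Where do the messages received by the state machine thread come from? To answer this, go back to where the channels were created. The sender paired with request_receiver is trigger, and the receiver paired with result_sender is result_receiver; both are stored in daemon:

```rust
let daemon = Arc::new(FusedevDaemon {
    ...
    result_receiver: Mutex::new(result_receiver),
    request_sender: Arc::new(Mutex::new(trigger)),
    ...
});
```

FusedevDaemon implements DaemonStateMachineSubscriber. Its on_event() sends the event through request_sender and then blocks on result_receiver until the state machine returns the result:

```rust
impl DaemonStateMachineSubscriber for FusedevDaemon {
    fn on_event(&self, event: DaemonStateMachineInput) -> DaemonResult<()> {
        self.request_sender
            .lock()
            .unwrap()
            .send(event)
            .map_err(|e| DaemonError::Channel(format!("send {:?}", e)))?;

        self.result_receiver
            .lock()
            .expect("Not expect poisoned lock!")
            .recv()
            .map_err(|e| DaemonError::Channel(format!("recv {:?}", e)))?
    }
}
```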
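The two channels together form a request/response handshake between the daemon and the state_machine thread. The following miniature is a hypothetical sketch of that pattern, not nydusd's types: Event, Daemon and kick_state_machine are simplified stand-ins, and the thread only records events instead of running real actions.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;
use std::thread::{self, JoinHandle};

#[derive(Debug, Clone, Copy, PartialEq)]
enum Event { Mount, Start, Stop }

struct Daemon {
    request_sender: Mutex<Sender<Event>>,
    result_receiver: Mutex<Receiver<Result<(), String>>>,
}

impl Daemon {
    /// Like `on_event`: send the event, then block until the state machine replies.
    fn on_event(&self, e: Event) -> Result<(), String> {
        self.request_sender.lock().unwrap().send(e).map_err(|e| format!("send {:?}", e))?;
        self.result_receiver.lock().unwrap().recv().map_err(|e| format!("recv {:?}", e))?
    }
}

/// Spawn a "state_machine" thread that handles one event per iteration and replies.
fn kick_state_machine() -> (Daemon, JoinHandle<Vec<Event>>) {
    let (trigger, events_rx) = channel::<Event>();
    let (result_sender, result_receiver) = channel::<Result<(), String>>();
    let handle = thread::Builder::new()
        .name("state_machine".to_string())
        .spawn(move || {
            let mut seen = Vec::new();
            // recv() blocks until the daemon sends the next event.
            while let Ok(event) = events_rx.recv() {
                seen.push(event);
                // A real implementation would run the transition's action here.
                let _ = result_sender.send(Ok(()));
                if event == Event::Stop {
                    break; // stands in for `if d.get_state() == DaemonState::STOPPED { break; }`
                }
            }
            seen
        })
        .expect("spawn state_machine thread");
    let daemon = Daemon {
        request_sender: Mutex::new(trigger),
        result_receiver: Mutex::new(result_receiver),
    };
    (daemon, handle)
}
```

Once the thread has exited, its receiver is dropped, so any further on_event call fails on send.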

Therefore, the state_machine thread receives events (such as Stop) from nydusd through the channel and changes the state accordingly, while the caller of on_event() waits for the result on result_receiver.

2.5.1 Starting the FUSE Service

As mentioned above, the state_machine thread changes the state of nydusd. For the StartService action it runs d.start() and, once that succeeds, sets the daemon state to RUNNING via set_state(DaemonState::RUNNING):

```rust
let r = match action {
    Some(a) => match a {
        StartService => d.start().map(|r| {
            d.set_state(DaemonState::RUNNING);
            r
        }),
        ...
    },
    _ => Ok(()),
};
```

Different daemon types implement d.start() differently. For FusedevDaemon, start() is:

```rust
fn start(&self) -> DaemonResult<()> {
    info!("start {} fuse servers", self.threads_cnt);
    for _ in 0..self.threads_cnt {
        let waker = DAEMON_CONTROLLER.alloc_waker();
        self.kick_one_server(waker)
            .map_err(|e| DaemonError::StartService(format!("{:?}", e)))?;
    }
    Ok(())
}
```

This starts threads_cnt threads. DAEMON_CONTROLLER.alloc_waker() merely clones the Arc that points to DAEMON_CONTROLLER.waker:

```rust
pub fn alloc_waker(&self) -> Arc<Waker> {
    self.waker.clone()
}
```

kick_one_server(waker) is a method of the FusedevDaemon struct:

```rust
fn kick_one_server(&self, waker: Arc<Waker>) -> Result<()> {
    let mut s = self.service.create_fuse_server()?;
    let inflight_op = self.service.create_inflight_op();
    let thread = thread::Builder::new()
        .name("fuse_server".to_string())
        .spawn(move || {
            if let Err(err) = s.svc_loop(&inflight_op) {
                warn!("fuse server exits with err: {:?}, exiting daemon", err);
                if let Err(err) = waker.wake() {
                    error!("fail to exit daemon, error: {:?}", err);
                }
            }
            // Notify the daemon controller that one working thread has exited.

            Ok(())
        })
        .map_err(DaemonError::ThreadSpawn)?;

    self.fuse_service_threads.lock().unwrap().push(thread);

    Ok(())
}
```

kick_one_server spawns a thread named fuse_server, and the handle of each successfully spawned thread is stored in FusedevDaemon.fuse_service_threads.
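The spawn-then-join lifecycle (kick_one_server when starting, wait_service when terminating) can be sketched with std::thread::Builder alone. In this hypothetical sketch, start_workers and wait_all are illustrative names; each worker only returns its thread name instead of running svc_loop().

```rust
use std::io;
use std::thread::{self, JoinHandle};

// Hypothetical sketch: spawn named workers, keep their JoinHandles,
// then pop and join them one by one (as wait_service does).
fn start_workers(threads_cnt: usize) -> io::Result<Vec<JoinHandle<io::Result<String>>>> {
    let mut handles = Vec::new();
    for _ in 0..threads_cnt {
        let handle = thread::Builder::new()
            .name("fuse_server".to_string())
            .spawn(|| -> io::Result<String> {
                // A real worker would run svc_loop() here; we just report our name.
                Ok(thread::current().name().unwrap_or("").to_string())
            })?;
        handles.push(handle);
    }
    Ok(handles)
}

fn wait_all(mut handles: Vec<JoinHandle<io::Result<String>>>) -> io::Result<Vec<String>> {
    let mut names = Vec::new();
    while let Some(h) = handles.pop() {
        // The outer error is a panic in the worker, the inner one is the worker's own error.
        let name = h
            .join()
            .map_err(|_| io::Error::new(io::ErrorKind::Other, "worker panicked"))??;
        names.push(name);
    }
    Ok(names)
}
```

Naming the threads is what makes them show up as fuse_server in top -Hp.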

2.5.2 The FUSE Server Thread (Handling FUSE Requests)

Before the thread is started, a fuse server and an inflight operation are created. create_fuse_server() is a method implemented by FusedevFsService:

```rust
fn create_fuse_server(&self) -> Result<FuseServer> {
    FuseServer::new(self.server.clone(), self.session.lock().unwrap().deref())
}
```

create_fuse_server() instantiates a FuseServer through FuseServer::new(). Of its arguments, self.server.clone() clones the Arc that points to the server, and self.session.lock().unwrap().deref() is a reference to the FuseSession behind the lock guard. The method returns a FuseServer instance:

```rust
fn new(server: Arc<Server<Arc<Vfs>>>, se: &FuseSession) -> Result<FuseServer> {
    let ch = se.new_channel().map_err(|e| eother!(e))?;
    Ok(FuseServer { server, ch })
}
```

Before the FuseServer instance is constructed, a fuse channel is created with the new_channel() method of FuseSession and stored in the FuseServer instance.

FuseSession is a struct from fuse-backend-rs, and its new_channel() method creates a new channel.



create_inflight_op() is also a method of FusedevFsService; the returned inflight_op is added to FusedevFsService.inflight_ops:

```rust
fn create_inflight_op(&self) -> FuseOpWrapper {
    let inflight_op = FuseOpWrapper::default();

    // "Not expected poisoned lock"
    self.inflight_ops.lock().unwrap().push(inflight_op.clone());

    inflight_op
}
```

FuseOpWrapper::default() initializes a FuseOpWrapper, and a clone of it is then appended to self.inflight_ops.

With the fuse server and the inflight operation in place, the fuse_server thread is started. s.svc_loop(&inflight_op) is the main processing logic of the thread:

```rust
fn svc_loop(&mut self, metrics_hook: &dyn MetricsHook) -> Result<()> {
    // Given error EBADF, it means kernel has shut down this session.
    let _ebadf = Error::from_raw_os_error(libc::EBADF);

    loop {
        // fetch a FUSE request through the channel (epoll)
        if let Some((reader, writer)) = self.ch.get_request().map_err(|e| {
            warn!("get fuse request failed: {:?}", e);
            Error::from_raw_os_error(libc::EINVAL)
        })? {
            if let Err(e) =
                self.server
                    .handle_message(reader, writer.into(), None, Some(metrics_hook))
            {
                match e {
                    fuse_backend_rs::Error::EncodeMessage(_ebadf) => {
                        return Err(eio!("fuse session has been shut down"));
                    }
                    _ => {
                        error!("Handling fuse message, {}", DaemonError::ProcessQueue(e));
                        continue;
                    }
                }
            }
        } else {
            info!("fuse server exits");
            break;
        }
    }

    Ok(())
}
```

This is an infinite loop. self.ch.get_request() is a method of the FuseChannel struct in fuse-backend-rs; it fetches a fuse request from the kernel FUSE module through the channel, which reads from the /dev/fuse file descriptor. When it returns None, the loop exits.


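The returned reader and writer are passed to handle_message(), together with metrics_hook for collecting metrics. self.server.handle_message() processes each fuse request; it is also a method of Server in fuse-backend-rs and dispatches on the opcode of the request: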



```rust
let res = match in_header.opcode {
    x if x == Opcode::Lookup as u32 => self.lookup(ctx),
    x if x == Opcode::Forget as u32 => self.forget(ctx), // No reply.
    x if x == Opcode::Getattr as u32 => self.getattr(ctx),
    x if x == Opcode::Setattr as u32 => self.setattr(ctx),
    x if x == Opcode::Readlink as u32 => self.readlink(ctx),
    x if x == Opcode::Symlink as u32 => self.symlink(ctx),
    x if x == Opcode::Mknod as u32 => self.mknod(ctx),
    x if x == Opcode::Mkdir as u32 => self.mkdir(ctx),
    x if x == Opcode::Unlink as u32 => self.unlink(ctx),
    x if x == Opcode::Rmdir as u32 => self.rmdir(ctx),
    x if x == Opcode::Rename as u32 => self.rename(ctx),
    x if x == Opcode::Link as u32 => self.link(ctx),
    x if x == Opcode::Open as u32 => self.open(ctx),
    x if x == Opcode::Read as u32 => self.read(ctx),
    x if x == Opcode::Write as u32 => self.write(ctx),
    x if x == Opcode::Statfs as u32 => self.statfs(ctx),
    x if x == Opcode::Release as u32 => self.release(ctx),
    x if x == Opcode::Fsync as u32 => self.fsync(ctx),
    x if x == Opcode::Setxattr as u32 => self.setxattr(ctx),
    x if x == Opcode::Getxattr as u32 => self.getxattr(ctx),
    x if x == Opcode::Listxattr as u32 => self.listxattr(ctx),
    x if x == Opcode::Removexattr as u32 => self.removexattr(ctx),
    x if x == Opcode::Flush as u32 => self.flush(ctx),
    x if x == Opcode::Init as u32 => self.init(ctx),
    x if x == Opcode::Opendir as u32 => self.opendir(ctx),
    x if x == Opcode::Readdir as u32 => self.readdir(ctx),
    x if x == Opcode::Releasedir as u32 => self.releasedir(ctx),
    x if x == Opcode::Fsyncdir as u32 => self.fsyncdir(ctx),
    x if x == Opcode::Getlk as u32 => self.getlk(ctx),
    x if x == Opcode::Setlk as u32 => self.setlk(ctx),
    x if x == Opcode::Setlkw as u32 => self.setlkw(ctx),
    x if x == Opcode::Access as u32 => self.access(ctx),
    x if x == Opcode::Create as u32 => self.create(ctx),
    x if x == Opcode::Bmap as u32 => self.bmap(ctx),
    x if x == Opcode::Ioctl as u32 => self.ioctl(ctx),
    x if x == Opcode::Poll as u32 => self.poll(ctx),
    x if x == Opcode::NotifyReply as u32 => self.notify_reply(ctx),
    x if x == Opcode::BatchForget as u32 => self.batch_forget(ctx),
    x if x == Opcode::Fallocate as u32 => self.fallocate(ctx),
    x if x == Opcode::Readdirplus as u32 => self.readdirplus(ctx),
    x if x == Opcode::Rename2 as u32 => self.rename2(ctx),
    x if x == Opcode::Lseek as u32 => self.lseek(ctx),
    #[cfg(feature = "virtiofs")]
    x if x == Opcode::SetupMapping as u32 => self.setupmapping(ctx, vu_req),
    #[cfg(feature = "virtiofs")]
    x if x == Opcode::RemoveMapping as u32 => self.removemapping(ctx, vu_req),
    // Group requests don't need reply together
    x => match x {
        x if x == Opcode::Interrupt as u32 => {
            self.interrupt(ctx);
            Ok(0)
        }
        x if x == Opcode::Destroy as u32 => {
            self.destroy(ctx);
            Ok(0)
        }
        _ => ctx.reply_error(io::Error::from_raw_os_error(libc::ENOSYS)),
    },
};
```
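The dispatch style above (a match with `x if x == Opcode::... as u32` guards and an ENOSYS fallback) can be reduced to a few lines. This is a hypothetical sketch, not the fuse-backend-rs code: the handlers are replaced by names, and the opcode numbers and ENOSYS value are assumed to be the Linux ones (from include/uapi/linux/fuse.h and errno).

```rust
// Hypothetical, heavily reduced dispatcher in the style of Server::handle_message.
// Assumed Linux values: FUSE opcodes from fuse.h, ENOSYS = 38.
const ENOSYS: i32 = 38;

#[repr(u32)]
enum Opcode { Lookup = 1, Forget = 2, Getattr = 3, Read = 15, Init = 26 }

/// Returns the name of the handler that would run, or ENOSYS for unsupported opcodes.
fn dispatch(opcode: u32) -> Result<&'static str, i32> {
    match opcode {
        x if x == Opcode::Lookup as u32 => Ok("lookup"),
        x if x == Opcode::Forget as u32 => Ok("forget"), // the real FORGET handler sends no reply
        x if x == Opcode::Getattr as u32 => Ok("getattr"),
        x if x == Opcode::Read as u32 => Ok("read"),
        x if x == Opcode::Init as u32 => Ok("init"),
        // Unknown or unimplemented opcodes are answered with ENOSYS.
        _ => Err(ENOSYS),
    }
}
```

Guards are needed because a match arm pattern cannot be an expression like `Opcode::Lookup as u32`.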


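Handlers such as self.lookup(ctx) delegate the actual work to the server's fs field. What is this fs? The Server struct definition shows that fs has a generic type F that implements the FileSystem + Sync traits:

```rust
/// Fuse Server to handle requests from the Fuse client and vhost user master.
pub struct Server<F: FileSystem + Sync> {
    fs: F,
    vers: ArcSwap<ServerVersion>,
}
```

In nydusd, the server held by FuseServer is a Server<Arc<Vfs>>:

```rust
struct FuseServer {
    server: Arc<Server<Arc<Vfs>>>,
    ch: FuseChannel,
}

impl FuseServer {
    fn new(server: Arc<Server<Arc<Vfs>>>, se: &FuseSession) -> Result<FuseServer> {
        let ch = se.new_channel().map_err(|e| eother!(e))?;
        Ok(FuseServer { server, ch })
    }
    ...
}
```

fuse-backend-rs implements the FileSystem trait for Vfs, so the FUSE requests handled by the server are ultimately served by the Vfs.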


The fuse_server threads can also be seen with top -Hp NYDUSD_PID.



2.5.3 Terminating the FUSE Service

For the TerminateService action, the state machine interrupts the fuse service threads, waits for them to exit, and sets the state back to READY:

```rust
TerminateService => {
    d.interrupt();
    let res = d.wait_service();
    if res.is_ok() {
        d.set_state(DaemonState::READY);
    }

    res
}
```

The interrupt() method:

```rust
fn interrupt(&self) {
    let session = self
        .service
        .session
        .lock()
        .expect("Not expect poisoned lock.");
    if let Err(e) = session.wake().map_err(DaemonError::SessionShutdown) {
        error!("stop fuse service thread failed: {:?}", e);
    }
}
```

The wait_service() method:

```rust
fn wait_service(&self) -> DaemonResult<()> {
    loop {
        let handle = self.fuse_service_threads.lock().unwrap().pop();
        if let Some(handle) = handle {
            handle
                .join()
                .map_err(|e| {
                    DaemonError::WaitDaemon(
                        *e.downcast::<Error>()
                            .unwrap_or_else(|e| Box::new(eother!(e))),
                    )
                })?
                .map_err(DaemonError::WaitDaemon)?;
        } else {
            // No more handles to wait
            break;
        }
    }

    Ok(())
}
```

2.5.4 FUSE Umount Operation

The Umount action is almost the same as TerminateService, except that it first disconnects from the kernel FUSE module before calling d.interrupt(), and it sets the state to STOPPED at the end:

```rust
Umount => d.disconnect().map(|r| {
    // Always interrupt fuse service loop after shutdown connection to kernel.
    // In case that kernel does not really shutdown the session due to some reasons
    // causing service loop keep waiting of `/dev/fuse`.
    d.interrupt();
    d.wait_service()
        .unwrap_or_else(|e| error!("failed to wait service {}", e));
    // at least all fuse thread stopped, no matter what error each thread got
    d.set_state(DaemonState::STOPPED);
    r
}),
```

The d.disconnect() method that breaks the connection:

```rust
fn disconnect(&self) -> DaemonResult<()> {
    self.service.disconnect()
}
```

It in turn calls FusedevFsService::disconnect(), which invokes session.umount() and then wakes up the session:

```rust
fn disconnect(&self) -> DaemonResult<()> {
    let mut session = self.session.lock().expect("Not expect poisoned lock.");
    session.umount().map_err(DaemonError::SessionShutdown)?;
    session.wake().map_err(DaemonError::SessionShutdown)?;
    Ok(())
}
```

The implementation of umount in fuse-backend-rs:

```rust
/// Destroy a fuse session.
pub fn umount(&mut self) -> Result<()> {
    if let Some(file) = self.file.take() {
        if let Some(mountpoint) = self.mountpoint.to_str() {
            fuse_kern_umount(mountpoint, file)
        } else {
            Err(SessionFailure("invalid mountpoint".to_string()))
        }
    } else {
        Ok(())
    }
}
```

In addition, there are the Restore and StopStateMachine actions:

```rust
Restore => {
    let res = d.restore();
    if res.is_ok() {
        d.set_state(DaemonState::READY);
    }
    res
}
StopStateMachine => {
    d.set_state(DaemonState::STOPPED);
    Ok(())
}
```

When the daemon state is STOPPED, the state machine thread leaves its loop:

```rust
if d.get_state() == DaemonState::STOPPED {
    break;
}
```



2.6 Mounting the FUSE File System

If this is neither a live upgrade nor a failover, nydusd asks the kernel FUSE module to mount the file system, and then sends the Mount and Start events to the state machine through on_event():

```rust
// 1. api_sock exists, but this is neither a live upgrade nor a failover
// 2. api_sock does not exist
if (api_sock.as_ref().is_some() && !upgrade && !is_crashed(&mnt, api_sock.as_ref().unwrap())?)
    || api_sock.is_none()
{
    if let Some(cmd) = mount_cmd {
        daemon.service.mount(cmd)?;
    }
    daemon
        .service
        .session
        .lock()
        .unwrap()
        .mount()
        .map_err(|e| eother!(e))?;
    daemon
        .on_event(DaemonStateMachineInput::Mount)
        .map_err(|e| eother!(e))?;
    daemon
        .on_event(DaemonStateMachineInput::Start)
        .map_err(|e| eother!(e))?;
    daemon
        .service
        .conn
        .store(calc_fuse_conn(mnt)?, Ordering::Relaxed);
}
```

If mount_cmd is not None, daemon.service.mount(cmd) mounts the backend file system:

```rust
// NOTE: This method is not thread-safe, however, it is acceptable as
// mount/umount/remount/restore_mount is invoked from single thread in FSM
fn mount(&self, cmd: FsBackendMountCmd) -> DaemonResult<()> {
    if self.backend_from_mountpoint(&cmd.mountpoint)?.is_some() {
        return Err(DaemonError::AlreadyExists);
    }
    let backend = fs_backend_factory(&cmd)?;
    let index = self.get_vfs().mount(backend, &cmd.mountpoint)?;
    info!("{} filesystem mounted at {}", &cmd.fs_type, &cmd.mountpoint);
    self.backend_collection().add(&cmd.mountpoint, &cmd)?;

    // Add mounts opaque to UpgradeManager
    if let Some(mut mgr_guard) = self.upgrade_mgr() {
        upgrade::add_mounts_state(&mut mgr_guard, cmd, index)?;
    }

    Ok(())
}
```

backend_from_mountpoint calls Vfs::get_rootfs, which first resolves the inode of the given path in the pseudo fs and then checks whether that inode is present in the mountpoints HashMap:

```rust
/// Get the mounted backend file system alongside the path if there's one.
pub fn get_rootfs(&self, path: &str) -> VfsResult<Option<Arc<BackFileSystem>>> {
    // Serialize mount operations. Do not expect poisoned lock here.
    let _guard = self.lock.lock().unwrap();
    let inode = match self.root.path_walk(path).map_err(VfsError::PathWalk)? {
        Some(i) => i,
        None => return Ok(None),
    };

    if let Some(mnt) = self.mountpoints.load().get(&inode) {
        Ok(Some(self.get_fs_by_idx(mnt.fs_idx).map_err(|e| {
            VfsError::NotFound(format!("fs index {}, {:?}", mnt.fs_idx, e))
        })?))
    } else {
        // Pseudo fs dir inode exists, but that no backend is ever mounted
        // is a normal case.
        Ok(None)
    }
}
```
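The duplicate check in mount() plus the fs-index allocation from next_super can be illustrated with a toy mount table. This is a hypothetical sketch: MountTable is not a nydusd type, and it keys mountpoints by path for brevity, whereas the real Vfs keys them by pseudo-fs inode obtained from path_walk.

```rust
use std::collections::HashMap;

// Hypothetical mount registry illustrating mount()'s duplicate check and index allocation.
struct MountTable {
    next_super: u8,                   // index 0 is reserved for the pseudo fs
    mountpoints: HashMap<String, u8>, // mountpoint path -> backend fs index
}

impl MountTable {
    fn new() -> Self {
        MountTable { next_super: 1, mountpoints: HashMap::new() }
    }

    /// Like get_rootfs(): is a backend already mounted at `path`?
    fn get_rootfs(&self, path: &str) -> Option<u8> {
        self.mountpoints.get(path).copied()
    }

    fn mount(&mut self, path: &str) -> Result<u8, String> {
        if self.get_rootfs(path).is_some() {
            return Err(format!("{} already exists", path)); // DaemonError::AlreadyExists in nydusd
        }
        if self.next_super == u8::MAX {
            return Err("no free fs index".to_string());
        }
        let idx = self.next_super;
        self.next_super += 1;
        self.mountpoints.insert(path.to_string(), idx);
        Ok(idx)
    }
}
```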

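Next, fs_backend_factory(&cmd) creates the file system backend. It returns a boxed value implementing the BackendFileSystem + Sync + Send traits. Two backend types are supported:

```rust
pub enum FsBackendType {
    Rafs,
    PassthroughFs,
}
```

2.6.1 Initializing the RAFS Backend

First, the config content passed in cmd is parsed, and a reader for the bootstrap file (from which the file system metadata is read) is opened from the path in cmd.source and bound to the bootstrap variable. Then a rafs instance is created from the configuration, the mountpoint and the bootstrap reader:

```rust
FsBackendType::Rafs => {
    let rafs_config = RafsConfig::from_str(cmd.config.as_str())?;
    let mut bootstrap = <dyn RafsIoRead>::from_file(&cmd.source)?;
    let mut rafs = Rafs::new(rafs_config, &cmd.mountpoint, &mut bootstrap)?;
    rafs.import(bootstrap, prefetch_files)?;
    info!("RAFS filesystem imported");
    Ok(Box::new(rafs))
}
```

The rafs instance is created by Rafs::new(rafs_config, &cmd.mountpoint, &mut bootstrap).

It first prepares the storage configuration storage_conf and creates a RafsSuper instance from the conf argument. Creating RafsSuper only initializes the configuration, including the RafsMode (either Direct or Cached). Then sb.load(r) loads the RAFS super block from the bootstrap. RAFS V5 and V6 are loaded differently; for V6 this is done by try_load_v6: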

```rust
pub(crate) fn try_load_v6(&mut self, r: &mut RafsIoReader) -> Result<bool> {
    let end = r.seek_to_end(0)?;
    r.seek_to_offset(0)?;

    // create a RafsV6SuperBlock instance
    let mut sb = RafsV6SuperBlock::new();
    // read the RAFS v6 super block
    // offset 1024, length 128
    if sb.load(r).is_err() {
        return Ok(false);
    }
    if !sb.is_rafs_v6() {
        return Ok(false);
    }
    sb.validate(end)?;
    // fill in the meta information of the RAFS super block
    self.meta.version = RAFS_SUPER_VERSION_V6;
    self.meta.magic = sb.magic();
    self.meta.meta_blkaddr = sb.s_meta_blkaddr;
    self.meta.root_nid = sb.s_root_nid;

    // create a RafsV6SuperBlockExt instance
    let mut ext_sb = RafsV6SuperBlockExt::new();
    // read the RAFS v6 extended super block
    // offset 1024 + 128, length 256
    ext_sb.load(r)?;
    ext_sb.validate(end)?;
    // fill in more meta information of the RAFS super block
    self.meta.chunk_size = ext_sb.chunk_size();
    self.meta.blob_table_offset = ext_sb.blob_table_offset();
    self.meta.blob_table_size = ext_sb.blob_table_size();
    self.meta.chunk_table_offset = ext_sb.chunk_table_offset();
    self.meta.chunk_table_size = ext_sb.chunk_table_size();
    self.meta.inodes_count = sb.inodes_count();

    self.meta.flags = RafsSuperFlags::from_bits(ext_sb.flags())
        .ok_or_else(|| einval!(format!("invalid super flags {:x}", ext_sb.flags())))?;
    info!("rafs superblock features: {}", self.meta.flags);

    // fill in the prefetch table information of the RAFS super block meta
    self.meta.prefetch_table_entries = ext_sb.prefetch_table_size() / size_of::<u32>() as u32;
    self.meta.prefetch_table_offset = ext_sb.prefetch_table_offset();
    trace!(
        "prefetch table offset {} entries {} ",
        self.meta.prefetch_table_offset,
        self.meta.prefetch_table_entries
    );

    match self.mode {
        // In Direct mode, a DirectSuperBlockV6 instance
        // is also created and loaded
        RafsMode::Direct => {
            let mut sb_v6 = DirectSuperBlockV6::new(&self.meta);
            sb_v6.load(r)?;
            self.superblock = Arc::new(sb_v6);
            Ok(true)
        }
        RafsMode::Cached => Err(enosys!("Rafs v6 does not support cached mode")),
    }
}
```

After the RAFS super block is loaded, the blob information is retrieved and the rafs instance is created:

```rust
pub fn new(conf: RafsConfig, id: &str, r: &mut RafsIoReader) -> RafsResult<Self> {
    let storage_conf = Self::prepare_storage_conf(&conf)?;
    let mut sb = RafsSuper::new(&conf).map_err(RafsError::FillSuperblock)?;
    sb.load(r).map_err(RafsError::FillSuperblock)?;
    // with the super block loaded, get the blob information (BlobInfo) from it
    let blob_infos = sb.superblock.get_blob_infos();
    // create a BlobDevice from the storage config,
    // covering every blob_info in the list
    let device =
        BlobDevice::new(&storage_conf, &blob_infos).map_err(RafsError::CreateDevice)?;
    // create the rafs instance
    let rafs = Rafs {
        id: id.to_string(),
        device, // BlobDevice
        ios: metrics::FsIoStats::new(id),
        sb: Arc::new(sb),

        initialized: false, // not initialized yet
        digest_validate: conf.digest_validate,
        fs_prefetch: conf.fs_prefetch.enable, // whether prefetch is enabled
        amplify_io: conf.amplify_io,
        prefetch_all: conf.fs_prefetch.prefetch_all,
        xattr_enabled: conf.enable_xattr, // whether xattr is enabled

        i_uid: geteuid().into(), // uid
        i_gid: getegid().into(), // gid
        i_time: SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs(),
    };

    // Rafs v6 does must store chunk info into local file cache. So blob cache is required
    if rafs.metadata().is_v6() {
        if conf.device.cache.cache_type != "blobcache" {
            return Err(RafsError::Configure(
                "Rafs v6 must have local blobcache configured".to_string(),
            ));
        }

        if conf.digest_validate {
            return Err(RafsError::Configure(
                "Rafs v6 doesn't support integrity validation yet".to_string(),
            ));
        }
    }

    rafs.ios.toggle_files_recording(conf.iostats_files);
    rafs.ios.toggle_access_pattern(conf.access_pattern);
    rafs.ios
        .toggle_latest_read_files_recording(conf.latest_read_files);

    Ok(rafs)
}
```

The layout of RAFS metadata (taking v6 as an example) in the bootstrap file is defined in detail in rafs/src/metadata/layout/v6.rs:

```rust
/// EROFS metadata slot size.
pub const EROFS_INODE_SLOT_SIZE: usize = 1 << EROFS_INODE_SLOT_BITS;
/// EROFS logical block size.
pub const EROFS_BLOCK_SIZE: u64 = 1u64 << EROFS_BLOCK_BITS;
/// EROFS plain inode.
pub const EROFS_INODE_FLAT_PLAIN: u16 = 0;
/// EROFS inline inode.
pub const EROFS_INODE_FLAT_INLINE: u16 = 2;
/// EROFS chunked inode.
pub const EROFS_INODE_CHUNK_BASED: u16 = 4;
/// EROFS device table offset.
pub const EROFS_DEVTABLE_OFFSET: u16 =
    EROFS_SUPER_OFFSET + EROFS_SUPER_BLOCK_SIZE + EROFS_EXT_SUPER_BLOCK_SIZE;

pub const EROFS_I_VERSION_BIT: u16 = 0;
pub const EROFS_I_VERSION_BITS: u16 = 1;
pub const EROFS_I_DATALAYOUT_BITS: u16 = 3;

// Offset of EROFS super block.
pub const EROFS_SUPER_OFFSET: u16 = 1024;
// Size of EROFS super block.
pub const EROFS_SUPER_BLOCK_SIZE: u16 = 128;
// Size of extended super block, used for rafs v6 specific fields
const EROFS_EXT_SUPER_BLOCK_SIZE: u16 = 256;
// Magic number for EROFS super block.
const EROFS_SUPER_MAGIC_V1: u32 = 0xE0F5_E1E2;
// Bits of EROFS logical block size.
const EROFS_BLOCK_BITS: u8 = 12;
// Bits of EROFS metadata slot size.
const EROFS_INODE_SLOT_BITS: u8 = 5;
```
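Working the constants through: the 128-byte super block starts at 1024, the 256-byte extended super block follows it, so the device table starts at 1024 + 128 + 256 = 1408; the block size is 2^12 = 4096 bytes and an inode slot is 2^5 = 32 bytes. The sketch below checks this arithmetic and adds a hypothetical helper, looks_like_rafs_v6, which assumes (as in EROFS) that the little-endian magic is the first field of the super block. It is not the nydus is_rafs_v6() implementation, which may check more than the magic.

```rust
// Constants copied from the layout above; the helper is a hypothetical sketch.
const EROFS_SUPER_OFFSET: u16 = 1024;
const EROFS_SUPER_BLOCK_SIZE: u16 = 128;
const EROFS_EXT_SUPER_BLOCK_SIZE: u16 = 256;
const EROFS_SUPER_MAGIC_V1: u32 = 0xE0F5_E1E2;
const EROFS_BLOCK_BITS: u8 = 12;
const EROFS_INODE_SLOT_BITS: u8 = 5;

const EROFS_DEVTABLE_OFFSET: u16 =
    EROFS_SUPER_OFFSET + EROFS_SUPER_BLOCK_SIZE + EROFS_EXT_SUPER_BLOCK_SIZE;
const EROFS_BLOCK_SIZE: u64 = 1u64 << EROFS_BLOCK_BITS;
const EROFS_INODE_SLOT_SIZE: usize = 1 << EROFS_INODE_SLOT_BITS;

/// Assumption: the little-endian magic is the first field of the super block.
fn looks_like_rafs_v6(bootstrap: &[u8]) -> bool {
    let off = EROFS_SUPER_OFFSET as usize;
    match bootstrap.get(off..off + 4) {
        Some(b) => u32::from_le_bytes([b[0], b[1], b[2], b[3]]) == EROFS_SUPER_MAGIC_V1,
        None => false, // file too short to contain a super block
    }
}
```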

After the rafs instance is created, rafs.import(bootstrap, prefetch_files) initializes it (importing the bootstrap and prefetch information):

```rust
/// Import an rafs bootstrap to initialize the filesystem instance.
pub fn import(
    &mut self,
    r: RafsIoReader,
    prefetch_files: Option<Vec<PathBuf>>,
) -> RafsResult<()> {
    if self.initialized {
        return Err(RafsError::AlreadyMounted);
    }
    if self.fs_prefetch {
        // Device should be ready before any prefetch.
        self.device.start_prefetch();
        self.prefetch(r, prefetch_files);
    }
    self.initialized = true;

    Ok(())
}
```

Its main job is to start prefetching. self.prefetch(r, prefetch_files) takes two arguments: r is the reader of the bootstrap file, and prefetch_files is the list of files to prefetch passed in by the user (from --prefetch-files); when no list is given, the prefetch list stored in the bootstrap metadata is used. self.prefetch() spawns the prefetch thread:

```rust
fn prefetch(&self, reader: RafsIoReader, prefetch_files: Option<Vec<PathBuf>>) {
    let sb = self.sb.clone();
    let device = self.device.clone();
    let prefetch_all = self.prefetch_all;
    let root_ino = self.root_ino();

    let _ = std::thread::spawn(move || {
        Self::do_prefetch(root_ino, reader, prefetch_files, prefetch_all, sb, device);
    });
}
```
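For RAFS v6, do_prefetch first performs range-based prefetch: each blob's prefetch region is cut into requests of at most RAFS_DEFAULT_CHUNK_SIZE bytes. The sketch below isolates that splitting loop; PrefetchReq is a hypothetical stand-in for BlobPrefetchRequest, and the chunk size is passed as a parameter rather than assuming the value of the real constant.

```rust
// Hypothetical stand-in for BlobPrefetchRequest.
#[derive(Debug, PartialEq)]
struct PrefetchReq {
    blob_id: String,
    offset: u64,
    len: u64,
}

/// Cut a blob's prefetch region [0, prefetch_size) into requests of at most `chunk_size` bytes.
fn split_prefetch(blob_id: &str, prefetch_size: u64, chunk_size: u64) -> Vec<PrefetchReq> {
    assert!(chunk_size > 0, "chunk_size must be positive");
    let mut reqs = Vec::new();
    let mut offset = 0;
    while offset < prefetch_size {
        // The last request may be shorter than chunk_size.
        let len = std::cmp::min(prefetch_size - offset, chunk_size);
        reqs.push(PrefetchReq { blob_id: blob_id.to_owned(), offset, len });
        offset += len;
    }
    reqs
}
```

For a 2500-byte prefetch region and a 1000-byte chunk size this yields three requests, the last covering offset 2000 with length 500.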



The thread runs `do_prefetch`, which issues prefetch requests at chunk granularity:

```rust
fn do_prefetch(
    root_ino: u64,
    mut reader: RafsIoReader, // reader for the bootstrap file
    prefetch_files: Option<Vec<PathBuf>>,
    prefetch_all: bool,
    sb: Arc<RafsSuper>,
    device: BlobDevice,
) {
    // First do range based prefetch for rafs v6.
    if sb.meta.is_v6() {
        // Build BlobPrefetchRequests, one per chunk-sized range.
        let mut prefetches = Vec::new();

        for blob in sb.superblock.get_blob_infos() {
            let sz = blob.prefetch_size();
            if sz > 0 {
                let mut offset = 0;
                while offset < sz {
                    // Split the prefetch region into chunk-sized requests.
                    let len = cmp::min(sz - offset, RAFS_DEFAULT_CHUNK_SIZE);
                    prefetches.push(BlobPrefetchRequest {
                        blob_id: blob.blob_id().to_owned(),
                        offset,
                        len,
                    });
                    offset += len;
                }
            }
        }
        if !prefetches.is_empty() {
            // Issue the requests through the device's prefetch().
            device.prefetch(&[], &prefetches).unwrap_or_else(|e| {
                warn!("Prefetch error, {:?}", e);
            });
        }
    }

    let fetcher = |desc: &mut BlobIoVec, last: bool| {
        if desc.size() as u64 > RAFS_MAX_CHUNK_SIZE
            || desc.len() > 1024
            || (last && desc.size() > 0)
        {
            trace!(
                "fs prefetch: 0x{:x} bytes for {} descriptors",
                desc.size(),
                desc.len()
            );
            device.prefetch(&[desc], &[]).unwrap_or_else(|e| {
                warn!("Prefetch error, {:?}", e);
            });
            desc.reset();
        }
    };

    let mut ignore_prefetch_all = prefetch_files
        .as_ref()
        .map(|f| f.len() == 1 && f[0].as_os_str() == "/")
        .unwrap_or(false);

    // Then do file based prefetch based on:
    // - prefetch listed passed in by user
    // - or file prefetch list in metadata
    let inodes = prefetch_files.map(|files| Self::convert_file_list(&files, &sb));
    let res = sb.prefetch_files(&device, &mut reader, root_ino, inodes, &fetcher);
    match res {
        Ok(true) => ignore_prefetch_all = true,
        Ok(false) => {}
        Err(e) => info!("No file to be prefetched {:?}", e),
    }

    // Last optionally prefetch all data
    if prefetch_all && !ignore_prefetch_all {
        let root = vec![root_ino];
        let res = sb.prefetch_files(&device, &mut reader, root_ino, Some(root), &fetcher);
        if let Err(e) = res {
            info!("No file to be prefetched {:?}", e);
        }
    }
}
```
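The v6 range-based part of `do_prefetch` cuts each blob's prefetch region into chunk-sized requests. Below is a minimal standalone sketch of that loop. It assumes a hypothetical `PrefetchReq` struct in place of `BlobPrefetchRequest`, and it takes the chunk size as a parameter instead of using `RAFS_DEFAULT_CHUNK_SIZE`:

```rust
/// Hypothetical stand-in for `BlobPrefetchRequest`.
#[derive(Debug, PartialEq)]
pub struct PrefetchReq {
    pub blob_id: String,
    pub offset: u64,
    pub len: u64,
}

/// Split `[0, prefetch_size)` into requests of at most `chunk_size` bytes,
/// mirroring the `while offset < sz` loop in `do_prefetch`.
pub fn split_prefetch(blob_id: &str, prefetch_size: u64, chunk_size: u64) -> Vec<PrefetchReq> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut reqs = Vec::new();
    let mut offset = 0;
    while offset < prefetch_size {
        // The last request may be shorter than chunk_size.
        let len = std::cmp::min(prefetch_size - offset, chunk_size);
        reqs.push(PrefetchReq { blob_id: blob_id.to_owned(), offset, len });
        offset += len;
    }
    reqs
}
```

For example, a 10-byte prefetch region with a 4-byte chunk size yields requests at offsets 0, 4 and 8, and the last one is only 2 bytes long.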


`device.prefetch()` (`BlobDevice::prefetch`) dispatches each request to the blob it belongs to:

```rust
/// Try to prefetch specified blob data.
pub fn prefetch(
    &self,
    io_vecs: &[&BlobIoVec],
    prefetches: &[BlobPrefetchRequest],
) -> io::Result<()> {
    for idx in 0..prefetches.len() {
        // Look up the blob by blob_id.
        if let Some(blob) = self.get_blob_by_id(&prefetches[idx].blob_id) {
            // Prefetch through the blob cache's prefetch() method.
            let _ = blob.prefetch(blob.clone(), &prefetches[idx..idx + 1], &[]);
        }
    }

    for io_vec in io_vecs.iter() {
        if let Some(blob) = self.get_blob_by_iovec(io_vec) {
            // Prefetch errors are ignored.
            let _ = blob
                .prefetch(blob.clone(), &[], &io_vec.bi_vec)
                .map_err(|e| {
                    error!("failed to prefetch blob data, {}", e);
                });
        }
    }

    Ok(())
}
```

After the blob is looked up by `blob_id`, its `prefetch` method is called:

```rust
fn prefetch(
    &self,
    blob_cache: Arc<dyn BlobCache>,
    prefetches: &[BlobPrefetchRequest],
    bios: &[BlobIoDesc],
) -> StorageResult<usize> {
    // Handle blob prefetch request first, it may help performance.
    for req in prefetches {
        // Build an asynchronous prefetch message.
        let msg = AsyncPrefetchMessage::new_blob_prefetch(
            blob_cache.clone(),
            req.offset as u64,
            req.len as u64,
        );
        // Hand the message to the workers through a channel.
        let _ = self.workers.send_prefetch_message(msg);
    }

    // Then handle fs prefetch
    let max_comp_size = self.prefetch_batch_size();
    let mut bios = bios.to_vec();
    bios.sort_by_key(|entry| entry.chunkinfo.compressed_offset());
    self.metrics.prefetch_unmerged_chunks.add(bios.len() as u64);
    BlobIoMergeState::merge_and_issue(
        &bios,
        max_comp_size,
        max_comp_size as u64 >> RAFS_MERGING_SIZE_TO_GAP_SHIFT,
        |req: BlobIoRange| {
            // Build an asynchronous prefetch message for the merged range.
            let msg = AsyncPrefetchMessage::new_fs_prefetch(blob_cache.clone(), req);
            let _ = self.workers.send_prefetch_message(msg);
        },
    );

    Ok(0)
}
```
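In the fs-prefetch half, the descriptors are sorted by compressed offset and passed to `BlobIoMergeState::merge_and_issue` with two limits: a maximum request size and a maximum gap (`max_comp_size >> RAFS_MERGING_SIZE_TO_GAP_SHIFT`). The sketch below only illustrates the general idea of merging sorted ranges under those two limits. `merge_ranges` is a hypothetical helper and does not reproduce the actual `BlobIoMergeState` algorithm:

```rust
/// Merge `(offset, size)` ranges that are already sorted by offset.
/// Two neighbours are merged when the hole between them is at most
/// `max_gap` bytes and the merged range stays within `max_size` bytes.
pub fn merge_ranges(sorted: &[(u64, u64)], max_size: u64, max_gap: u64) -> Vec<(u64, u64)> {
    let mut out: Vec<(u64, u64)> = Vec::new();
    for &(off, size) in sorted {
        if let Some(last) = out.last_mut() {
            let last_end = last.0 + last.1;
            let new_end = std::cmp::max(last_end, off + size);
            // The gap is 0 when the ranges touch or overlap.
            let gap = off.saturating_sub(last_end);
            if gap <= max_gap && new_end - last.0 <= max_size {
                last.1 = new_end - last.0;
                continue;
            }
        }
        out.push((off, size));
    }
    out
}
```

Merging neighbouring requests trades a few wasted bytes in the gaps for fewer, larger backend reads.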


The worker threads receive these messages in `handle_prefetch_requests`:

```rust
async fn handle_prefetch_requests(mgr: Arc<AsyncWorkerMgr>, rt: &Runtime) {
    // Max 1 active requests per thread.
    mgr.prefetch_sema.add_permits(1);

    while let Ok(msg) = mgr.prefetch_channel.recv().await {
        mgr.handle_prefetch_rate_limit(&msg).await;
        let mgr2 = mgr.clone();

        match msg {
            AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size) => {
                let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
                    .await
                    .unwrap();
                if blob_cache.is_prefetch_active() {
                    rt.spawn_blocking(move || {
                        let _ = Self::handle_blob_prefetch_request(
                            mgr2.clone(),
                            blob_cache,
                            offset,
                            size,
                        );
                        drop(token);
                    });
                }
            }
            AsyncPrefetchMessage::FsPrefetch(blob_cache, req) => {
                let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
                    .await
                    .unwrap();

                if blob_cache.is_prefetch_active() {
                    rt.spawn_blocking(move || {
                        let _ = Self::handle_fs_prefetch_request(mgr2.clone(), blob_cache, req);
                        drop(token)
                    });
                }
            }
            AsyncPrefetchMessage::Ping => {
                let _ = mgr.ping_requests.fetch_add(1, Ordering::Relaxed);
            }
            AsyncPrefetchMessage::RateLimiter(_size) => {}
        }

        mgr.prefetch_inflight.fetch_sub(1, Ordering::Relaxed);
    }
}
```

There are currently two prefetch modes, selected by the message type: Blob mode (`BlobPrefetch`) and Fs mode (`FsPrefetch`).
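The dispatch loop can be modelled with plain threads. This sketch makes several substitutions: `std::sync::mpsc` replaces the tokio channel, the semaphore and rate limiting are dropped, and a hypothetical `PrefetchMsg` enum stands in for `AsyncPrefetchMessage`. It only shows how each message kind is routed:

```rust
use std::sync::mpsc::Receiver;

/// Hypothetical, simplified message type modelled on `AsyncPrefetchMessage`.
#[derive(Debug)]
pub enum PrefetchMsg {
    Blob { offset: u64, size: u64 },
    Fs { blob_offset: u64, blob_size: u64 },
    Ping,
}

/// Drain the channel and record how each message was dispatched.
/// `recv()` returns Err once every sender is dropped, which ends the loop.
pub fn run_worker(rx: Receiver<PrefetchMsg>) -> Vec<String> {
    let mut log = Vec::new();
    while let Ok(msg) = rx.recv() {
        match msg {
            // Zero-sized blob requests are no-ops, as in handle_blob_prefetch_request.
            PrefetchMsg::Blob { size: 0, .. } => {}
            PrefetchMsg::Blob { offset, size } => log.push(format!("blob {}+{}", offset, size)),
            PrefetchMsg::Fs { blob_offset, blob_size } => {
                log.push(format!("fs {}+{}", blob_offset, blob_size))
            }
            PrefetchMsg::Ping => log.push("ping".to_string()),
        }
    }
    log
}
```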

(1) Blob-mode prefetch


```rust
fn handle_blob_prefetch_request(
    mgr: Arc<AsyncWorkerMgr>,
    cache: Arc<dyn BlobCache>,
    offset: u64,
    size: u64,
) -> Result<()> {
    trace!(
        "storage: prefetch blob {} offset {} size {}",
        cache.blob_id(),
        offset,
        size
    );
    if size == 0 {
        return Ok(());
    }
    // Get the blob object.
    if let Some(obj) = cache.get_blob_object() {
        // Fetch the data in [offset, offset + size).
        if let Err(e) = obj.fetch_range_compressed(offset, size) {
            warn!(
                "storage: failed to prefetch data from blob {}, offset {}, size {}, {}, will try resend",
                cache.blob_id(),
                offset,
                size,
                e
            );

            ASYNC_RUNTIME.spawn(async move {
                let mut interval = interval(Duration::from_secs(1));
                // Note: if this is tokio::time::interval, the first tick() completes
                // immediately, so this does not actually delay the resend.
                interval.tick().await;
                // On failure, send the prefetch message again.
                let msg = AsyncPrefetchMessage::new_blob_prefetch(cache.clone(), offset, size);
                let _ = mgr.send_prefetch_message(msg);
            });
        }
    } else {
        warn!("prefetch blob range is not supported");
    }

    Ok(())
}
```

The main work is done by `obj.fetch_range_compressed(offset, size)`:

```rust
fn fetch_range_compressed(&self, offset: u64, size: u64) -> Result<()> {
    let meta = self.meta.as_ref().ok_or_else(|| einval!())?;
    let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
    let mut chunks = meta.get_chunks_compressed(offset, size, self.prefetch_batch_size())?;
    if let Some(meta) = self.get_blob_meta_info()? {
        chunks = self.strip_ready_chunks(meta, None, chunks);
    }
    if chunks.is_empty() {
        Ok(())
    } else {
        self.do_fetch_chunks(&chunks, true)
    }
}
```

`meta.get_chunks_compressed` returns the list of chunks that cover the compressed range `[offset, offset + size)`:

```rust
pub fn get_chunks_compressed(
    &self,
    start: u64,
    size: u64,
    batch_size: u64,
) -> Result<Vec<Arc<dyn BlobChunkInfo>>> {
    let end = start.checked_add(size).ok_or_else(|| {
        einval!(einval!(format!(
            "get_chunks_compressed: invalid start {}/size {}",
            start, size
        )))
    })?;
    if end > self.state.compressed_size {
        return Err(einval!(format!(
            "get_chunks_compressed: invalid end {}/compressed_size {}",
            end, self.state.compressed_size
        )));
    }
    let batch_end = if batch_size <= size {
        end
    } else {
        std::cmp::min(
            start.checked_add(batch_size).unwrap_or(end),
            self.state.compressed_size,
        )
    };

    self.state
        .get_chunks_compressed(start, end, batch_end, batch_size)
}
```
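The range arithmetic in `get_chunks_compressed` can be checked on its own. `batch_range` is a hypothetical helper that returns `(end, batch_end)`, or `None` in the cases where the original returns an `einval` error:

```rust
/// Compute `(end, batch_end)` the way `get_chunks_compressed` does.
/// Returns None when `start + size` overflows or runs past the blob.
pub fn batch_range(start: u64, size: u64, batch_size: u64, compressed_size: u64) -> Option<(u64, u64)> {
    let end = start.checked_add(size)?;
    if end > compressed_size {
        return None;
    }
    let batch_end = if batch_size <= size {
        end
    } else {
        // Stretch the read to batch_size, but never past the end of the blob.
        std::cmp::min(start.checked_add(batch_size).unwrap_or(end), compressed_size)
    };
    Some((end, batch_end))
}
```

When `batch_size` is larger than the request, the read is extended to `start + batch_size`, clamped to the blob's compressed size.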


The selection itself ends up in `_get_chunks_compressed`:

```rust
fn _get_chunks_compressed<T: BlobMetaChunkInfo>(
    state: &Arc<BlobMetaState>,
    chunk_info_array: &[T],
    start: u64,
    end: u64,
    batch_end: u64,
    batch_size: u64,
) -> Result<Vec<Arc<dyn BlobChunkInfo>>> {
    let mut vec = Vec::with_capacity(512);
    let mut index = Self::_get_chunk_index_nocheck(chunk_info_array, start, true)?;
    let entry = Self::get_chunk_entry(state, chunk_info_array, index)?;

    // Special handling of ZRan chunks
    if entry.is_zran() {
        let zran_index = entry.get_zran_index();
        let pos = state.zran_info_array[zran_index as usize].in_offset();
        let mut zran_last = zran_index;

        while index > 0 {
            let entry = Self::get_chunk_entry(state, chunk_info_array, index - 1)?;
            if !entry.is_zran() {
                return Err(einval!(
                    "inconsistent ZRan and non-ZRan chunk information entries"
                ));
            } else if entry.get_zran_index() != zran_index {
                // reach the header chunk associated with the same ZRan context.
                break;
            } else {
                index -= 1;
            }
        }

        let mut vec = Vec::with_capacity(128);
        for entry in &chunk_info_array[index..] {
            entry.validate(state)?;
            if !entry.is_zran() {
                return Err(einval!(
                    "inconsistent ZRan and non-ZRan chunk information entries"
                ));
            }
            if entry.get_zran_index() != zran_last {
                let ctx = &state.zran_info_array[entry.get_zran_index() as usize];
                if ctx.in_offset() + ctx.in_size() as u64 - pos > batch_size
                    && entry.compressed_offset() > end
                {
                    return Ok(vec);
                }
                zran_last = entry.get_zran_index();
            }
            vec.push(BlobMetaChunk::new(index, state));
            // Move on to the index of the next entry.
            index += 1;
        }
        return Ok(vec);
    }

    vec.push(BlobMetaChunk::new(index, state));
    let mut last_end = entry.compressed_end();
    if last_end >= batch_end {
        Ok(vec)
    } else {
        while index + 1 < chunk_info_array.len() {
            index += 1;

            let entry = Self::get_chunk_entry(state, chunk_info_array, index)?;
            // Avoid read amplify if next chunk is too big.
            if last_end >= end && entry.compressed_end() > batch_end {
                return Ok(vec);
            }

            vec.push(BlobMetaChunk::new(index, state));
            last_end = entry.compressed_end();
            if last_end >= batch_end {
                return Ok(vec);
            }
        }

        Err(einval!(format!(
            "entry not found index {} chunk_info_array.len {}",
            index,
            chunk_info_array.len(),
        )))
    }
}
```
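For non-ZRan blobs the selection loop is easy to model. This sketch assumes a hypothetical `Chunk` type that holds only the compressed offset and size. It also assumes the chunks are contiguous and sorted by offset, and it uses `slice::partition_point` in place of `_get_chunk_index_nocheck`:

```rust
/// Hypothetical chunk descriptor: compressed offset and size within the blob.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Chunk {
    pub offset: u64,
    pub size: u64,
}

impl Chunk {
    fn end(&self) -> u64 {
        self.offset + self.size
    }
}

/// Non-ZRan path of `_get_chunks_compressed`: start at the chunk containing
/// `start`, keep appending until `batch_end` is covered, but stop early once
/// `end` is covered and the next chunk would overshoot `batch_end`.
/// Returns None where the original returns "entry not found".
pub fn select_chunks(chunks: &[Chunk], start: u64, end: u64, batch_end: u64) -> Option<Vec<usize>> {
    // Index of the first chunk whose end lies beyond `start`.
    let mut index = chunks.partition_point(|c| c.end() <= start);
    let first = chunks.get(index)?;
    let mut out = vec![index];
    let mut last_end = first.end();
    while last_end < batch_end {
        index += 1;
        let next = chunks.get(index)?;
        // Avoid read amplification when the next chunk is too big.
        if last_end >= end && next.end() > batch_end {
            break;
        }
        out.push(index);
        last_end = next.end();
    }
    Some(out)
}
```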


`strip_ready_chunks` then removes chunks that are already cached from the selected list:

```rust
fn strip_ready_chunks(
    &self,
    meta: Arc<BlobMetaInfo>,
    old_chunks: Option<&[Arc<dyn BlobChunkInfo>]>,
    mut extended_chunks: Vec<Arc<dyn BlobChunkInfo>>,
) -> Vec<Arc<dyn BlobChunkInfo>> {
    if self.is_zran {
        let mut set = HashSet::new();
        for c in extended_chunks.iter() {
            if !matches!(self.chunk_map.is_ready(c.as_ref()), Ok(true)) {
                set.insert(meta.get_zran_index(c.id()));
            }
        }

        let first = old_chunks.as_ref().map(|v| v[0].id()).unwrap_or(u32::MAX);
        let mut start = 0;
        while start < extended_chunks.len() {
            let id = extended_chunks[start].id();
            if id == first || set.contains(&meta.get_zran_index(id)) {
                break;
            }
            start += 1;
        }

        let last = old_chunks
            .as_ref()
            .map(|v| v[v.len() - 1].id())
            .unwrap_or(u32::MAX);
        let mut end = extended_chunks.len() - 1;
        while end > start {
            let id = extended_chunks[end].id();
            if id == last || set.contains(&meta.get_zran_index(id)) {
                break;
            }
            end -= 1;
        }

        assert!(end >= start);
        if start == 0 && end == extended_chunks.len() - 1 {
            extended_chunks
        } else {
            extended_chunks[start..=end].to_vec()
        }
    } else {
        while !extended_chunks.is_empty() {
            let chunk = &extended_chunks[extended_chunks.len() - 1];
            if matches!(self.chunk_map.is_ready(chunk.as_ref()), Ok(true)) {
                extended_chunks.pop();
            } else {
                break;
            }
        }
        extended_chunks
    }
}
```
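The non-ZRan branch only trims already-cached chunks from the tail of the list. In this sketch, chunk ids are plain `u32` values and a closure stands in for `chunk_map.is_ready`; both are assumptions for illustration:

```rust
/// Drop chunks from the end of the list while they are already ready,
/// like the `else` branch of `strip_ready_chunks`.
pub fn strip_trailing_ready<F: Fn(u32) -> bool>(mut chunks: Vec<u32>, is_ready: F) -> Vec<u32> {
    while chunks.last().map_or(false, |&id| is_ready(id)) {
        chunks.pop();
    }
    chunks
}
```

Ready chunks at the head or in the middle are kept. The result is therefore still a run of consecutive ids, which is what `do_fetch_chunks` asserts on entry.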

Then `self.do_fetch_chunks(&chunks, true)` fetches the data of these chunks:

```rust
fn do_fetch_chunks(&self, chunks: &[Arc<dyn BlobChunkInfo>], prefetch: bool) -> Result<()> {
    // Validate input parameters.
    assert!(!chunks.is_empty());
    if chunks.len() > 1 {
        for idx in 0..chunks.len() - 1 {
            assert_eq!(chunks[idx].id() + 1, chunks[idx + 1].id());
        }
    }

    // Get chunks not ready yet, also marking them as in-flight.
    let bitmap = self
        .chunk_map
        .as_range_map()
        .ok_or_else(|| einval!("invalid chunk_map for do_fetch_chunks()"))?;
    let chunk_index = chunks[0].id();
    let count = chunks.len() as u32;
    let pending = match bitmap.check_range_ready_and_mark_pending(chunk_index, count)? {
        None => return Ok(()),
        Some(v) => v,
    };

    let mut status = vec![false; count as usize];
    let (start_idx, end_idx) = if self.is_zran {
        for chunk_id in pending.iter() {
            status[(*chunk_id - chunk_index) as usize] = true;
        }
        (0, pending.len())
    } else {
        let mut start = u32::MAX;
        let mut end = 0;
        for chunk_id in pending.iter() {
            status[(*chunk_id - chunk_index) as usize] = true;
            start = std::cmp::min(*chunk_id - chunk_index, start);
            end = std::cmp::max(*chunk_id - chunk_index, end);
        }
        if end < start {
            return Ok(());
        }
        (start as usize, end as usize)
    };

    let start_chunk = &chunks[start_idx];
    let end_chunk = &chunks[end_idx];
    let (blob_offset, blob_end, blob_size) =
        self.get_blob_range(&chunks[start_idx..=end_idx])?;
    trace!(
        "fetch data range {:x}-{:x} for chunk {}-{} from blob {:x}",
        blob_offset,
        blob_end,
        start_chunk.id(),
        end_chunk.id(),
        chunks[0].blob_index()
    );

    // Read the data from the backend.
    match self.read_chunks_from_backend(
        blob_offset,
        blob_size,
        &chunks[start_idx..=end_idx],
        prefetch,
    ) {
        Ok(mut bufs) => {
            if self.is_compressed {
                let res =
                    Self::persist_cached_data(&self.file, blob_offset, bufs.compressed_buf());
                for idx in start_idx..=end_idx {
                    if status[idx] {
                        self.update_chunk_pending_status(chunks[idx].as_ref(), res.is_ok());
                    }
                }
            } else {
                for idx in start_idx..=end_idx {
                    let mut buf = match bufs.next() {
                        None => return Err(einval!("invalid chunk decompressed status")),
                        Some(Err(e)) => {
                            for idx in idx..=end_idx {
                                if status[idx] {
                                    bitmap.clear_range_pending(chunks[idx].id(), 1)
                                }
                            }
                            return Err(e);
                        }
                        Some(Ok(v)) => v,
                    };

                    if status[idx] {
                        if self.dio_enabled {
                            self.adjust_buffer_for_dio(&mut buf)
                        }
                        self.persist_chunk_data(chunks[idx].as_ref(), buf.as_ref());
                    }
                }
            }
        }
        Err(e) => {
            for idx in 0..chunks.len() {
                if status[idx] {
                    bitmap.clear_range_pending(chunks[idx].id(), 1)
                }
            }
            return Err(e);
        }
    }

    if !bitmap.wait_for_range_ready(chunk_index, count)? {
        if prefetch {
            return Err(eio!("failed to read data from storage backend"));
        }

        // if we are in on-demand path, retry for the timeout chunks
        for chunk in chunks {
            match self.chunk_map.check_ready_and_mark_pending(chunk.as_ref()) {
                Err(e) => return Err(eio!(format!("do_fetch_chunks failed, {:?}", e))),
                Ok(true) => {}
                Ok(false) => {
                    info!("retry for timeout chunk, {}", chunk.id());
                    let mut buf = alloc_buf(chunk.uncompressed_size() as usize);
                    self.read_chunk_from_backend(chunk.as_ref(), &mut buf)
                        .map_err(|e| {
                            self.update_chunk_pending_status(chunk.as_ref(), false);
                            eio!(format!("read_raw_chunk failed, {:?}", e))
                        })?;
                    if self.dio_enabled {
                        self.adjust_buffer_for_dio(&mut buf)
                    }
                    self.persist_chunk_data(chunk.as_ref(), &buf);
                }
            }
        }
    }

    Ok(())
}
```
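The bookkeeping in the non-ZRan branch of `do_fetch_chunks` turns the pending chunk ids into a `status` mask and an inclusive index window. `pending_window` is a hypothetical helper that mirrors that logic:

```rust
/// Given the first chunk id of a batch, the batch length and the ids reported
/// as pending, return the status mask and the inclusive (start_idx, end_idx)
/// window to read. Returns None when nothing is pending.
pub fn pending_window(chunk_index: u32, count: u32, pending: &[u32]) -> Option<(Vec<bool>, usize, usize)> {
    let mut status = vec![false; count as usize];
    let mut start: u32 = u32::MAX;
    let mut end: u32 = 0;
    for &id in pending {
        // Position of this chunk inside the batch.
        let rel = id - chunk_index;
        status[rel as usize] = true;
        start = start.min(rel);
        end = end.max(rel);
    }
    if end < start {
        return None;
    }
    Some((status, start as usize, end as usize))
}
```

Take a batch of chunks 10..=14 with pending ids 11 and 13. The window covers indexes 1..=3, so chunk 12 is read from the backend as part of the contiguous range. Its `status` is `false`, however, so the code does not update or persist it as a pending chunk.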

`self.read_chunks_from_backend` reads the data from the backend:

```rust
fn read_chunks_from_backend<'a, 'b>(
    &'a self,
    blob_offset: u64,
    blob_size: usize,
    chunks: &'b [Arc<dyn BlobChunkInfo>],
    prefetch: bool,
) -> Result<ChunkDecompressState<'a, 'b>>
where
    Self: Sized,
{
    // Read requested data from the backend by altogether.
    let mut c_buf = alloc_buf(blob_size);
    let start = Instant::now();
    let nr_read = self
        .reader()
        .read(c_buf.as_mut_slice(), blob_offset)
        .map_err(|e| eio!(e))?;
    if nr_read != blob_size {
        return Err(eio!(format!(
            "request for {} bytes but got {} bytes",
            blob_size, nr_read
        )));
    }
    let duration = Instant::now().duration_since(start).as_millis();
    debug!(
        "read_chunks_from_backend: {} {} {} bytes at {}, duration {}ms",
        std::thread::current().name().unwrap_or_default(),
        if prefetch { "prefetch" } else { "fetch" },
        blob_size,
        blob_offset,
        duration
    );

    let chunks = chunks.iter().map(|v| v.as_ref()).collect();
    Ok(ChunkDecompressState::new(blob_offset, self, chunks, c_buf))
}
```

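`self.reader().read` is the common entry point shared by all backends. A failed request is retried up to `self.retry_limit()` times (counted down in `retry_count`) before the error is returned: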

```rust
fn read(&self, buf: &mut [u8], offset: u64) -> BackendResult<usize> {
    let mut retry_count = self.retry_limit();
    let begin_time = self.metrics().begin();

    loop {
        match self.try_read(buf, offset) {
            Ok(size) => {
                self.metrics().end(&begin_time, buf.len(), false);
                return Ok(size);
            }
            Err(err) => {
                if retry_count > 0 {
                    warn!(
                        "Read from backend failed: {:?}, retry count {}",
                        err, retry_count
                    );
                    retry_count -= 1;
                } else {
                    self.metrics().end(&begin_time, buf.len(), true);
                    ERROR_HOLDER
                        .lock()
                        .unwrap()
                        .push(&format!("{:?}", err))
                        .unwrap_or_else(|_| error!("Failed when try to hold error"));
                    return Err(err);
                }
            }
        }
    }
}
```

Each backend provides its own `try_read`. nydus currently implements three backends: `localfs`, `registry` and `OSS`.
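The split between the shared `read` and the backend-specific `try_read` maps naturally onto a trait with a provided method. This is a simplified sketch: `RetryRead` and `FlakyBackend` are hypothetical names, and the metrics and `ERROR_HOLDER` handling are left out:

```rust
use std::cell::Cell;
use std::io;

/// Hypothetical trait: each backend supplies `try_read`, and the provided
/// `read` retries it up to `retry_limit()` extra times.
pub trait RetryRead {
    fn retry_limit(&self) -> u8;
    fn try_read(&self, buf: &mut [u8], offset: u64) -> io::Result<usize>;

    fn read(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
        let mut retry_count = self.retry_limit();
        loop {
            match self.try_read(buf, offset) {
                Ok(size) => return Ok(size),
                // Still have retries left: try again.
                Err(_) if retry_count > 0 => retry_count -= 1,
                // Out of retries: surface the last error.
                Err(e) => return Err(e),
            }
        }
    }
}

/// Toy backend that fails a fixed number of times before succeeding.
pub struct FlakyBackend {
    pub failures_left: Cell<u32>,
    pub retry_limit: u8,
}

impl RetryRead for FlakyBackend {
    fn retry_limit(&self) -> u8 {
        self.retry_limit
    }

    fn try_read(&self, buf: &mut [u8], _offset: u64) -> io::Result<usize> {
        let left = self.failures_left.get();
        if left > 0 {
            self.failures_left.set(left - 1);
            return Err(io::Error::new(io::ErrorKind::Other, "transient error"));
        }
        buf.fill(0xab);
        Ok(buf.len())
    }
}
```

With `retry_limit` set to 2, a request is attempted at most three times.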

(2) Fs-mode prefetch


```rust
fn handle_fs_prefetch_request(
    mgr: Arc<AsyncWorkerMgr>,
    cache: Arc<dyn BlobCache>,
    req: BlobIoRange,
) -> Result<()> {
    let blob_offset = req.blob_offset;
    let blob_size = req.blob_size;
    trace!(
        "storage: prefetch fs data from blob {} offset {} size {}",
        cache.blob_id(),
        blob_offset,
        blob_size
    );
    if blob_size == 0 {
        return Ok(());
    }

    // Record how much prefetch data is requested from storage backend.
    // So the average backend merged request size will be prefetch_data_amount/prefetch_mr_count.
    // We can measure merging possibility by this.
    mgr.metrics.prefetch_mr_count.inc();
    mgr.metrics.prefetch_data_amount.add(blob_size);

    if let Some(obj) = cache.get_blob_object() {
        obj.prefetch_chunks(&req)?;
    } else {
        cache.prefetch_range(&req)?;
    }

    Ok(())
}
```

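Fs-mode prefetch has two cases. (1) If the cache provides a blob object (`cache.get_blob_object()` returns `Some`), `obj.prefetch_chunks(&req)` is called: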

```rust
fn prefetch_chunks(&self, range: &BlobIoRange) -> Result<()> {
    let chunks_extended;
    let mut chunks = &range.chunks;
    if let Some(v) = self.extend_pending_chunks(chunks, self.prefetch_batch_size())? {
        chunks_extended = v;
        chunks = &chunks_extended;
    }

    let mut start = 0;
    while start < chunks.len() {
        // Figure out the range with continuous chunk ids, be careful that `end` is inclusive.
        let mut end = start;
        while end < chunks.len() - 1 && chunks[end + 1].id() == chunks[end].id() + 1 {
            end += 1;
        }
        self.do_fetch_chunks(&chunks[start..=end], true)?;
        start = end + 1;
    }

    Ok(())
}
```

Once the chunks are prepared, it calls `do_fetch_chunks` for each run of consecutive chunk ids, just as in Blob mode.


(2) Otherwise, `cache.prefetch_range(&req)` is called:

```rust
fn prefetch_range(&self, range: &BlobIoRange) -> Result<usize> {
    let mut pending = Vec::with_capacity(range.chunks.len());
    if !self.chunk_map.is_persist() {
        let mut d_size = 0;
        for c in range.chunks.iter() {
            d_size = std::cmp::max(d_size, c.uncompressed_size() as usize);
        }
        let mut buf = alloc_buf(d_size);

        for c in range.chunks.iter() {
            if let Ok(true) = self.chunk_map.check_ready_and_mark_pending(c.as_ref()) {
                // The chunk is ready, so skip it.
                continue;
            }

            // For digested chunk map, we must check whether the cached data is valid because
            // the digested chunk map cannot persist readiness state.
            let d_size = c.uncompressed_size() as usize;
            match self.read_file_cache(c.as_ref(), &mut buf[0..d_size]) {
                // The cached data is valid, set the chunk as ready.
                Ok(_v) => self.update_chunk_pending_status(c.as_ref(), true),
                // The cached data is invalid, queue the chunk for reading from backend.
                Err(_e) => pending.push(c.clone()),
            }
        }
    } else {
        for c in range.chunks.iter() {
            if let Ok(true) = self.chunk_map.check_ready_and_mark_pending(c.as_ref()) {
                // The chunk is ready, so skip it.
                continue;
            } else {
                pending.push(c.clone());
            }
        }
    }

    let mut total_size = 0;
    let mut start = 0;
    while start < pending.len() {
        // Figure out the range with continuous chunk ids, be careful that `end` is inclusive.
        let mut end = start;
        while end < pending.len() - 1 && pending[end + 1].id() == pending[end].id() + 1 {
            end += 1;
        }

        let (blob_offset, _blob_end, blob_size) = self.get_blob_range(&pending[start..=end])?;
        match self.read_chunks_from_backend(blob_offset, blob_size, &pending[start..=end], true)
        {
            Ok(mut bufs) => {
                total_size += blob_size;
                if self.is_compressed {
                    let res = Self::persist_cached_data(
                        &self.file,
                        blob_offset,
                        bufs.compressed_buf(),
                    );
                    for c in pending.iter().take(end + 1).skip(start) {
                        self.update_chunk_pending_status(c.as_ref(), res.is_ok());
                    }
                } else {
                    for idx in start..=end {
                        let buf = match bufs.next() {
                            None => return Err(einval!("invalid chunk decompressed status")),
                            Some(Err(e)) => {
                                for chunk in &mut pending[idx..=end] {
                                    self.update_chunk_pending_status(chunk.as_ref(), false);
                                }
                                return Err(e);
                            }
                            Some(Ok(v)) => v,
                        };
                        self.persist_chunk_data(pending[idx].as_ref(), &buf);
                    }
                }
            }
            Err(_e) => {
                // Clear the pending flag for all chunks in processing.
                for chunk in &mut pending[start..=end] {
                    self.update_chunk_pending_status(chunk.as_ref(), false);
                }
            }
        }

        start = end + 1;
    }

    Ok(total_size)
}
```

Once the data ranges to fetch are known, it calls `read_chunks_from_backend` directly to read the content from the backend.
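Both `prefetch_chunks` and `prefetch_range` split their chunk list into runs of consecutive chunk ids before reading, because a single backend read covers one contiguous range. Below is a standalone version of that loop, assuming chunks are represented only by their `u32` ids:

```rust
/// Split sorted chunk ids into runs of consecutive ids, returning inclusive
/// `(start, end)` index pairs, like the `while start < chunks.len()` loops.
pub fn continuous_runs(ids: &[u32]) -> Vec<(usize, usize)> {
    let mut runs = Vec::new();
    let mut start = 0;
    while start < ids.len() {
        let mut end = start;
        // Extend the run while the next id is exactly one larger.
        while end < ids.len() - 1 && ids[end + 1] == ids[end] + 1 {
            end += 1;
        }
        runs.push((start, end));
        start = end + 1;
    }
    runs
}
```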

2.6.2 Initializing the PassthroughFs backend

First a file system configuration is built, and then a PassthroughFs instance is created from it:

```rust
let fs_cfg = Config {
    root_dir: cmd.source.to_string(),
    do_import: false,
    writeback: true,
    no_open: true,
    xattr: true,
    ..Default::default()
};
// TODO: Passthrough Fs needs to enlarge rlimit against host. We can exploit `MountCmd`
// `config` field to pass such a configuration into here.
let passthrough_fs =
    PassthroughFs::<()>::new(fs_cfg).map_err(DaemonError::PassthroughFs)?;
passthrough_fs
    .import()
    .map_err(DaemonError::PassthroughFs)?;
info!("PassthroughFs imported");
Ok(Box::new(passthrough_fs))
```

Creating the PassthroughFs instance:

```rust
/// Create a Passthrough file system instance.
pub fn new(cfg: Config) -> io::Result<PassthroughFs<S>> {
    // Safe because this is a constant value and a valid C string.
    let proc_self_fd_cstr = unsafe { CStr::from_bytes_with_nul_unchecked(PROC_SELF_FD_CSTR) };
    // Open /proc/self/fd.
    let proc_self_fd = Self::open_file(
        libc::AT_FDCWD,
        proc_self_fd_cstr,
        libc::O_PATH | libc::O_NOFOLLOW | libc::O_CLOEXEC,
        0,
    )?;

    Ok(PassthroughFs {
        inode_map: InodeMap::new(),
        next_inode: AtomicU64::new(fuse::ROOT_ID + 1),

        handle_map: HandleMap::new(),
        next_handle: AtomicU64::new(1),
        mount_fds: MountFds::new(),

        proc_self_fd,

        writeback: AtomicBool::new(false),
        no_open: AtomicBool::new(false),
        no_opendir: AtomicBool::new(false),
        killpriv_v2: AtomicBool::new(false),
        no_readdir: AtomicBool::new(cfg.no_readdir),
        perfile_dax: AtomicBool::new(false),
        cfg,

        phantom: PhantomData,
    })
}
```

`passthrough_fs.import()` initializes the file system:

```rust
/// Initialize the Passthrough file system.
pub fn import(&self) -> io::Result<()> {
    let root = CString::new(self.cfg.root_dir.as_str()).expect("CString::new failed");

    let (file_or_handle, st, ids_altkey, handle_altkey) = Self::open_file_or_handle(
        self.cfg.inode_file_handles,
        libc::AT_FDCWD,
        &root,
        &self.mount_fds,
        |fd, flags, _mode| {
            let pathname = CString::new(format!("{}", fd))
                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
            Self::open_file(self.proc_self_fd.as_raw_fd(), &pathname, flags, 0)
        },
    )
    .map_err(|e| {
        error!("fuse: import: failed to get file or handle: {:?}", e);
        e
    })?;

    // Safe because this doesn't modify any memory and there is no need to check the return
    // value because this system call always succeeds. We need to clear the umask here because
    // we want the client to be able to set all the bits in the mode.
    unsafe { libc::umask(0o000) };

    // Not sure why the root inode gets a refcount of 2 but that's what libfuse does.
    self.inode_map.insert(
        fuse::ROOT_ID,
        InodeData::new(
            fuse::ROOT_ID,
            file_or_handle,
            2,
            ids_altkey,
            st.get_stat().st_mode,
        ),
        ids_altkey,
        handle_altkey,
    );

    Ok(())
}
```

This completes the initialization of the backend file system.
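In `import()`, the closure passed to `open_file_or_handle` reopens a file by its descriptor number, relative to the cached `/proc/self/fd` directory. Below is a minimal Linux-only sketch of the same idea using the standard library. It opens a plain `/proc/self/fd/<fd>` path instead of calling `openat(2)`, and `reopen_via_proc` is a hypothetical helper:

```rust
use std::fs::File;
use std::io;
use std::os::unix::io::AsRawFd;

/// Reopen an already-open file through its /proc/self/fd entry (Linux only).
/// This creates a new open file description for the same file.
pub fn reopen_via_proc(file: &File) -> io::Result<File> {
    let path = format!("/proc/self/fd/{}", file.as_raw_fd());
    File::open(path)
}
```

Reopening through `/proc/self/fd` produces a new open file description. That is why the real code can first hold the root as an `O_PATH` descriptor and later reopen it with whatever flags it needs.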

Back in `daemon.service.mount(cmd)`, the backend file system is now mounted with `self.get_vfs().mount(backend, &cmd.mountpoint)`:

```rust
/// Mount a backend file system to path
pub fn mount(&self, fs: BackFileSystem, path: &str) -> VfsResult<VfsIndex> {
    let (entry, ino) = fs.mount().map_err(VfsError::Mount)?;
    if ino > VFS_MAX_INO {
        fs.destroy();
        return Err(VfsError::InodeIndex(format!(
            "Unsupported max inode number, requested {} supported {}",
            ino, VFS_MAX_INO
        )));
    }

    // Serialize mount operations. Do not expect poisoned lock here.
    let _guard = self.lock.lock().unwrap();
    if self.initialized() {
        let opts = self.opts.load().deref().out_opts;
        fs.init(opts).map_err(|e| {
            VfsError::Initialize(format!("Can't initialize with opts {:?}, {:?}", opts, e))
        })?;
    }
    let index = self.allocate_fs_idx().map_err(VfsError::FsIndex)?;
    self.insert_mount_locked(fs, entry, index, path)
        .map_err(VfsError::Mount)?;

    Ok(index)
}
```

First, `fs.mount()` returns two values: the `entry` of the backend file system's root inode, and the file system's maximum inode number. For RAFS:

```rust
impl BackendFileSystem for Rafs {
    fn mount(&self) -> Result<(Entry, u64)> {
        let root_inode = self.sb.get_inode(self.root_ino(), self.digest_validate)?;
        self.ios.new_file_counter(root_inode.ino());
        let e = self.get_inode_entry(root_inode);
        // e is the root inode's entry; the second value is the maximum inode number.
        Ok((e, self.sb.get_max_ino()))
    }
    // ...
}
```


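The VFS layer (from fuse-backend-rs) tells the mounted file systems apart by an index stored in the top 8 bits of the 64-bit inode number, so there are at most 256 index values. Index 0 belongs to the pseudofs itself, which is why the index of a mounted backend is always greater than 0.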

Next, `self.insert_mount_locked(fs, entry, index, path)` creates `path` in the pseudofs and associates `index` with the new mount point's entry:

```rust
fn insert_mount_locked(
    &self,
    fs: BackFileSystem,
    mut entry: Entry,
    fs_idx: VfsIndex,
    path: &str,
) -> Result<()> {
    // The visibility of mountpoints and superblocks:
    // superblock should be committed first because it won't be accessed until
    // a lookup returns a cross mountpoint inode.
    let mut superblocks = self.superblocks.load().deref().deref().clone();
    let mut mountpoints = self.mountpoints.load().deref().deref().clone();
    // Create `path` in the pseudo fs and get its inode.
    let inode = self.root.mount(path)?;
    let real_root_ino = entry.inode;

    // Hash (encode) the root inode with fs_idx.
    entry.inode = self.convert_inode(fs_idx, entry.inode)?;

    // If a mountpoint already exists here, drop its superblock first.
    // Over mount would invalidate previous superblock inodes.
    if let Some(mnt) = mountpoints.get(&inode) {
        superblocks[mnt.fs_idx as usize] = None;
    }
    superblocks[fs_idx as usize] = Some(Arc::new(fs));
    self.superblocks.store(Arc::new(superblocks));
    trace!("fs_idx {} inode {}", fs_idx, inode);

    let mountpoint = Arc::new(MountPointData {
        fs_idx,
        ino: real_root_ino,
        root_entry: entry,
        _path: path.to_string(),
    });
    // Add the new mount to self.mountpoints.
    mountpoints.insert(inode, mountpoint);
    self.mountpoints.store(Arc::new(mountpoints));

    Ok(())
}
```
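`insert_mount_locked` never edits `superblocks` or `mountpoints` in place. It clones the current table, modifies the copy, and publishes the copy with `store()`. The sketch below shows this copy-on-write pattern. It assumes std's `RwLock<Arc<_>>` as a stand-in for the atomically swappable `Arc` that the real code calls `load()`/`store()` on, and it uses a hypothetical `MountTable` keyed by inode:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical copy-on-write mount table.
pub struct MountTable {
    current: RwLock<Arc<HashMap<u64, String>>>,
    // Serializes writers, like `self.lock` in Vfs::mount.
    writer: Mutex<()>,
}

impl MountTable {
    pub fn new() -> Self {
        MountTable {
            current: RwLock::new(Arc::new(HashMap::new())),
            writer: Mutex::new(()),
        }
    }

    /// Take a snapshot; cheap because only the Arc is cloned.
    pub fn load(&self) -> Arc<HashMap<u64, String>> {
        self.current.read().unwrap().clone()
    }

    /// Clone the current table, modify the copy, then publish it.
    pub fn insert(&self, ino: u64, path: &str) {
        let _guard = self.writer.lock().unwrap();
        let mut copy = (*self.load()).clone();
        copy.insert(ino, path.to_string());
        *self.current.write().unwrap() = Arc::new(copy);
    }
}
```

Readers that already called `load()` keep a consistent snapshot while a writer publishes a new table.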


`self.root.mount(path)` walks the pseudo fs from its root and creates any missing path nodes:

```rust
// mount creates path walk nodes all the way from root
// to @path, and returns pseudo fs inode number for the path
pub fn mount(&self, mountpoint: &str) -> Result<u64> {
    let path = Path::new(mountpoint);
    if !path.has_root() {
        error!("pseudo fs mount failure: invalid mount path {}", mountpoint);
        return Err(Error::from_raw_os_error(libc::EINVAL));
    }

    let mut inodes = self.inodes.load();
    let mut inode = &self.root_inode;

    'outer: for component in path.components() {
        trace!("pseudo fs mount iterate {:?}", component.as_os_str());
        match component {
            Component::RootDir => continue,
            Component::CurDir => continue,
            Component::ParentDir => inode = inodes.get(&inode.parent).unwrap(),
            Component::Prefix(_) => {
                error!("unsupported path: {}", mountpoint);
                return Err(Error::from_raw_os_error(libc::EINVAL));
            }
            Component::Normal(path) => {
                let name = path.to_str().unwrap();

                // Optimistic check without lock.
                for child in inode.children.load().iter() {
                    if child.name == name {
                        inode = inodes.get(&child.ino).unwrap();
                        continue 'outer;
                    }
                }
                // ...
                // No node with this name was found, so create one.
                let new_node = self.create_inode(name, inode);
                inodes = self.inodes.load();
                inode = inodes.get(&new_node.ino).unwrap();
            }
        }
    }

    // Now we have all path components exist, return the last one
    Ok(inode.ino)
}
```
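The walk in the pseudo fs `mount` can be reduced to a map-based tree. This sketch uses a hypothetical `PseudoTree` that stores parent and child links in `HashMap`s, and it leaves out the optimistic lock-free lookup and any locking:

```rust
use std::collections::HashMap;
use std::path::{Component, Path};

pub const ROOT_INO: u64 = 1;

/// Hypothetical minimal pseudo file system tree.
pub struct PseudoTree {
    next_ino: u64,
    parent: HashMap<u64, u64>,
    children: HashMap<(u64, String), u64>,
}

impl PseudoTree {
    pub fn new() -> Self {
        let mut parent = HashMap::new();
        parent.insert(ROOT_INO, ROOT_INO); // the root is its own parent
        PseudoTree { next_ino: ROOT_INO + 1, parent, children: HashMap::new() }
    }

    /// Walk `mountpoint` from the root, creating missing nodes, and return
    /// the inode of the last component.
    pub fn mount(&mut self, mountpoint: &str) -> Result<u64, String> {
        let path = Path::new(mountpoint);
        if !path.has_root() {
            return Err(format!("invalid mount path {}", mountpoint));
        }
        let mut ino = ROOT_INO;
        for component in path.components() {
            match component {
                Component::RootDir | Component::CurDir => continue,
                Component::ParentDir => ino = self.parent[&ino],
                Component::Prefix(_) => return Err(format!("unsupported path: {}", mountpoint)),
                Component::Normal(name) => {
                    let name = name.to_str().ok_or("non-UTF-8 path")?.to_string();
                    let key = (ino, name);
                    let existing = self.children.get(&key).copied();
                    ino = match existing {
                        // Reuse the node if it already exists.
                        Some(child) => child,
                        // Otherwise allocate a new inode and link it in.
                        None => {
                            let child = self.next_ino;
                            self.next_ino += 1;
                            self.children.insert(key, child);
                            self.parent.insert(child, ino);
                            child
                        }
                    };
                }
            }
        }
        Ok(ino)
    }
}
```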

`self.convert_inode(fs_idx, entry.inode)` shifts the file system index into the inode number, so that inodes from different mounted file systems do not collide:

```rust
// 1. Pseudo fs inodes are not hashed.
// 2. The index is always larger than 0, so pseudo fs inodes are never affected
//    and can be found directly.
// 3. Other inodes are hashed via (index << 56 | inode).
fn convert_inode(&self, fs_idx: VfsIndex, inode: u64) -> Result<u64> {
    // Do not hash negative dentry
    if inode == 0 {
        return Ok(inode);
    }
    if inode > VFS_MAX_INO {
        return Err(Error::new(
            ErrorKind::Other,
            format!(
                "Inode number {} too large, max supported {}",
                inode, VFS_MAX_INO
            ),
        ));
    }
    let ino: u64 = ((fs_idx as u64) << VFS_INDEX_SHIFT) | inode;
    trace!(
        "fuse: vfs fs_idx {} inode {} fuse ino {:#x}",
        fs_idx,
        inode,
        ino
    );
    Ok(ino)
}
```
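Encoding and decoding the VFS inode number is plain bit arithmetic. The sketch assumes `VfsIndex` is a `u8` and uses a 56-bit shift, as in `(index << 56 | inode)`. `split_inode` is a hypothetical inverse added for illustration:

```rust
pub const VFS_INDEX_SHIFT: u32 = 56;
/// Largest backend inode that fits below the index bits.
pub const VFS_MAX_INO: u64 = (1u64 << VFS_INDEX_SHIFT) - 1;

/// Put the file system index into the top 8 bits of the inode number.
pub fn convert_inode(fs_idx: u8, inode: u64) -> Result<u64, String> {
    if inode == 0 {
        return Ok(0); // negative dentry: not hashed
    }
    if inode > VFS_MAX_INO {
        return Err(format!("Inode number {} too large, max supported {}", inode, VFS_MAX_INO));
    }
    Ok(((fs_idx as u64) << VFS_INDEX_SHIFT) | inode)
}

/// Recover (fs_idx, backend inode) from a VFS inode number.
pub fn split_inode(ino: u64) -> (u8, u64) {
    ((ino >> VFS_INDEX_SHIFT) as u8, ino & VFS_MAX_INO)
}
```

Because the index occupies the top 8 bits, only 256 index values exist, which matches the limit mentioned above.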

This completes mounting the backend file system.

Once the file system backend (for example, the RAFS backend) has been prepared from `mount_cmd`, it is mounted through FUSE. `daemon.service.session.lock().unwrap().mount()` is a method of the `FuseSession` struct in `fuse-backend-rs`.


In `fuse_kern_mount`, once the required arguments are prepared, the `mount` function from the `nix` crate is called, and it ultimately calls the `mount` function in `libc`.





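The daemon is then started in the `StartService` branch of its state machine, which also switches the state to `RUNNING`:

```rust
StartService => d.start().map(|r| {
    d.set_state(DaemonState::RUNNING);
    r
}),
```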

nydusd runs 8 threads in total. Six of them have been started so far (the number of `fuse_server` threads is configurable). The remaining two, `nydus-http-server` and `api-server`, are started next.

Finally, the major and minor device numbers of the mount point are obtained and stored in the metadata.

When `create_fuse_daemon()` completes successfully, nydusd prints the corresponding startup log messages.



