2023-12-17

Build filesystems with Python and libfuse

We have automated archival to tape at ETH, and automated retrieval from it. This is very useful for our biology research groups, who can collect petabytes of data each year, so we like to integrate it into our data management platforms.

Unfortunately, while the interface is simple and intuitive for human users, it is less ideal for automation.

How we interact with our tape archiver

To archive a file, we simply copy it to a special NAS linked to the tape server. Once the file has been written, the archiver copies it to a replica and schedules both copies to be sent to tape by the robot. Once they are archived, it truncates the files on disk and sets the sticky bit in the file permissions. To retrieve a file, we simply read it. The read fails with an I/O error until the robot has fetched the file from tape, after which the sticky bit is cleared and the read succeeds normally.

Archiving to tape and unarchiving again
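From the client side, automating this interface could look roughly like the sketch below. It assumes only what is described above: an archived file carries the sticky bit, and reads fail with EIO until the file is back from tape. The function names, the polling interval and the timeout are hypothetical choices for illustration, not part of our platform.

```python
import errno
import os
import stat
import time


def is_archived(path):
    """True if the sticky bit is set, i.e. the file data currently lives on tape."""
    return bool(os.stat(path).st_mode & stat.S_ISVTX)


def wait_until_retrieved(path, poll_secs=5.0, timeout_secs=3600.0):
    """Retry a one-byte read until it stops failing with EIO.

    Returns the number of attempts made. Any error other than EIO, or EIO
    after the timeout has passed, is re-raised to the caller.
    """
    deadline = time.monotonic() + timeout_secs
    attempts = 0
    while True:
        attempts += 1
        try:
            with open(path, "rb") as f:
                f.read(1)  # the first read is what triggers the retrieval
            return attempts
        except OSError as err:
            if err.errno != errno.EIO or time.monotonic() >= deadline:
                raise
        time.sleep(poll_secs)
```

Reading a single byte is enough to trigger and detect retrieval without pulling a large file through memory; the caller can then read the file normally.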

Easy to automate, I hear you say. Yes, but not so easy to mock. As I don't want to write a file to tape every debugging cycle, I need a mock server with the same interface as the tape drive. That means intercepting reads and writes.

Mocking our tape archiver

At first I tried this with fuser on the writing side - a command that returns the PIDs of processes that have a named file open - and named pipes on the read side. This quickly became a mess. Then I discovered FUSE and its C implementation, libfuse.

libfuse is a C library for building filesystems. The idea is that you write the low-level file operations: open(), release() (i.e. close), read(), write(), etc. libfuse takes care of the rest. It mounts your filesystem at a location you choose. To other applications, it behaves like any other filesystem. libfuse can also transparently do multithreading (though you do have to take care of concurrent access in your code).

Python

I like writing tools in Python. There are a couple of libfuse Python bindings. The official one is python-fuse. There is also fusepy. Now here's the catch. I didn't find python-fuse very intuitive. fusepy is a lot more intuitive, but it hasn't been updated in 5 years. The README page says it has new maintainers, but that README was last updated in 2012 so interpret that how you will.

However, I like fusepy, and it works. So for now that's what I'm using.
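To give a feel for the fusepy style, here is a minimal sketch of a filesystem that simply mirrors a backing directory. It is not the mock archiver: the class name Passthrough, the helper _full and the mount() function are made up for this example. The try/except around the import is only there so the class can be exercised on a machine without libfuse installed.

```python
import os

try:
    from fuse import FUSE, Operations  # fusepy; importing it needs libfuse
except (ImportError, OSError):
    FUSE, Operations = None, object  # allows calling the methods without mounting


class Passthrough(Operations):
    """Hypothetical minimal filesystem that mirrors the directory `root`."""

    def __init__(self, root):
        self.root = root

    def _full(self, path):
        # FUSE passes paths relative to the mount point, e.g. "/myfile"
        return os.path.join(self.root, path.lstrip("/"))

    def getattr(self, path, fh=None):
        st = os.lstat(self._full(path))
        keys = ("st_mode", "st_nlink", "st_size", "st_uid", "st_gid",
                "st_atime", "st_mtime", "st_ctime")
        return {k: getattr(st, k) for k in keys}

    def readdir(self, path, fh):
        return [".", ".."] + os.listdir(self._full(path))

    def create(self, path, mode, fi=None):
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        return os.open(self._full(path), flags, mode)

    def open(self, path, flags):
        return os.open(self._full(path), flags)

    def read(self, path, size, offset, fh):
        return os.pread(fh, size, offset)

    def write(self, path, data, offset, fh):
        return os.pwrite(fh, data, offset)

    def release(self, path, fh):
        return os.close(fh)


def mount(root, mountpoint):
    # Blocks until the filesystem is unmounted; foreground=True keeps it
    # attached to the terminal so log output stays visible
    FUSE(Passthrough(root), mountpoint, foreground=True)
```

Each method receives the path and, where relevant, the file handle that open() or create() returned. The archiver logic goes into these same hooks.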

Mocking our tape archive

Our archive to tape is triggered by writing a file to a special folder, which I am mocking with my FUSE filesystem. The real archiver then copies the file to a second, replica directory, sends both files to tape, truncates the files on disk and sets the sticky bit on the file stub in the replica folder. Of course, it can't do any of that until the file has finished being written. So to mock that, I put my code in the release() function (for some reason, libfuse does not call this close() but that's what it is for). Here is the Python code:

import fcntl
import os
import threading

from fuse import FUSE, FuseOSError, Operations, LoggingMixIn
# ...
def release(self, path, fh):
    # Read the open flags first: the descriptor is invalid once closed
    statusbits = fcntl.fcntl(fh, fcntl.F_GETFL)
    ret = os.close(fh)
    # Archive only if the file was open for writing (O_WRONLY or O_RDWR)
    if (statusbits & os.O_ACCMODE) != os.O_RDONLY:
        filename = os.path.basename(path)
        thread = threading.Thread(target=self.archiver,
                                  name="Archive_" + filename,
                                  args=(filename,))
        thread.start()
    return ret

In the thread that it starts, I copy the file to another folder (the replica) and set the sticky bit when finished:

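import logging
import shutil
import time
# ...
def archiver(self, filename: str):
    logging.info("Archive " + filename)
    final_file = os.path.join(self.root, filename)
    replica_file = os.path.join(self.replica_dir, filename)
    # Copy to the replica; the original stays on disk so it can be 'unarchived'
    shutil.copyfile(final_file, replica_file)
    # Simulate the time the robot takes to write to tape
    if self.archive_delay_secs > 0:
        time.sleep(self.archive_delay_secs)
    # The sticky bit on the replica marks the file as archived
    os.chmod(replica_file, 0o1644)
    logging.info("Archived " + filename)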

In our perhaps counterintuitive nomenclature, final is the name of the directory the file is written to in order to trigger the archiving. In my implementation, it is a separate directory which I actually write the file to - because I need to be able to 'unarchive' it later.

I do this in a separate thread because our files can be big. I want to test performance under load so I need concurrent writes.
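A load test along these lines could be driven by something like the following sketch, which writes several files at once into a directory (for the mock, that would be the mounted final folder). The function names, file naming pattern and sizes are hypothetical choices for this example.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def write_file(directory, name, size_bytes, chunk=1024 * 1024):
    """Write `size_bytes` of zeros to directory/name in chunks; return the path."""
    path = os.path.join(directory, name)
    block = b"\0" * chunk
    remaining = size_bytes
    with open(path, "wb") as f:
        while remaining > 0:
            n = min(chunk, remaining)
            f.write(block[:n])
            remaining -= n
    return path


def concurrent_writes(directory, n_files, size_bytes, workers=8):
    """Write n_files in parallel threads; return (paths, elapsed seconds)."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [
            pool.submit(write_file, directory, "load_%03d.bin" % i, size_bytes)
            for i in range(n_files)
        ]
        paths = [f.result() for f in futures]  # re-raises any write error
    return paths, time.perf_counter() - start
```

Each file is closed as soon as its writer finishes, so every completed write triggers its own release() call, and therefore its own archiver thread, in the mock.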

My write() just does a normal write, with locking:

import os

from filelock import FileLock
# ...
def write(self, path, data, offset, fh):
    # Serialise concurrent writers to the same file via a lock file beside it
    with FileLock(path + ".lock"):
        os.lseek(fh, offset, os.SEEK_SET)
        return os.write(fh, data)

How about unarchiving? Intuition says I should put my code in the read() function. However, the filesystem is optimised and if the file is cached, a read won't actually cause a read from disk. I could turn this off, but even if it is cached the file is still opened, just not read. In other words open() is always called even if read() isn't. So my code goes there:

from errno import EIO
# ...
def open(self, path, flags):
    fd = os.open(path, flags)
    statusbits = fcntl.fcntl(fd, fcntl.F_GETFL)
    basename = os.path.basename(path)
    replica_file = os.path.join(self.replica_dir, basename)
    if (statusbits & 1 == 0 and os.path.isfile(replica_file)):
        if (os.stat(replica_file).st_mode & 0o1000 == 0o1000):
            # open for reading and file has been archived (sticky bit set)
            os.close(fd)  # we are refusing the open, so don't leak the descriptor
            if (basename not in self.unarchiving_files):
                # unarchiving has not started - start it
                thread = threading.Thread(target=self.unarchiver,
                                          name="Unarchive_" + basename,
                                          args=(basename,))
                thread.start()
            # unarchiving has started but not finished - send the user an I/O error
            raise FuseOSError(EIO)
    return fd

Again, I do the actual work in a separate thread. Until that finishes, I send the user an I/O error, just like the tape does.

My unarchiving thread does this:

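def unarchiver(self, filename: str):
    logging.info("Unarchive " + filename)
    replica_file = os.path.join(self.replica_dir, filename)
    self.unarchiving_files.add(filename)
    try:
        # Simulate the time the robot takes to fetch the file from tape
        if self.archive_delay_secs > 0:
            time.sleep(self.archive_delay_secs)
        # Clear the sticky bit: the file is readable again
        os.chmod(replica_file, 0o644)
        logging.info("Unarchived " + filename)
    finally:
        # Always drop the in-progress marker, even if chmod fails
        self.unarchiving_files.remove(filename)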
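One thing to watch with this pattern is that libfuse may call open() from several threads at once, so two readers could both see that a file is not yet being unarchived and both start a thread. A possible way to close that gap, sketched here under the assumption that a plain set is being used for the in-progress files, is a small wrapper that makes "check and add" a single atomic step. The class and method names are hypothetical.

```python
import threading


class InFlight:
    """Thread-safe set of names with an atomic 'claim if absent' operation."""

    def __init__(self):
        self._names = set()
        self._lock = threading.Lock()

    def try_start(self, name):
        """Return True if the caller claimed `name`, False if it was already claimed."""
        with self._lock:
            if name in self._names:
                return False
            self._names.add(name)
            return True

    def finish(self, name):
        """Release a claim; does nothing if the name was not claimed."""
        with self._lock:
            self._names.discard(name)

    def __contains__(self, name):
        with self._lock:
            return name in self._names
```

With this, open() would start the unarchiving thread only when try_start() returns True, and the thread would call finish() when it is done.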

Testing error states

The data management system for which we are developing this process does all the proper error checking, like making sure the checksum after retrieval matches that of the original file. The nice thing about having a functional mock is that I can simulate error states and unit test how our data management system handles them.
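As an illustration of the kind of check involved, the sketch below computes a file checksum in chunks and provides a helper that deliberately corrupts a file, which is one way a mock could simulate a bad retrieval. SHA-256 and both function names are assumptions for this example; the post does not say which checksum our platform uses.

```python
import hashlib


def sha256_of(path, chunk=1024 * 1024):
    """Return the hex SHA-256 of a file, reading it in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for block in iter(lambda: f.read(chunk), b""):
            digest.update(block)
    return digest.hexdigest()


def corrupt_first_byte(path):
    """Invert the first byte of the file in place to simulate a damaged copy."""
    with open(path, "r+b") as f:
        first = f.read(1)
        if first:  # nothing to corrupt in an empty file
            f.seek(0)
            f.write(bytes([first[0] ^ 0xFF]))
```

A unit test can record the checksum before archiving, ask the mock to corrupt the file, and then assert that the data management system reports the mismatch.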

So that I can put this in a unit test, my mock archiver also has an HTTP interface where errors can be activated and deactivated during tests. This is implemented with FastAPI, a Python web backend framework I have quickly come to love.
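The idea of such a control interface can be sketched as follows. To keep the example free of dependencies it uses the standard library's http.server rather than FastAPI, so it is a stand-in for the approach and not the actual implementation. The fault names and URL layout are hypothetical.

```python
import json
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

# Hypothetical fault switches that the archiver threads would consult
faults = {"corrupt_on_unarchive": False, "never_unarchive": False}
faults_lock = threading.Lock()


class FaultHandler(BaseHTTPRequestHandler):
    def _reply(self, code, body):
        payload = json.dumps(body).encode()
        self.send_response(code)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def do_GET(self):
        # Any GET returns the current state of all switches
        with faults_lock:
            self._reply(200, dict(faults))

    def do_PUT(self):
        # PUT /faults/<name>/on  or  PUT /faults/<name>/off
        parts = self.path.strip("/").split("/")
        if len(parts) != 3 or parts[0] != "faults" or parts[2] not in ("on", "off"):
            return self._reply(400, {"error": "bad request"})
        with faults_lock:
            if parts[1] not in faults:
                return self._reply(404, {"error": "unknown fault"})
            faults[parts[1]] = parts[2] == "on"
            self._reply(200, dict(faults))

    def log_message(self, *args):
        pass  # keep test output quiet


def start_fault_server(port=0):
    """Serve in a daemon thread; port=0 lets the OS pick a free port."""
    server = ThreadingHTTPServer(("127.0.0.1", port), FaultHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server
```

A test would switch a fault on before exercising the data management system, check that the failure is handled, and switch it off again afterwards.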

The code

This is still work-in-progress so I cannot share the full code yet. It will be open sourced though and, when it is in a publishable state, it will go on GitHub (where the data management platform already is).

Demo

The following video shows the mock archiver in action. The top window is running it. In the bottom window, I create a file, just using

echo "one" > final/myfile

I then read it back with

cat final/myfile

This triggers the unarchival, and I get an I/O error until the file has been retrieved, after which the contents are returned.

Summary

This post described libfuse, a C library for building filesystems, and fusepy, a set of Python bindings for it. I use them to mock our tape archival system by intercepting file open and close.

Matt Baker - technology and scientific IT blog