#!/usr/bin/env python
# coding: utf-8

# # RSync with IPython Parallel
#
# Experiment based on [this issue discussion](https://github.com/ipython/ipyparallel/issues/209#issuecomment-263567055).
#
# This is a prototype, and doesn't do everything you might expect.
# If actual rsync is available to you, you should probably use it.
#
# Shortcomings:
#
# - only considers one engine's state when deciding what to send (assumes engines are in sync with each other)
# - requires computing full remote file list before sending, rather than incremental sends
# - only uses mtime, no support for other methods of deciding what to send (like checksums, force, etc.)
# - no support for `--delete`; files are only added or modified

# In[71]:

import ipyparallel as ipp

rc = ipp.Client()
dview = rc[:]

# In[255]:

import os
import io
from tornado.gen import coroutine


def full_listing(path):
    """Compute the full listing on a path

    Called on engines and used to decide what to send.

    Returns a dict keyed by path relative to `path` (the root itself is
    keyed by ''):

    - directories map to
      ``{'kind': 'dir', 'dirnames': [...], 'filenames': [...]}``
    - files map to ``{'kind': 'file', 'mtime': <st_mtime of the file>}``

    The dict is empty when ``os.walk`` yields nothing, e.g. when `path`
    does not exist or is not a directory.
    """
    # imported locally because this function body executes on the engines
    import os

    listing = {}
    for dirpath, dirnames, filenames in os.walk(path):
        relpath = os.path.relpath(dirpath, path)
        if relpath == '.':
            # key the root as '' so that joining it with a file name
            # yields the bare file name
            relpath = ''
        listing[relpath] = {
            'kind': 'dir',
            'dirnames': dirnames,
            'filenames': filenames,
        }
        for fname in filenames:
            file_path = os.path.join(dirpath, fname)
            listing[os.path.join(relpath, fname)] = {
                'kind': 'file',
                'mtime': os.stat(file_path).st_mtime,
            }
    return listing


@coroutine
def send_file(dview, local_path, remote_path):
    """Send a single file

    Reads `local_path` fully into memory and writes it to `remote_path`
    through `dview`, then sets the remote atime/mtime to the local mtime
    so that later mtime comparisons in `rsync` see the file as current.

    Anything already present at `remote_path` is replaced, including a
    whole directory tree.
    """
    print("sending file", local_path, remote_path)

    def _recv(remote_path, bdata, mtime):
        # runs on the engines, so it imports everything it needs itself
        import io
        import os
        import shutil

        if os.path.isdir(remote_path) and not os.path.islink(remote_path):
            # a directory is in the way of the file; replace it, mirroring
            # how send_dir replaces a file that is in the way of a directory
            shutil.rmtree(remote_path)
        elif os.path.exists(remote_path):
            os.unlink(remote_path)
        with io.open(remote_path, 'wb') as f:
            # write the bytes that were passed as an argument rather than
            # relying on a closure over the caller's local variable
            f.write(bdata)
        os.utime(remote_path, (mtime, mtime))

    st = os.stat(local_path)
    with io.open(local_path, 'rb') as f:
        data = f.read()
    yield dview.apply_async(_recv, remote_path, data, st.st_mtime)


@coroutine
def send_dir(dview, local_path, remote_path):
    """Send a full directory recursively to the engines

    The directory structure is created first, then every file below
    `local_path` is sent with `send_file`.

    A regular file at `remote_path` is removed and replaced by the
    directory. Directories that already exist remotely are reused.
    """
    print("sending dir", local_path, remote_path)

    def _recv_dirs(remote_path, subdirs):
        # runs on the engines, so it imports everything it needs itself
        import os

        if os.path.isfile(remote_path):
            os.unlink(remote_path)
        # exist_ok: do not skip creating subdirectories just because the
        # top-level directory (or some of the subdirectories) already exists;
        # makedirs also creates missing parents of the destination
        os.makedirs(remote_path, exist_ok=True)
        for subdir in subdirs:
            os.makedirs(os.path.join(remote_path, subdir), exist_ok=True)

    rel_dirs = []
    rel_files = []
    for dirpath, dirnames, filenames in os.walk(local_path):
        rel_dir = os.path.relpath(dirpath, local_path)
        if rel_dir == '.':
            rel_dir = ''
        else:
            rel_dirs.append(rel_dir)
        rel_files.extend(os.path.join(rel_dir, f) for f in filenames)

    # send directory structure
    yield dview.apply_async(_recv_dirs, remote_path, rel_dirs)

    futures = []
    # send files
    for rel_file in rel_files:
        local_fname = os.path.abspath(os.path.join(local_path, rel_file))
        # do not abspath() the remote name: that would resolve a relative
        # remote path against the *local* working directory, unlike the
        # directory creation above which uses remote_path untouched
        remote_fname = os.path.join(remote_path, rel_file)
        futures.append(send_file(dview, local_fname, remote_fname))
    for f in futures:
        yield f


@coroutine
def rsync(dview, local_path, remote_path, prefix='', remote_listing=None):
    """Sync local directory to the remote directory

    Uses mtime on files to determine if files should be sent.
    Files are sent when they are missing remotely, when the remote entry
    is not a regular file, or when the local mtime is newer than the
    remote mtime. Nothing is ever deleted just because it is absent
    locally.

    Parameters:

    - dview: the view whose engines receive the files
    - local_path: local directory to sync from
    - remote_path: destination directory on the engines
    - prefix: path of `local_path` relative to the root of the sync;
      used to build keys into `remote_listing` during recursion
    - remote_listing: result of `full_listing` for the root of the sync;
      fetched from the engines when None
    """
    local_path = os.path.abspath(local_path)
    if remote_listing is None:
        remote_listings = yield dview.apply_async(full_listing, remote_path)
        # FIXME: only consider first engine's state for syncing
        remote_listing = remote_listings[0]

    if not remote_listing:
        # nothing usable at the destination: send everything
        result = yield send_dir(dview, local_path, remote_path)
        return result

    futures = []
    for entry in os.scandir(local_path):
        rel_path = os.path.relpath(entry.path, local_path)
        remote_dest = os.path.join(remote_path, rel_path)
        key = os.path.join(prefix, rel_path)
        f = None
        if key not in remote_listing:
            # doesn't exist, send it
            if entry.is_dir():
                f = send_dir(dview, entry.path, remote_dest)
            else:
                f = send_file(dview, entry.path, remote_dest)
        else:
            # does exist
            remote_entry = remote_listing[key]
            if entry.is_dir():
                if remote_entry['kind'] == 'dir':
                    # directory on both sides, recursive rsync
                    f = rsync(
                        dview,
                        entry.path,
                        remote_dest,
                        prefix=os.path.join(prefix, entry.name),
                        remote_listing=remote_listing,
                    )
                else:
                    # remote has a file where a directory belongs
                    f = send_dir(dview, entry.path, remote_dest)
            elif remote_entry['kind'] != 'file':
                # remote has a directory where a file belongs; directory
                # entries carry no 'mtime', so there is nothing to compare
                f = send_file(dview, entry.path, remote_dest)
            else:
                # it's a file on both sides, compare mtime
                st = entry.stat()
                if st.st_mtime > remote_entry['mtime']:
                    f = send_file(dview, entry.path, remote_dest)
        if f is not None:
            futures.append(f)
    # await futures together
    for f in futures:
        yield f


# In[256]:

from tornado.ioloop import IOLoop

IOLoop().run_sync(lambda : rsync(dview, '.', '/tmp/dest'))