Skip to content

Reference

Functions aiding and orchestrating the execution and validation of the weave workflow.

File helper functions

get_all_seq_dirs(top_dir, server)

Gather and return all sequencing directories from the top_dir. This is tightly coupled at the moment to the directory that is on RML-BigSky. In the future will need to the take a look at how to do this more generally

Source code in scripts/files.py
def get_all_seq_dirs(top_dir, server):
    """
        Gather and return all sequencing directories from the `top_dir`. 
        This is tightly coupled at the moment to the directory that is on RML-BigSky.
        In the future will need to the take a look at how to do this more generally
    """
    if isinstance(top_dir, str): top_dir = Path(top_dir)
    _dirs = []
    for _file in top_dir.glob('*'):
        if _file.is_dir():
            for _file2 in _file.glob('*'):
                if _file2.is_dir() and check_access(_file2, R_OK):
                    _dirs.append(_file2.resolve())
    # check if directory is processed or not
    return _dirs

is_dir_staged(server, run_dir)

filter check for wheter or not a directory has the appropriate breadcrumbs or not

CopyComplete.txt - file transfer from instrument breadcrumb, blank (won't be there on instruments != NextSeq2k)

RTAComplete.txt - sequencing breadcrumb, CSV file with values: Run Date, Run time, Instrument ID

RunInfo.xml - XML metainformation (RunID, Tiles, etc)

Source code in scripts/files.py
def is_dir_staged(server, run_dir):
    """
        filter check for wheter or not a directory has the appropriate breadcrumbs or not

        CopyComplete.txt - file transfer from instrument breadcrumb, blank (won't be there on instruments != NextSeq2k)

        RTAComplete.txt - sequencing breadcrumb, CSV file with values:
            Run Date, Run time, Instrument ID

        RunInfo.xml - XML metainformation (RunID, Tiles, etc)
    """
    analyzed_checks = [
        Path(run_dir, 'RTAComplete.txt').exists(),
        Path(run_dir, 'SampleSheet.csv').exists(),
        Path(run_dir, 'RunInfo.xml').exists(),
    ]
    return all(analyzed_checks)

parse_samplesheet(ss)

Parse the sample sheet into data structure

Source code in scripts/files.py
def parse_samplesheet(ss):
    """
        Parse the sample sheet into data structure
    """
    parser = sniff_samplesheet(ss)
    return parser(ss)

runid2samplesheet(runid, top_dir=DIRECTORY_CONFIGS['bigsky']['seq'])

Given a valid run id return the path to the sample sheet

Source code in scripts/files.py
def runid2samplesheet(runid, top_dir=DIRECTORY_CONFIGS['bigsky']['seq']):
    """
        Given a valid run id return the path to the sample sheet
    """
    ss_path = Path(top_dir, runid)
    if not ss_path.exists():
        raise FileNotFoundError(f"Run directory does not exist: {ss_path}")
    if Path(ss_path, f"SampleSheet.txt").exists():
        ss_path = Path(ss_path, f"SampleSheet.txt")
    elif Path(ss_path, f"SampleSheet.csv").exists():
        ss_path = Path(ss_path, f"SampleSheet.csv")
    elif Path(ss_path, f"SampleSheet_{runid}.txt").exists():
        ss_path = Path(ss_path, f"SampleSheet_{runid}.txt")
    elif Path(ss_path, f"SampleSheet_{runid}.csv").exists():
        ss_path = Path(ss_path, f"SampleSheet_{runid}.csv")
    else:
        raise FileNotFoundError("Run sample sheet does not exist: " + str(ss_path) + f"/SampleSheet_{runid}.[txt, csv]")
    return ss_path

sniff_samplesheet(ss)

Given a sample sheet file return the appropriate function to parse the sheet.

Source code in scripts/files.py
def sniff_samplesheet(ss):
    """
        Given a sample sheet file return the appropriate function to parse the
        sheet.
    """
    return IllumniaSampleSheet

Utility helper functions

exec_pipeline(configs, dry_run=False, local=False)

Execute the BCL->FASTQ pipeline.

This executes the pipeline.

Source code in scripts/utils.py
def exec_pipeline(configs, dry_run=False, local=False):
    """
        Execute the BCL->FASTQ pipeline.

        This executes the pipeline.
    """
    this_instrument = 'Illumnia'
    snake_file = SNAKEFILE[this_instrument]['ngs_qc']
    fastq_demux_profile = DIRECTORY_CONFIGS[get_current_server()]['profile']
    profile_config = {}
    if Path(fastq_demux_profile, 'config.yaml').exists():
        profile_config.update(yaml.safe_load(open(Path(fastq_demux_profile, 'config.yaml'))))

    top_singularity_dirs = [Path(c_dir, '.singularity').absolute() for c_dir in configs['out_to']]
    top_config_dirs = [Path(c_dir, '.config').absolute() for c_dir in configs['out_to']]
    _dirs = top_singularity_dirs + top_config_dirs
    mk_or_pass_dirs(*_dirs)
    skip_config_keys = ('resources', 'runqc', 'use_scratch')

    for i in range(0, len(configs['run_ids'])):
        this_config = {k: (v[i] if k not in skip_config_keys else v) for k, v in configs.items() if v}
        this_config.update(profile_config)

        extra_to_mount = [this_config['out_to'], this_config['demux_input_dir']]
        if this_config['bclconvert']:
            bclcon_log_dir = Path(this_config['out_to'], "logs", "bclconvert_demux")
            if not bclcon_log_dir.exists():
                bclcon_log_dir.mkdir(mode=0o755, parents=True)
            extra_to_mount.append(str(bclcon_log_dir) + ":" + "/var/log/bcl-convert:rw")
        if this_config.get('disambiguate', False):
            extra_to_mount.append(Path(this_config['host_genome']).parent)
            extra_to_mount.append(Path(this_config['pathogen_genome']).parent)
        singularity_binds = get_mounts(*extra_to_mount)
        config_file = Path(this_config['out_to'], '.config', f'config_job_{str(i)}.json').absolute()
        json.dump(this_config, open(config_file, 'w'), cls=PathJSONEncoder, indent=4)
        top_env = {}
        top_env['PATH'] = os.environ["PATH"]
        top_env['SNK_CONFIG'] = str(config_file.absolute())
        top_env['SINGULARITY_CACHEDIR'] = str(Path(this_config['out_to'], '.singularity').absolute())
        this_cmd = [
            "snakemake", "-p", "--use-singularity", "--rerun-incomplete", "--keep-incomplete",
            "--rerun-triggers", "mtime", "--verbose", "-s", snake_file,
        ]

        if singularity_binds and not dry_run:
            this_cmd.extend(["--singularity-args", f"\"--env 'TMPDIR=/tmp' -C -B '{singularity_binds}'\""])

        if dry_run:
            print(f"{esc_colors.OKGREEN}> {esc_colors.ENDC}{esc_colors.UNDERLINE}Dry run{esc_colors.ENDC} " + \
                  f"demultiplexing of run {esc_colors.BOLD}{esc_colors.OKGREEN}{this_config['run_ids']}{esc_colors.ENDC}...")
            this_cmd.extend(['--dry-run'])
        else:
            if not local:
                this_cmd.extend(["--profile", fastq_demux_profile])
            print(f"{esc_colors.OKGREEN}> {esc_colors.ENDC}Executing ngs qc pipeline for run {esc_colors.BOLD}"
                  f"{esc_colors.OKGREEN}{this_config['run_ids']}{esc_colors.ENDC}...")

        print(' '.join(map(str, this_cmd)))
        exec_snakemake(this_cmd, local=local, dry_run=dry_run, env=top_env, cwd=str(Path(this_config['out_to']).absolute()))

valid_runid(id_to_check)

Given an input ID get it's validity against the run id format: YYMMDD_INSTRUMENTID_TIME_FLOWCELLID

Source code in scripts/utils.py
def valid_runid(id_to_check):
    '''
        Given an input ID get it's validity against the run id format:
            YYMMDD_INSTRUMENTID_TIME_FLOWCELLID
    '''
    id_to_check = str(id_to_check)
    id_parts = id_to_check.split('_')
    if len(id_parts) != 4:
        raise ValueError(f"Invalid run id format: {id_to_check}")
    try:
        # YY MM DD
        date_parser(id_parts[0])
    except Exception as e:
        raise ValueError('Invalid run id date') from e
    try:
        # HH MM
        h = int(id_parts[2][0:3])
        m = int(id_parts[2][2:])
    except ValueError as e:
        raise ValueError('Invalid run id time') from e

    if h >= 25 or m >= 60:
        raise ValueError('Invalid run id time: ' + h + m)

    # TODO: check instruments against labkey
    return id_to_check

Server configuration functions

get_bigsky_seq_dirs()

Get a list of sequence directories, that have the required illumnia file artifacts: RTAComplete.txt - breadcrumb file created by bigsky transfer process and illumnia sequencing

Returns:

Type Description
list

list of pathlib.Paths of all sequencing directories on bigsky server

Source code in scripts/config.py
def get_bigsky_seq_dirs():
    """Get a list of sequence directories, that have the required illumnia file artifacts:
    RTAComplete.txt - breadcrumb file created by bigsky transfer process and illumnia sequencing

    Returns:
        (list): list of `pathlib.Path`s of all sequencing directories on bigsky server
    """
    top_dir = Path("/gs1/RTS/NextGen/SequencerRuns/")
    transfer_breadcrumb = "RTAComplete.txt"
    if not top_dir.exists():
        return None
    seq_dirs = []
    for this_dir in top_dir.iterdir():
        if not this_dir.is_dir(): continue
        for this_child_elem in this_dir.iterdir():
            try:
                elem_checks = [
                    this_child_elem.is_dir(), 
                    Path(this_child_elem, transfer_breadcrumb).exists(),
                    check_access(this_child_elem, R_OK)
                ]
            except (PermissionError, FileNotFoundError) as error:
                continue
            if all(elem_checks):
                seq_dirs.append(this_child_elem.absolute())
    return seq_dirs

get_biowulf_seq_dirs()

Get a list of sequence directories, that have the required illumnia file artifacts: RTAComplete.txt - breadcrumb file created by bigsky transfer process and illumnia sequencing

Returns:

Type Description
list

list of pathlib.Paths of all sequencing directories on biowulf server

Source code in scripts/config.py
def get_biowulf_seq_dirs():
    """Get a list of sequence directories, that have the required illumnia file artifacts:
    RTAComplete.txt - breadcrumb file created by bigsky transfer process and illumnia sequencing

    Returns:
        (list): list of `pathlib.Path`s of all sequencing directories on biowulf server
    """
    top_dir = Path("/data/RTB_GRS/SequencerRuns/")
    transfer_breadcrumb = "RTAComplete.txt"
    if not top_dir.exists():
        return None
    return [xx for x in top_dir.iterdir() if x.is_dir() for xx in x.iterdir() if xx.is_dir() and Path(xx, transfer_breadcrumb).exists()]

get_current_server()

Return the current server name by looking at the hostname

Returns:

Type Description
str

one of bigsky, biowulf, or locus

Source code in scripts/config.py
def get_current_server():
    """Return the current server name by looking at the hostname

    Returns:
        (str): one of `bigsky`, `biowulf`, or `locus`
    """
    hn = gethostname()
    # bigsky hostnames
    re_bigsky = (r"ai-rml.*\.niaid\.nih\.gov", "bigsky")

    # biowulf hostnames
    re_biowulf_head = (r"biowulf\.nih\.gov", "biowulf")
    re_biowulf_compute = (r"cn\d{4}", "biowulf")

    # skyline hostnames
    re_skyline_head = (r"ai-hpc(submit|n)(\d+)?", "skyline")
    re_skyline_compute = (r"ai-hpc(submit|n)(\d+)?", "skyline")

    host_profiles = [re_bigsky, re_biowulf_compute, re_biowulf_head, re_skyline_head, re_skyline_compute]

    host = None
    for pat, this_host in host_profiles:
        if re.match(pat, hn):
            host = this_host
            break
    if host is None:
        raise ValueError(f"Unknown host profile")
    return host

get_resource_config()

Return a dictionary containing server specific references utilized in the workflow for directories or reference files.

Returns:

Type Description
dict

return configuration key value pairs of current server::

{ "sif": "/server/location/to/sif/directory", "mounts": { "refence binding": { "to": "/bind/to", "from": "/bind/from", "mode": "ro/rw" }, ... }

Source code in scripts/config.py
def get_resource_config():
    """Return a dictionary containing server specific references utilized in 
    the workflow for directories or reference files.

    Returns:
        (dict): return configuration key value pairs of current server::

            {
            "sif": "/server/location/to/sif/directory",
            "mounts": {
                "refence binding": {
                    "to": "/bind/to",
                    "from": "/bind/from",
                    "mode": "ro/rw"
                },
                ...
            }
    """
    resource_dir = Path(__file__, '..', '..', 'config').absolute()
    resource_json = Path(resource_dir, get_current_server() + '.json').resolve()

    if not resource_json.exists():
        return None

    return json.load(open(resource_json))