Nextflow

Nextflow is the preferred workflow framework in Qristal. The typical use case is a large, multi-stage HPC application with sequential stages (best executed within an interactive session), followed by parallel stages (best executed as independent job submissions to the HPC scheduler).

Qristal’s class structure is well suited to producing command-line executables that use MPI for parallelization. Nextflow is then used to stitch multiple MPI-enabled executables together into a main application workflow. For hybrid quantum-classical applications, each process of an MPI executable has functions that run either on a simulated quantum backend or on an actual QPU.
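
As a sketch of how such a stage can be expressed (the executable name vqe_stage, its arguments and the core count are placeholders, not part of Qristal), an MPI-enabled executable can be wrapped in a single Nextflow process:

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

// Hypothetical stage launching an MPI-enabled executable built with Qristal.
// The executable name (vqe_stage) and its arguments are illustrative only.
process runMpiStage {
    cpus 16

    input:
        path input_deck

    output:
        path 'stage_output.json'

    """
    mpirun -np ${task.cpus} vqe_stage --input ${input_deck} --output stage_output.json
    """
}

workflow {
    runMpiStage(Channel.fromPath('input_deck.json'))
}

With the cluster profile shown further below, Nextflow submits each such process as its own Slurm job, so parallel stages become independent scheduler submissions.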

Installing Nextflow

Prerequisites

Java Runtime Environment

sudo apt install openjdk-17-jre

Downloading Nextflow

curl -s https://get.nextflow.io | bash
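
The installer places a nextflow launcher script in the current directory. If needed, make it executable, check that it runs, and optionally move it to a directory on your PATH:

chmod +x nextflow
./nextflow -version
sudo mv nextflow /usr/local/bin/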

Structure of a Nextflow project

The root directory of a project contains the following files and directories:

nextflow.config

An example of this file is shown below:

profiles {
    standard {
        process.executor = 'local'
        process.cpus = 4
        process.memory = '2 GB'
    }
    cluster {
        process.executor = 'slurm'
        process.queue = 'work'
        process.cpus = 16
        process.memory = '16 GB'
    }
    qdk {
        process.executor = 'local'
        process.cpus = 1
        process.memory = '2 GB'
    }
    aws {
        process.cpus = 1
        process.memory = '2 GB'
    }
}
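
A profile is selected at run time with the -profile command line option (see the execution examples further below); when no profile is given, Nextflow applies the standard profile by default.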

main.nf

This is the entry-point script that drives the Nextflow execution. Below is an example of a main.nf that sweeps over five values of the SVD cutoff used by the matrix product state (MPS) method:

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

process sweepSvdCutoff {
    debug false
    input:
        val svdcutoff

    output:
        stdout

    """
    #!/usr/bin/python3
    import qb.core
    import numpy as np

    tqb = qb.core.session()
    tqb.qb12()
    tqb.qn = 2     # Number of qubits
    tqb.sn = 1024  # Number of shots
    tqb.acc = "tnqvm"
    tqb.noplacement = True
    tqb.nooptimise = True
    tqb.notiming = True
    tqb.output_oqm_enabled = False
    tqb.svd_cutoff[0][0][0] = ${svdcutoff}
    tqb.instring = '''

    __qpu__ void QBCIRCUIT(qreg q) {
    OPENQASM 2.0;
    include "qelib1.inc";
    creg c[2];
    h q[0];
    cx q[0],q[1];
    measure q[1] -> c[1];
    measure q[0] -> c[0];
    }

    '''
    tqb.run()
    print("SVD cutoff: ", tqb.svd_cutoff[0][0][0])
    print(tqb.out_raw[0][0])
    """
}

workflow {
  channel.of(0.1, 0.01, 0.001, 0.0001, 0.00001) | sweepSvdCutoff | view
}

bin/

This directory should contain all executable scripts (e.g. Bash, Perl, Python) used by the pipeline. Nextflow automatically adds it to the search path of every task.
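
As an example of this, a hypothetical helper script bin/tally_counts.py (the name is made up for illustration) can be invoked by bare name from any process script, because bin/ is on the task search path:

process tallyCounts {
    input:
        path counts_json

    output:
        stdout

    """
    tally_counts.py ${counts_json}
    """
}

The script itself must be executable (chmod +x) and begin with a suitable shebang line, e.g. #!/usr/bin/python3.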

work/

This is where Nextflow stores task output. Each task execution gets its own uniquely hashed subdirectory here.
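
Alongside any files produced by the task itself, each subdirectory also contains the scripts and logs Nextflow generates for that task, such as .command.sh (the resolved task script), .command.out and .command.err (captured stdout and stderr), and .exitcode.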

Executing a Nextflow pipeline and generating a memory usage and execution time report

Execute with the standard profile:

nextflow run main.nf -with-report

Execute with the cluster profile:

nextflow run main.nf -profile cluster -with-report
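
The -with-report option writes a self-contained HTML execution report with per-task CPU, memory and wall-time metrics; an explicit file name may also be given, e.g. nextflow run main.nf -profile cluster -with-report svd-sweep-report.html (the file name here is just an example).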

Example demonstrating asynchronous execution over multiple workers (shot partitioning)

The pipeline parallelises circuit evaluation across multiple processes and across multiple asynchronous threads per process. The example detects all circuit files (extension .oqm) in the current working directory; these are assumed to be in OpenQASM 2.0 format.
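
With the settings used below (N_SHOTS = 512, N_PROCESSES = 4, N_ASYNC_THREADS = 2), each circuit is evaluated by 4 Nextflow tasks of 512/4 = 128 shots each, and every task further splits its 128 shots between 2 asynchronous workers of 64 shots, giving 8 backend executions per circuit in total.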

main-partitioned.nf

#!/usr/bin/env nextflow
nextflow.enable.dsl=2
import groovy.json.JsonSlurper

def N_SHOTS = 512
def N_PROCESSES = 4
def N_ASYNC_THREADS = 2

def N_PHYSICAL_QUBITS = 2
def QRISTAL_ACC = "aer"

def jsonSlurper = new JsonSlurper()

process partitionCircuitQubitBackend {
    debug false
    input:
        path circuit
        each shots_N

    output:
       stdout

    """
    #!/usr/bin/python3
    import ast
    import json
    import numpy as np
    import time
    import qb.core
    tqb = qb.core.session()
    tqb.qb12()

    tqb.qn = $N_PHYSICAL_QUBITS  # Number of qubits
    tqb.infile = "$circuit"

    tqb.noplacement = True
    tqb.nooptimise = True
    tqb.notiming = True
    tqb.output_oqm_enabled = False

    NW = $N_ASYNC_THREADS  # number of async workers
    SLEEP_SECONDS = 0.1 # seconds to sleep between progress checks
    ALG_UNDER_TEST = 0

    # Set up workers
    # Set up the pool of backends for parallel task distribution
    qpu_config = {"accs": NW*[{"acc": "$QRISTAL_ACC"}]}
    tqb.set_parallel_run_config(json.dumps(qpu_config))

    # Set the number of threads to use in the thread pool
    tqb.num_threads = NW

    # Set up jobs that partition the requested number of shots
    tqb.sn[ALG_UNDER_TEST].clear()
    for jj in range(NW):
        tqb.sn[ALG_UNDER_TEST].append($shots_N // NW)

    handles = []
    for i in range(NW):
        handles.append(tqb.run_async(ALG_UNDER_TEST, i))
        time.sleep(0.001)

    # Gather the results
    allres = {}
    componentres = [ast.literal_eval(handles[i].get()) for i in range(NW)]
    for ii in range(NW):
        allres = {k: allres.get(k,0) + componentres[ii].get(k,0) for k in set(allres) | set(componentres[ii])}

    # View the results
    print(json.dumps(allres))

    # Store the settings
    save_js = {}
    save_js['shots'] = $shots_N
    save_js['backend'] = "$QRISTAL_ACC"
    save_js['workers'] = $N_ASYNC_THREADS
    save_js['circuit'] = "$circuit"
    save_js['qubits'] = $N_PHYSICAL_QUBITS
    with open('settings.json', 'w') as f:
        json.dump(save_js, f)
    """
}
workflow {
  circuit_ch = Channel.fromPath("*.oqm")
  shots_N_ch = (0..<N_PROCESSES).collect { N_SHOTS.intdiv(N_PROCESSES) }
  shotoutcomes_ch = partitionCircuitQubitBackend(circuit_ch, shots_N_ch).map { jsonSlurper.parseText(it) }
  // Merge the per-task count maps into a single map of total counts, then print it
  shotoutcomes_ch
    .reduce([:]) { allres, counts ->
      (allres.keySet() + counts.keySet()).collectEntries { k -> [k, (allres[k] ?: 0) + (counts[k] ?: 0)] }
    }
    .view()
}

Example OpenQASM file: ex-nf1.oqm

OPENQASM 2.0;
include "qelib1.inc";
qreg q[2];
creg c[2];
h q[0];
cx q[0],q[1];
measure q[0] -> c[0];
measure q[1] -> c[1];

Execute the pipeline using:

nextflow run main-partitioned.nf

The results are stored in subdirectories under work/. View the .command.out and settings.json files there.
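
To map each work/ subdirectory back to the task that produced it, nextflow log lists previous runs, and a command such as nextflow log <run name> -f name,status,workdir prints the working directory of each task (nextflow log -l lists the available fields).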