Gestionar recursos de cómputo y datos de Qiskit Serverless
Versiones de paquetes
El código de esta página fue desarrollado con los siguientes requisitos. Recomendamos usar estas versiones o versiones más recientes.
qiskit[all]~=2.0.0
qiskit-ibm-runtime~=0.37.0
qiskit-serverless~=0.22.0
Con Qiskit Serverless, puedes gestionar el cómputo y los datos en tu patrón Qiskit, incluyendo CPUs, QPUs y otros aceleradores de cómputo.
Establecer estados detallados
Las cargas de trabajo serverless tienen varias etapas dentro de un flujo de trabajo. De forma predeterminada, los siguientes estados se pueden consultar con job.status():
QUEUED: la carga de trabajo está en cola para recursos clásicosINITIALIZING: la carga de trabajo se está configurandoRUNNING: la carga de trabajo se está ejecutando en recursos clásicosDONE: la carga de trabajo se ha completado con éxito
También puedes establecer estados personalizados que describan con mayor detalle la etapa específica del flujo de trabajo, como se muestra a continuación.
# Added by doQumentation — required packages for this notebook
!pip install -q qiskit qiskit-ibm-runtime qiskit-serverless
# This cell is hidden from users, it just creates a new folder
from pathlib import Path
Path("./source_files").mkdir(exist_ok=True)
%%writefile ./source_files/status_example.py
from qiskit_serverless import update_status, Job
## If your function has a mapping stage, particularly application functions, you can set the status to "RUNNING: MAPPING" as follows:
update_status(Job.MAPPING)
## While handling transpilation, error suppression, and so forth, you can set the status to "RUNNING: OPTIMIZING_FOR_HARDWARE":
update_status(Job.OPTIMIZING_HARDWARE)
## After you submit jobs to Qiskit Runtime, the underlying quantum job will be queued. You can set status to "RUNNING: WAITING_FOR_QPU":
update_status(Job.WAITING_QPU)
## When the Qiskit Runtime job starts running on the QPU, set the following status "RUNNING: EXECUTING_QPU":
update_status(Job.EXECUTING_QPU)
## Once QPU is completed and post-processing has begun, set the status "RUNNING: POST_PROCESSING":
update_status(Job.POST_PROCESSING)
Writing ./source_files/status_example.py
Una vez que la carga de trabajo se complete con éxito (con save_result()), este estado se actualizará a DONE automáticamente.
Flujos de trabajo en paralelo
Para tareas clásicas que pueden paralelizarse, usa el decorador @distribute_task para definir los requisitos de cómputo necesarios para realizar una tarea. Comienza recordando el ejemplo transpile_remote.py del tema Escribe tu primer programa con Qiskit Serverless con el siguiente código.
El siguiente código requiere que ya hayas guardado tus credenciales.
%%writefile ./source_files/transpile_remote.py
from qiskit.transpiler import generate_preset_pass_manager
from qiskit_ibm_runtime import QiskitRuntimeService
from qiskit_serverless import distribute_task
service = QiskitRuntimeService()
@distribute_task(target={"cpu": 1})
def transpile_remote(circuit, optimization_level, backend):
"""Transpiles an abstract circuit (or list of circuits) into an ISA circuit for a given backend."""
pass_manager = generate_preset_pass_manager(
optimization_level=optimization_level,
backend=service.backend(backend)
)
isa_circuit = pass_manager.run(circuit)
return isa_circuit
Writing ./source_files/transpile_remote.py
En este ejemplo, has decorado la función transpile_remote() con @distribute_task(target={"cpu": 1}). Al ejecutarse, esto crea una tarea de trabajador paralelo asíncrona con un único núcleo de CPU, y devuelve una referencia para rastrear el trabajador. Para obtener el resultado, pasa la referencia a la función get(). Podemos usar esto para ejecutar múltiples tareas en paralelo:
%%writefile --append ./source_files/transpile_remote.py
from time import time
from qiskit_serverless import get, get_arguments, save_result, update_status, Job
# Get arguments
arguments = get_arguments()
circuit = arguments.get("circuit")
optimization_level = arguments.get("optimization_level")
backend = arguments.get("backend")
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Start distributed transpilation
update_status(Job.OPTIMIZING_HARDWARE)
start_time = time()
transpile_worker_references = [
transpile_remote(circuit, optimization_level, backend)
for circuit in arguments.get("circuit_list")
]
transpiled_circuits = get(transpile_worker_references)
end_time = time()
Appending to ./source_files/transpile_remote.py
%%writefile --append ./source_files/transpile_remote.py
# Save result, with metadata
result = {
"circuits": transpiled_circuits,
"metadata": {
"resource_usage": {
"RUNNING: OPTIMIZING_FOR_HARDWARE": {
"CPU_TIME": end_time - start_time,
"QPU_TIME": 0,
},
}
},
}
save_result(result)
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It uploads the serverless program and checks it runs.
def test_serverless_job(title, entrypoint):
# Import in function to stop them interfering with user-facing code
from qiskit.circuit.random import random_circuit
from qiskit_serverless import IBMServerlessClient, QiskitFunction
import time
import uuid
title += "_" + uuid.uuid4().hex[:8]
serverless = IBMServerlessClient()
transpile_remote_demo = QiskitFunction(
title=title,
entrypoint=entrypoint,
working_dir="./source_files/",
)
serverless.upload(transpile_remote_demo)
job = serverless.get(title).run(
circuit=random_circuit(3, 3),
circuit_list=[random_circuit(3, 3) for _ in range(3)],
backend="ibm_torino",
optimization_level=1,
)
for retry in range(25):
time.sleep(5)
status = job.status()
if status == "DONE":
print("Job completed successfully")
return
if status not in [
"QUEUED",
"INITIALIZING",
"RUNNING",
"RUNNING: OPTIMIZING_FOR_HARDWARE",
"DONE",
]:
raise Exception(
f"Unexpected job status '{status}'.\nHere's the logs:\n"
+ job.logs()
)
print(f"Waiting for job (status '{status}')")
raise Exception("Job did not complete in time")
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Explorar diferentes configuraciones de tareas
Puedes asignar CPU, GPU y memoria de forma flexible para tus tareas mediante @distribute_task(). Para Qiskit Serverless en IBM Quantum® Platform, cada programa dispone de 16 núcleos de CPU y 32 GB de RAM, que pueden asignarse dinámicamente según sea necesario.
Los núcleos de CPU pueden asignarse como núcleos completos o incluso como asignaciones fraccionarias, como se muestra a continuación.
La memoria se asigna en número de bytes. Recuerda que hay 1024 bytes en un kilobyte, 1024 kilobytes en un megabyte y 1024 megabytes en un gigabyte. Para asignar 2 GB de memoria a tu trabajador, debes asignar "mem": 2 * 1024 * 1024 * 1024.
%%writefile --append ./source_files/transpile_remote.py
@distribute_task(target={
"cpu": 16,
"mem": 2 * 1024 * 1024 * 1024
})
def transpile_remote(circuit, optimization_level, backend):
return None
Appending to ./source_files/transpile_remote.py
# This cell is hidden from users.
# It checks the distributed program works.
test_serverless_job(
title="transpile_remote_serverless_test", entrypoint="transpile_remote.py"
)
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'QUEUED')
Waiting for job (status 'RUNNING')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Waiting for job (status 'RUNNING: OPTIMIZING_FOR_HARDWARE')
Job completed successfully
Gestionar datos en tu programa
Qiskit Serverless te permite gestionar archivos en el directorio /data en todos tus programas. Esto incluye varias limitaciones:
- Solo se admiten archivos
taryh5por el momento - Este almacenamiento es únicamente plano en
/data, y no puede tener subdirectorios/data/folder/
A continuación se muestra cómo subir archivos. Asegúrate de haberte autenticado en Qiskit Serverless con tu cuenta de IBM Quantum (consulta Desplegar en IBM Quantum Platform para obtener instrucciones).
import tarfile
from qiskit_serverless import IBMServerlessClient
# Create a tar
filename = "transpile_demo.tar"
file = tarfile.open(filename, "w")
file.add("./source_files/transpile_remote.py")
file.close()
# Get a reference to a QiskitFunction
serverless = IBMServerlessClient()
transpile_remote_demo = next(
program
for program in serverless.list()
if program.title == "transpile_remote_serverless"
)
# Upload the tar to Serverless data directory
serverless.file_upload(file=filename, function=transpile_remote_demo)
'{"message":"/usr/src/app/media/5e1f442128cdf60018496a04/transpile_demo.tar"}'
A continuación, puedes listar todos los archivos en tu directorio data. Estos datos son accesibles para todos los programas.
serverless.files(function=transpile_remote_demo)
['classifier_name.pkl.tar', 'output.json.tar', 'transpile_demo.tar']
Esto se puede hacer desde un programa usando file_download() para descargar el archivo al entorno del programa y descomprimir el tar.
%%writefile ./source_files/extract_tarfile.py
import tarfile
from qiskit_serverless import IBMServerlessClient
serverless = IBMServerlessClient(token="<YOUR_API_KEY>") # Use the 44-character API_KEY you created and saved from the IBM Quantum Platform Home dashboard
files = serverless.files()
demo_file = files[0]
downloaded_tar = serverless.file_download(demo_file)
with tarfile.open(downloaded_tar, 'r') as tar:
tar.extractall()
En este punto, tu programa puede interactuar con los archivos tal como lo harías en un experimento local. file_upload(), file_download() y file_delete() pueden llamarse desde tu experimento local o desde tu programa subido, para una gestión de datos consistente y flexible.
Próximos pasos
- Consulta un ejemplo completo que migra código existente a Qiskit Serverless.
- Lee un artículo en el que investigadores usaron Qiskit Serverless y la supercomputación cuántica centrada en quantum para explorar la química cuántica.