# Data Curation API - Python Notebook

### Options

In [None]:
# Base URL of the Data Curation API; the cells below append "/presign" and "/status/{job_id}" to it.
dc_api_url = "https://knowledge-enrichment.ai.experience.hyland.com/latest/api/data-curation"
# Processing options sent as the JSON body of every presign request.
dc_options = {
    "normalization": {
        "quotations": True, # Normalize quotation marks
        "dashes": True # Normalize dashes
    },
    "chunking": True, # Enable chunking of the text
    "chunk_size": 1000, # Desired size of each text chunk (may be longer to prevent breaking sentences/paragraphs)
    "embedding": False, # Generate text embeddings for the chunks (chunking is required)
    "json_schema": False, # "FULL", "MDAST", "PIPELINE", or False
    "pii": False # "redaction", "detection", or False
}

### Credentials

In [None]:
# OAuth client credentials need to be configured in the admin console.
# https://admin.experience.hyland.com/external-systems/external-applications
# Ensure the external application has the `environment_authorization` scope and is configured for an environment
# that has the `cin-data-curation-api` application (environment has a Data Curation API subscription).
# Base URL of the identity provider; the token endpoint used below is "{oauth_url}/connect/token".
oauth_url = "https://auth.iam.experience.hyland.com/idp"
# Client id and secret are sent as HTTP Basic credentials when requesting a token.
oauth_client_id = ""
oauth_client_secret = ""

### Helper Functions

In [None]:
import base64
import datetime
from IPython.display import display, HTML, Markdown
import json
import os
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

def send_http_request(method: str, url: str, headers: dict = {}, data: bytes | None = None) -> dict:
    """Sends an HTTP request and returns the response body as a dictionary."""
    request = Request(url=url, data=data, headers=headers, method=method)
    print(f"Sending request to: {request.method} {request.full_url}")
    response_body = {}
    try:
        response = urlopen(request)
        response_bytes = response.read()
        if len(response_bytes) > 0:
            response_body = json.loads(response_bytes.decode())
    except HTTPError as e:
        display(f"HTTP Error: {e.code} - {e.read().decode()}")
        if e.code != 404:
            raise
    finally:
        if "response" in locals():
            response.close()

    return response_body

def send_get_request(url: str, headers: dict | None = None) -> dict:
    """Send a GET request to *url* and return the decoded response body.

    Args:
        url: Absolute URL to fetch.
        headers: Optional request headers; ``None`` means no extra headers.
    """
    # A fresh dict per call avoids sharing one default dict across all calls.
    return send_http_request(method="GET", url=url, headers={} if headers is None else headers)

def send_post_request(url: str, headers: dict | None = None, data: bytes | None = None) -> dict:
    """Send a POST request to *url* and return the decoded response body.

    Args:
        url: Absolute URL to post to.
        headers: Optional request headers; ``None`` means no extra headers.
        data: Optional raw request body.
    """
    # A fresh dict per call avoids sharing one default dict across all calls.
    return send_http_request(method="POST", url=url, headers={} if headers is None else headers, data=data)

def send_json_post_request(url: str, headers: dict | None = None, data: dict | None = None) -> dict:
    """Send *data* as a JSON-encoded POST body and return the decoded response.

    Args:
        url: Absolute URL to post to.
        headers: Optional request headers. The mapping is copied, so the
            caller's dict is never modified.
        data: Object to serialize with ``json.dumps`` (``None`` becomes ``null``).
    """
    # Build a new mapping instead of writing Content-Type into the caller's dict.
    request_headers = {**(headers or {}), "Content-Type": "application/json"}
    json_data = json.dumps(data).encode()
    return send_post_request(url=url, data=json_data, headers=request_headers)

def send_form_post_request(url: str, headers: dict | None = None, data: dict | None = None) -> dict:
    """Send *data* as a URL-encoded form POST body and return the decoded response.

    Args:
        url: Absolute URL to post to.
        headers: Optional request headers. The mapping is copied, so the
            caller's dict is never modified.
        data: Form fields; sequence values are expanded into repeated fields.
            ``None`` sends an empty form.
    """
    # Build a new mapping instead of writing Content-Type into the caller's dict.
    request_headers = {**(headers or {}), "Content-Type": "application/x-www-form-urlencoded"}
    # urlencode() rejects None, so fall back to an empty form.
    form_data = urlencode(data or {}, doseq=True).encode()
    return send_post_request(url=url, data=form_data, headers=request_headers)

def send_put_request(url: str, headers: dict | None = None, data: bytes | None = None) -> dict:
    """Send a PUT request to *url* and return the decoded response body.

    Args:
        url: Absolute URL to put to.
        headers: Optional request headers; ``None`` means no extra headers.
        data: Optional raw request body.
    """
    # A fresh dict per call avoids sharing one default dict across all calls.
    return send_http_request(method="PUT", url=url, headers={} if headers is None else headers, data=data)

def refresh_access_token() -> str | None:
    """Request a new access token with the OAuth client-credentials grant.

    Reads ``oauth_url``, ``oauth_client_id`` and ``oauth_client_secret`` from
    the notebook globals and posts to ``{oauth_url}/connect/token``.

    Returns:
        The ``access_token`` field of the token response, or ``None`` when the
        response does not contain one.
    """
    print("Getting a new access token")
    # Computed separately so the f-string below needs no nested same-type
    # quotes, which are a syntax error before Python 3.12.
    basic_credentials = base64_encode(f"{oauth_client_id}:{oauth_client_secret}")
    headers = {
        "Accept": "application/json",
        "Authorization": f"Basic {basic_credentials}"
    }
    body = {
        "grant_type": "client_credentials",
        "scope": "environment_authorization"
    }
    token_url = f"{oauth_url}/connect/token"
    token_response = send_form_post_request(url=token_url, data=body, headers=headers)
    access_token = token_response.get("access_token")
    # Called for its printed report only; the token is returned regardless of the result.
    is_access_token_valid(access_token)

    return access_token

def decode_access_token(token: str) -> dict | None:
    """Decodes the access token to extract its JSON payload."""
    return json_loads(base64_decode(token.split(".")[1]))

def is_access_token_valid(token: str) -> bool:
    """Report whether *token* is present and has more than 30 seconds left.

    Prints the remaining lifetime (or the reason validation failed) followed
    by a blank line. Any error while decoding or reading the expiry claim is
    reported and treated as "not valid".
    """
    if not token:
        print("Access token is empty")
        print()
        return False

    # Minimum remaining lifetime for the token to count as usable.
    minimum_remaining = datetime.timedelta(seconds=30)
    try:
        claims = decode_access_token(token)
        expires_at = datetime.datetime.fromtimestamp(claims["exp"])
        remaining = expires_at - datetime.datetime.now()
        print(f"Access Token Expires In: {remaining}")
        print()
        return remaining > minimum_remaining
    except Exception as error:
        print(f"Error validating access token: {error}")
        print()
        return False
    
def base64_encode(data: str) -> str:
    """Return the standard base64 text for the UTF-8 bytes of *data*."""
    raw_bytes = data.encode()
    encoded_bytes = base64.b64encode(raw_bytes)
    return encoded_bytes.decode()

def base64_decode(data: str) -> str:
    """Return the UTF-8 text stored in the standard base64 string *data*."""
    encoded_bytes = data.encode()
    raw_bytes = base64.b64decode(encoded_bytes)
    return raw_bytes.decode()
    
def json_dumps(obj) -> str:
    """Serialize *obj* to two-space-indented JSON, stringifying unsupported values."""
    rendered = json.dumps(obj, default=str, indent=2)
    return rendered

def json_loads(json_data: str):
    """Parse the JSON document *json_data* and return the resulting Python object."""
    parsed = json.loads(json_data)
    return parsed

def list_files(directory: str) -> list[str]:
    """Display and return the regular files directly inside *directory*.

    Subdirectories are skipped, because the returned paths are later opened
    and uploaded as files. Names are sorted so the order is deterministic.

    Args:
        directory: Directory to scan; relative paths are resolved against the
            current working directory.

    Returns:
        Absolute paths of the files found.

    Raises:
        OSError: If *directory* does not exist or cannot be read.
    """
    directory_abs = os.path.abspath(directory)
    file_list = sorted(
        name for name in os.listdir(directory_abs)
        if os.path.isfile(os.path.join(directory_abs, name))
    )
    # Kept outside the f-string: backslashes inside f-string expressions are
    # a syntax error before Python 3.12.
    bullet = "\r\n- "
    display(Markdown(f"Listing files in path: {directory_abs}{bullet}{bullet.join(file_list)}"))
    return [os.path.abspath(os.path.join(directory_abs, file_name)) for file_name in file_list]

def _escape_html_text(value) -> str:
    """Return ``str(value)`` with angle brackets escaped and newlines turned into ``<br/>``."""
    # NOTE: "&" is intentionally left alone. The "Parse Results" cell stores
    # locations with "<" and ">" already entity-escaped, and escaping "&" here
    # would double-escape that text.
    return str(value).replace("<", "&lt;").replace(">", "&gt;").replace("\n", "<br/>")


def _html_table(label: str, entries) -> str:
    """Build a two-column table from ``(name, value)`` pairs, rendering values recursively."""
    rows = "".join(
        f"<tr><td>{_escape_html_text(name)}</td><td>{obj_to_html(value, False)}</td></tr>"
        for name, value in entries
    )
    return f"<table><thead><tr><th><i>{label}</i></th><th>value</th></tr></thead><tbody>{rows}</tbody></table>"


def obj_to_html(obj, top_level: bool = True) -> str:
    """Convert an object to an HTML representation.

    Objects with a ``__dict__``, lists and dicts become tables (one row per
    attribute, index or key); everything else becomes escaped text. Keys and
    attribute names are escaped as well, so they cannot inject markup.

    Args:
        obj: Value to render.
        top_level: ``True`` for the outermost call. Nested values other than
            non-empty strings and ``None`` are wrapped in a collapsible
            ``<details>`` element labelled with their type name.

    Returns:
        The HTML fragment.
    """
    if hasattr(obj, "__dict__"):
        body = _html_table("field", vars(obj).items())
    elif isinstance(obj, list):
        body = _html_table("index", enumerate(obj))
    elif isinstance(obj, dict):
        body = _html_table("key", obj.items())
    else:
        body = _escape_html_text(obj)

    is_plain_text = bool(body) and (isinstance(obj, str) or obj is None)
    if top_level or is_plain_text:
        return body

    # Only nested values reach this point, so the element is always rendered collapsed.
    return f"""
        <details class="dni-treeview">
            <summary>
                <span class="dni-code-hint">
                    <code>{str(obj.__class__.__name__)}</code>
                </span>
            </summary>
            <div>{body}</div>
        </details>
            """

def display_html(obj):
    """Render *obj* with obj_to_html and display it followed by the tree-view stylesheet."""
    stylesheet = """
        <style>
        .dni-code-hint {
            font-style: italic;
            overflow: hidden;
            white-space: nowrap;
        }
        .dni-treeview {
            white-space: nowrap;
        }
        .dni-treeview td {
            vertical-align: top;
            text-align: start;
        }
        details.dni-treeview {
            padding-left: 1em;
        }
        table td {
            text-align: start;
        }
        table tr { 
            vertical-align: top; 
            margin: 0em 0px;
        }
        table tr td pre 
        { 
            vertical-align: top !important; 
            margin: 0em 0px !important;
        } 
        table th {
            text-align: start;
        }
        </style>
    """
    markup = obj_to_html(obj)
    display(HTML(markup + stylesheet))

### List Files to Upload

In [None]:
# Expects there to be an "input" directory with files to process.
# The relative path is resolved against the current working directory, and the
# returned absolute paths are the files the later cells presign and upload.
input_file_names = list_files("input")

### Get Access Token

In [None]:
# Request a token with the configured client credentials; the result is None
# when the token response carries no "access_token" field.
access_token = refresh_access_token()
if access_token:
    # Show the decoded token payload (its claims), not the raw token string.
    token_json = decode_access_token(access_token)
    print(f"Access Token: {json_dumps(token_json)}")

### Get Presign URL

In [None]:
from dataclasses import dataclass

# Reuse the current token unless it is missing, undecodable, or has 30 seconds or less left.
if not is_access_token_valid(access_token):
    access_token = refresh_access_token()

@dataclass
class PresignResponse:
    """One presign result paired with the local file it was requested for."""

    # "job_id" field of the presign response; used to query status and label output.
    job_id: str
    # "put_url" field of the presign response; the file bytes are PUT to this URL.
    put_url: str
    # "get_url" field of the presign response; results are downloaded from this URL.
    get_url: str
    # Local path of the input file this presign request belongs to.
    file_name: str

# Request one presigned upload/download URL pair per input file.
# Files whose presign response is incomplete are reported and left out, so the
# later cells only see jobs that can actually be uploaded and downloaded.
presign_responses = []
presign_url = f"{dc_api_url}/presign"  # identical for every file, so built once
for input_file_name in input_file_names:
    # Built per request so no header dict is shared between calls.
    presign_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    presign_response_json = send_json_post_request(url=presign_url, headers=presign_headers, data=dc_options)
    print(f"Presign Response for {input_file_name}:")
    print(json_dumps(presign_response_json))
    print()
    presign_response = PresignResponse(
        job_id=presign_response_json.get("job_id"),
        put_url=presign_response_json.get("put_url"),
        get_url=presign_response_json.get("get_url"),
        file_name=input_file_name,
    )
    required_fields = (presign_response.job_id, presign_response.put_url, presign_response.get_url)
    if any(field is None for field in required_fields):
        # Previously such files were dropped without any message.
        print(f"Skipping {input_file_name}: presign response is missing job_id, put_url or get_url")
        print()
        continue
    presign_responses.append(presign_response)


### Upload Files

In [None]:
# Upload each input file's raw bytes to the presigned PUT URL of its job.
for presign_response in presign_responses:
    # Nothing to upload without both a destination URL and a source file.
    if not (presign_response.put_url and presign_response.file_name):
        continue

    print(f"Uploading file: {presign_response.file_name}")
    with open(presign_response.file_name, mode="rb") as input_file:
        input_bytes = input_file.read()

    length = len(input_bytes)
    put_headers = {'Content-Type': 'application/octet-stream', 'Content-Length': length}
    _ = send_put_request(
        url=presign_response.put_url,
        data=input_bytes,
        headers=put_headers,
    )
    print(f"Upload succeeded for {presign_response.job_id}")
    print()

### Get Status

In [None]:
from dataclasses import dataclass

# Reuse the current token unless it is missing, undecodable, or has 30 seconds or less left.
if not is_access_token_valid(access_token):
    access_token = refresh_access_token()

@dataclass
class StatusResponse:
    """Job status as read from the status endpoint's JSON response."""

    # Value of the "jobId" field in the status response.
    job_id: str
    # Value of the "status" field in the status response.
    status: str

# Query and print the processing status of every presigned job once (no polling).
for presign_response in presign_responses:
    status_headers = {
        "Authorization": f"Bearer {access_token}"
    }
    status_url = f"{dc_api_url}/status/{presign_response.job_id}"
    status_response_json = send_get_request(url=status_url, headers=status_headers)
    if not status_response_json:
        # The request helper returns an empty dict for HTTP 404 or an empty
        # body; report that instead of failing with a KeyError below.
        print(f"Status for {presign_response.job_id} is not available")
        print()
        continue
    # NOTE(review): this response is read with the camelCase key "jobId" while
    # the presign response uses "job_id" - confirm against the API contract.
    status_response = StatusResponse(
        job_id=status_response_json.get("jobId", presign_response.job_id),
        status=status_response_json.get("status", "unknown")
    )
    print(f"Status for {presign_response.job_id} = {status_response.status}")
    print()

### Download Results

In [None]:
# Fetch the result document of every job from its presigned GET URL.
# Only non-empty responses are collected; every response is printed.
output_results_json = []
for presign_response in presign_responses:
    print(f"Downloading results for: {presign_response.job_id}")
    get_response_json = send_get_request(url=presign_response.get_url)
    if get_response_json:
        output_results_json += [get_response_json]

    print(f"Results for {presign_response.job_id}:")
    print(json_dumps(get_response_json))
    print()

### Parse Results

In [None]:
from dataclasses import dataclass

@dataclass
class ApiResponseChunk:
    """One text chunk of a result, with its location and optional embeddings."""

    # Chunk text.
    text: str
    # Entry of the result's "locations" list at the same index as this chunk.
    location: str
    # Embedding values for the chunk; None when the result has no "chunks_with_embeddings".
    embeddings: list[float] | None

@dataclass
class ApiResponseMarkdown:
    """The "markdown" part of a result: full output plus its chunks."""

    # Value of the result's markdown "output" field.
    output: str
    # Chunks parsed from the result; empty when the result has no chunk lists.
    chunks: list[ApiResponseChunk]

@dataclass
class ApiResponse:
    """Parsed result document for one job."""

    # Parsed "markdown" section of the result.
    markdown: ApiResponseMarkdown
    # Value of the result's "json" field; None when the result has no such field.
    json: dict | None

# Convert every downloaded result document into ApiResponse objects and show them.
output_results = []
for output_result_json in output_results_json:
    markdown_json = output_result_json["markdown"]
    chunks_json = markdown_json.get("chunks")
    chunk_with_embeddings_json = markdown_json.get("chunks_with_embeddings")
    # "locations" is read as optional, like the chunk lists, so a result
    # without it no longer raises KeyError.
    # NOTE(review): presumably locations are only present when chunking is enabled - confirm.
    locations_json = [
        location.replace("<", "&lt;").replace(">", "&gt;")
        for location in markdown_json.get("locations") or []
    ]

    # Normalize both chunk shapes to (text, embeddings) pairs; chunks with
    # embeddings take precedence over plain chunks.
    if chunk_with_embeddings_json:
        chunk_fields = [(entry["chunk"], entry["embeddings"]) for entry in chunk_with_embeddings_json]
    else:
        chunk_fields = [(chunk_text, None) for chunk_text in chunks_json or []]

    chunks = [
        ApiResponseChunk(
            text=chunk_text,
            # A chunk with no matching location entry gets None instead of raising IndexError.
            location=locations_json[i] if i < len(locations_json) else None,
            embeddings=chunk_embeddings
        )
        for i, (chunk_text, chunk_embeddings) in enumerate(chunk_fields)
    ]

    api_response = ApiResponse(
        markdown=ApiResponseMarkdown(
            output=markdown_json["output"],
            chunks=chunks
        ),
        json=output_result_json.get("json")
    )
    output_results.append(api_response)
display_html(output_results)

### Display Markdown

In [None]:
from IPython.display import display, Markdown

# Render the Markdown output of every parsed result under its file name.
# NOTE(review): file names are looked up by position, which assumes
# output_results lines up index-for-index with presign_responses; jobs whose
# download returned nothing are not in output_results, which would shift the labels.
for i, output_result in enumerate(output_results):
    print(presign_responses[i].file_name)
    display(Markdown(output_result.markdown.output))

### Display JSON

In [None]:
from IPython.display import display

# Show the "json" part of every parsed result under its file name.
# NOTE(review): file names are looked up by position, which assumes
# output_results lines up index-for-index with presign_responses; jobs whose
# download returned nothing are not in output_results, which would shift the labels.
for i, output_result in enumerate(output_results):
    print(presign_responses[i].file_name)
    display(output_result.json)
