1.1.8. Hardware-Resource-Provider Node

[Figure: node architecture diagram (text extracted from the image). Components: User; Front-end; Orchestrator (Back-end); Application-level Requirements Node; ML Model Metadata Node; HW Constraints Node; Model Provider Node; HW Provider Node; Carbontracker Node; ML Solution Provider; ML Optimization; Baseline for Optimization; FPGA Selector… (label truncated in the source); PIM. Data exchanged: User input data; App Requirements; ML Metadata; HW Constraints; ML Model; HW Resource; CO2 footprint; Output data; Results.]

The Hardware-Resource-Provider Node selects the hardware resource that optimally fits the proposed machine learning model.

1.1.8.1. Inputs and Outputs

The following table summarizes the inputs and outputs of the Hardware-Resource-Provider Node:

1.1.8.2. Node Template

The following is the Python API provided for the Hardware-Resource-Provider Node. The user is meant to implement the functionality of the node within task_callback(), and to implement, inside configuration_callback(), the response to configuration requests from the orchestrator.

# Copyright 2023 SustainML Consortium
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""SustainML HW Resources Provider Node Implementation."""

from sustainml_py.nodes.HardwareResourcesNode import HardwareResourcesNode

# Managing UPMEMEM LLM
import upmem_llm_framework as upmem_layers
import transformers
import onnxruntime
import os
import signal
import threading
import time
import json
import torch
import yaml

# Whether to go on spinning or interrupt.
# Set to True by run() after the node has been created; reset to False by
# signal_handler() when SIGINT is received.
running = False

class ONNXModel(torch.nn.Module):
    """Expose an ONNX Runtime inference session through the ``torch.nn.Module`` API.

    Used to test ONNX models: ``forward`` converts the torch input to NumPy,
    runs the ONNX session, and converts the session outputs back to tensors.
    """

    def __init__(self, onnx_model_path):
        super().__init__()
        # Builds the ONNX Runtime session for the model file at ``onnx_model_path``.
        self.onnx_session = onnxruntime.InferenceSession(onnx_model_path)

    def forward(self, inputs: torch.Tensor):
        """Run the ONNX session on ``inputs`` and return its outputs as tensors.

        ``inputs`` is detached, moved to CPU and bound to the session's *first*
        input only. A single output is returned as one tensor. Any other number
        of outputs is returned as a tuple of tensors in session order. For two
        outputs this is presumably (bounding boxes, class scores) for detection
        models; confirm per model.
        """
        session = self.onnx_session
        feed = {session.get_inputs()[0].name: inputs.detach().cpu().numpy()}
        results = session.run(None, feed)
        if len(results) == 1:
            return torch.from_numpy(results[0])
        return tuple(torch.from_numpy(result) for result in results)

def _load_model(model_name, hf_token, kwargs):
    """Return the first transformers model that loads for ``model_name``.

    Each model class below is tried in order with ``from_pretrained``. Failures
    are printed as warnings and the next class is tried.

    Raises:
        Exception: if no class can load ``model_name``.
    """
    available_model_classes = [
        ("CausalLM", transformers.AutoModelForCausalLM, {"ignore_mismatched_sizes": True}),
        ("Seq2Seq", transformers.AutoModelForSeq2SeqLM, {}),
        ("TokenClassification", transformers.AutoModelForTokenClassification, {}),
        ("SequenceClassification", transformers.AutoModelForSequenceClassification, {}),
        ("PreTraining", transformers.AutoModelForPreTraining, {}),
        ("MaskedLM", transformers.AutoModelForMaskedLM, {}),
        ("MaskGeneration", transformers.AutoModelForMaskGeneration, {}),
        ("MultipleChoice", transformers.AutoModelForMultipleChoice, {}),
        ("NextSentencePrediction", transformers.AutoModelForNextSentencePrediction, {}),
        ("QuestionAnswering", transformers.AutoModelForQuestionAnswering, {}),
        ("TextEncoding", transformers.AutoModelForTextEncoding, {}),
        ("DepthEstimation", transformers.AutoModelForDepthEstimation, {}),
        ("ImageClassification", transformers.AutoModelForImageClassification, {}),
        ("VideoClassification", transformers.AutoModelForVideoClassification, {}),
        ("KeypointDetection", transformers.AutoModelForKeypointDetection, {}),
        ("MaskedImageModeling", transformers.AutoModelForMaskedImageModeling, {}),
        ("ObjectDetection", transformers.AutoModelForObjectDetection, {}),
        ("ImageSegmentation", transformers.AutoModelForImageSegmentation, {}),
        ("ImageToImage", transformers.AutoModelForImageToImage, {}),
        ("SemanticSegmentation", transformers.AutoModelForSemanticSegmentation, {}),
        ("InstanceSegmentation", transformers.AutoModelForInstanceSegmentation, {}),
        ("UniversalSegmentation", transformers.AutoModelForUniversalSegmentation, {}),
        ("ZeroShotImageClassification", transformers.AutoModelForZeroShotImageClassification, {}),
        ("ZeroShotObjectDetection", transformers.AutoModelForZeroShotObjectDetection, {}),
        ("AudioClassification", transformers.AutoModelForAudioClassification, {}),
        ("AudioFrameClassification", transformers.AutoModelForAudioFrameClassification, {}),
        ("CTC", transformers.AutoModelForCTC, {}),
        ("SpeechSeq2Seq", transformers.AutoModelForSpeechSeq2Seq, {}),
        ("AudioXVector", transformers.AutoModelForAudioXVector, {}),
        ("TextToSpectrogram", transformers.AutoModelForTextToSpectrogram, {}),
        ("TextToWaveform", transformers.AutoModelForTextToWaveform, {}),
        ("TableQuestionAnswering", transformers.AutoModelForTableQuestionAnswering, {}),
        ("DocumentQuestionAnswering", transformers.AutoModelForDocumentQuestionAnswering, {}),
        ("VisualQuestionAnswering", transformers.AutoModelForVisualQuestionAnswering, {}),
        ("Vision2Seq", transformers.AutoModelForVision2Seq, {}),
        ("ImageTextToText", transformers.AutoModelForImageTextToText, {}),
        ("VitPose", transformers.VitPoseForPoseEstimation, {}),
        ("Generic", transformers.AutoModel, {})
    ]

    for label, model_class, extra_args in available_model_classes:
        try:
            # SECURITY NOTE(review): trust_remote_code=True executes Python code
            # shipped with the model repository, and model_name comes from the
            # caller. Confirm that model names are trusted.
            model = model_class.from_pretrained(
                model_name,
                token=hf_token,
                trust_remote_code=True,
                **{**extra_args, **kwargs}
            )
        except Exception as e:
            print(f"[WARN] Could not load model as {label}: {e}")
            continue
        print(f"[OK] Model loaded as {label}")
        return model

    raise Exception(f"Model {model_name} is not currently supported")


def _load_preprocessor(model_name, hf_token, kwargs):
    """Return ``(preprocessor, label)`` for the first preprocessor class that loads.

    ``label`` is one of ``"Token"``, ``"Image"``, ``"FeatureExtractor"`` or
    ``"Processor"``. It selects which dummy input ``_build_dummy_input`` builds.

    Raises:
        Exception: if no preprocessor class can be loaded. The exception is
            chained to the last loader failure.
    """
    available_token_classes = [
        ("Token", transformers.AutoTokenizer, {}),
        ("Image", transformers.AutoImageProcessor, {"use_fast": True}),
        ("FeatureExtractor", transformers.AutoFeatureExtractor, {}),
        ("Processor", transformers.AutoProcessor, {})
    ]

    last_error = None
    for label, token_class, extra_args in available_token_classes:
        try:
            preprocessor = token_class.from_pretrained(
                model_name,
                token=hf_token,
                trust_remote_code=True,
                **{**extra_args, **kwargs}
            )
        except Exception as e:
            # `e` is unbound once this except clause ends, so keep it for the
            # final error message.
            last_error = e
            print(f"[WARN] Could not load token as {label}: {e}")
            continue
        print(f"[OK] Token loaded as {label}")
        return preprocessor, label

    raise Exception(
        f"Error initializing tokenizer for model {model_name}: {last_error}"
    ) from last_error


def _build_dummy_input(tokenizer, label):
    """Build a small dummy batch (PyTorch tensors) for ``tokenizer``.

    - ``"Token"``: the text "How to prepare coffee?", padded and truncated.
      Missing EOS/PAD tokens are filled in first.
    - ``"Image"``, ``"FeatureExtractor"``, or a class name containing "image":
      one 224x224 white RGB image, or 16 such frames when the class name also
      contains "video". float32 tensors are cast to float16.
    - ``"Processor"``: the same text plus one white image.

    Returns ``None`` if no branch applies.
    """
    class_name = tokenizer.__class__.__name__.lower()

    # Text
    if label == "Token":
        if tokenizer.eos_token is None:
            tokenizer.eos_token = "<|endoftext|>"
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        return tokenizer(
            "How to prepare coffee?",
            return_tensors="pt",
            padding=True,
            truncation=True
        )

    # Image or video
    if label in ("Image", "FeatureExtractor") or "image" in class_name:
        from PIL import Image
        import numpy as np

        white_image = Image.fromarray(np.ones((224, 224, 3), dtype=np.uint8) * 255)
        if "video" in class_name:
            # Video processors receive a clip: 16 identical white frames.
            encoded = tokenizer(images=[white_image] * 16, return_tensors="pt")
        else:
            encoded = tokenizer(images=white_image, return_tensors="pt")
        # Cast float32 tensors to float16. This presumably matches a float16
        # model; confirm with callers.
        return {k: v.to(torch.float16) if v.dtype == torch.float32 else v
                for k, v in encoded.items()}

    # Multimodal: text and image combined
    if label == "Processor":
        from PIL import Image
        import numpy as np

        white_image = Image.fromarray(np.ones((224, 224, 3), dtype=np.uint8) * 255)
        return tokenizer(text="How to prepare coffee?", images=white_image, return_tensors="pt")

    return None


def load_any_model(model_name, hf_token=None, **kwargs):
    """Load a Hugging Face model, a matching preprocessor, and a dummy input.

    Args:
        model_name: Hugging Face model identifier passed to ``from_pretrained``.
        hf_token: optional Hugging Face access token.
        **kwargs: extra ``from_pretrained`` keyword arguments. They are
            forwarded to both the model and the preprocessor loaders, and they
            override the per-class defaults.

    Returns:
        ``(model, tokenizer, input)``. ``tokenizer`` may be a tokenizer, image
        processor, feature extractor or processor. ``input`` is the dummy batch
        built for it.

    Raises:
        Exception: if the model, the preprocessor, or the dummy input cannot be
            created.
    """
    model = _load_model(model_name, hf_token, kwargs)
    tokenizer, label = _load_preprocessor(model_name, hf_token, kwargs)

    try:
        model_input = _build_dummy_input(tokenizer, label)
        print(f"[OK] Input created correctly as a {label}")
    except Exception as e:
        raise Exception(
            f"Error creating input for model {model_name}, tokenizer {tokenizer} : {e}"
        ) from e

    return model, tokenizer, model_input

# Signal handler
def signal_handler(sig, frame):
    """Handle SIGINT: terminate the node and clear the module-level ``running`` flag.

    ``sig`` and ``frame`` are the standard signal-handler arguments and are not used.
    """
    global running
    print("\nExiting")
    HardwareResourcesNode.terminate()
    running = False

def _report_hw_error(hw, message):
    """Publish a failed hardware evaluation on the ``hw`` output.

    Sets the description to ``"Error"``, zeroes power and latency, and stores
    ``{"error": message}`` as UTF-8 encoded JSON in ``hw.extra_data``.
    """
    hw.hw_description("Error")
    hw.power_consumption(0.0)
    hw.latency(0.0)
    hw.extra_data(json.dumps({"error": message}).encode("utf-8"))


def _decode_model_path(raw_path):
    """Return ``raw_path`` as a string.

    A list or tuple is treated as a sequence of character codes and joined.
    If that fails, ``""`` is returned. Any other value is returned unchanged.
    """
    if not isinstance(raw_path, (list, tuple)):
        return raw_path
    try:
        return ''.join(chr(b) for b in raw_path)
    except (TypeError, ValueError, OverflowError):
        return ""


def _resolve_hf_token(hw_constraints):
    """Return the Hugging Face token to use, or ``None`` if none is available.

    The token is read from the ``"hf_token"`` key of the JSON object carried in
    ``hw_constraints.extra_data()`` (a sequence of character codes). If it is
    absent there, the ``HF_TOKEN`` environment variable is used.

    Raises:
        json.JSONDecodeError: if extra data is present but is not valid JSON.
    """
    token = None
    extra_data_bytes = hw_constraints.extra_data()
    if extra_data_bytes:
        extra_data = json.loads(''.join(chr(b) for b in extra_data_bytes))
        if isinstance(extra_data, dict):
            token = extra_data.get("hf_token")
    if token is None:
        # Fall back to the environment variable named in the missing-token error.
        token = os.environ.get("HF_TOKEN") or None
    return token


def _profile_onnx(model_path, layer_mapping):
    """Run one profiled forward pass of the ONNX model at ``model_path``.

    The input is a random float32 tensor of shape (1, 3, 640, 640). This
    assumes an image model with that input size; confirm per model.
    """
    onnx_model = ONNXModel(model_path)
    dummy_input = torch.rand(1, 3, 640, 640, dtype=torch.float32)
    upmem_layers.profiler_start(layer_mapping)
    onnx_model.forward(dummy_input)
    upmem_layers.profiler_end()


def _profile_hf(model_name, hf_token, layer_mapping):
    """Load ``model_name`` from Hugging Face and run one profiled inference.

    ``model.generate`` is tried first. If it raises, a plain forward call is
    attempted instead. If the forward call also fails, its exception is re-raised.
    """
    model, tokenizer, model_input = load_any_model(
        model_name,
        hf_token=hf_token,
        low_cpu_mem_usage=True,
        torch_dtype=torch.float16
    )
    print(f"Model, Tokenizer and Input loaded successfully")
    print(f"Model: {model}")
    print(f"Tokenizer: {tokenizer}")
    print(f"Input: {model_input}")

    model.eval()  # Put model in evaluation / inference mode

    # noinspection PyUnresolvedReferences
    upmem_layers.profiler_start(layer_mapping)
    # In case we want to time the original execution (comment out profiler_start)
    # start = time.time_ns()

    try:
        model.generate(
            **model_input, do_sample=True, temperature=0.9, min_length=64, max_length=64
        )
    except Exception as e_gen:
        print(f"Error generating output with generate: {e_gen}. Trying forward instead.")
        try:
            model(**model_input, bool_masked_pos=False)
        except Exception as e_model:
            print(f"Error generating output using model: {e_model}")
            # Re-raise the real error so its message reaches the published error report.
            raise

    # noinspection PyUnresolvedReferences
    upmem_layers.profiler_end()


# User Callback implementation
# Inputs: ml_model, app_requirements, hw_constraints
# Outputs: node_status, hw
def task_callback(ml_model, app_requirements, hw_constraints, node_status, hw):
    """Profile the proposed ML model on the requested hardware and publish the result.

    The first entry of ``hw_constraints.hardware_required()`` is the target
    hardware, and every profiled layer is mapped to it. If ``ml_model.model_path()``
    yields a usable path, that ONNX model is profiled. Otherwise, or when the
    ONNX run fails, the Hugging Face model ``ml_model.model()`` is profiled.

    On success, ``hw`` receives the hardware name plus the profiler's power
    consumption and latency. On failure, ``hw`` receives an ``"Error"``
    description, zero power and latency, and a JSON ``{"error": ...}`` payload
    in ``extra_data``. ``app_requirements`` and ``node_status`` are not used here.
    """
    global hf_token

    upmem_layers.initialize_profiling_options(simulation=True)
    upmem_layers.profiler_init()

    hardware_required = hw_constraints.hardware_required()
    if not hardware_required:
        message = "Failed to test model on hardware: no hardware was requested in the HW constraints"
        print(message)
        _report_hw_error(hw, message)
        return
    hw_selected = hardware_required[0]

    # Every profiled layer runs on the selected hardware.
    layer_mapping = dict.fromkeys(
        (
            "input_layernorm",
            "q_proj",
            "k_proj",
            "rotary_emb",
            "v_proj",
            "o_proj",
            "output_layernorm",
            "gate_proj",
            "up_proj",
            "down_proj",
            "norm",
            "lm_head",
        ),
        hw_selected,
    )

    model_path = _decode_model_path(ml_model.model_path())

    # Prefer the ONNX model when a usable path is available.
    profiled = False
    if model_path and model_path != "Error":
        print(f"Using ONNX model path: {model_path}")
        try:
            _profile_onnx(model_path, layer_mapping)
            profiled = True
        except Exception as e:
            print(f"[WARN] Failed to load/run ONNX at '{model_path}': {e}. Falling back to HF model.")

    # Hugging Face model: used when there is no ONNX path or the ONNX run failed.
    if not profiled:
        # NOTE(review): if the ONNX run failed after profiler_start(), the profiler
        # is started again below without a matching profiler_end(). Confirm that
        # upmem_layers tolerates this, or re-run profiler_init() first.
        try:
            print(f"Using Hugging Face model")
            hf_token = _resolve_hf_token(hw_constraints)
            if hf_token is None:
                raise Exception(
                    "HF token was not provided. Pass 'hf_token' in the HW constraints "
                    "extra data or set the HF_TOKEN environment variable."
                )
            _profile_hf(ml_model.model(), hf_token, layer_mapping)
        except Exception as e:
            import traceback
            traceback.print_exc()
            print(f"Error testing model on hardware: {e}")
            print(f"Please provide different model")
            _report_hw_error(hw, "Failed to test model on hardware: " + str(e))
            return

    power_consumption = upmem_layers.profiler_get_power_consumption()
    latency = upmem_layers.profiler_get_latency()
    hw.hw_description(hw_selected)
    hw.power_consumption(power_consumption)
    hw.latency(latency)
    print(f"Power Consumption: {power_consumption:.8f} W")
    print(f"Latency: {latency} ms")

# User Configuration Callback implementation
# Inputs: req
# Outputs: res
def configuration_callback(req, res):
    """Answer a configuration request from the orchestrator.

    Placeholder implementation. It replies with a fixed dummy JSON
    configuration, echoes the request's node id and transaction id, and reports
    success with error code 0.
    """
    # Placeholder configuration: replace with the node's real settings.
    config_json = json.dumps(dict(param1="value1", param2="value2", param3="value3"))

    res.configuration(config_json)
    res.node_id(req.node_id())
    res.transaction_id(req.transaction_id())
    res.success(True)
    res.err_code(0)  # 0: No error || 1: Error

# Main workflow routine
def run():
    """Create the Hardware-Resource-Provider node and spin it.

    The module-level ``running`` flag is set to True only after the node has
    been constructed. ``spin()`` is expected to block until the node is
    terminated; confirm against the HardwareResourcesNode API.
    """
    global running
    hw_node = HardwareResourcesNode(
        callback=task_callback,
        service_callback=configuration_callback,
    )
    running = True
    hw_node.spin()

# Call main in program execution
if __name__ == '__main__':
    signal.signal(signal.SIGINT, signal_handler)

    # Python runs signal handlers only in the main thread, and not while that
    # thread is blocked inside spin(). The node workflow therefore runs in a
    # worker thread while the main thread stays responsive to SIGINT.
    runner = threading.Thread(target=run)
    runner.start()

    # Poll the worker thread rather than the `running` flag. run() sets the flag
    # only after the node has been constructed, so checking it right after
    # start() could see False and skip this wait loop entirely.
    while runner.is_alive():
        time.sleep(1)

    runner.join()