HTTP Request node forward on receive

Hello,

I'm trying to get the HTTP Request node to send out multiple payloads from a single HTTP request as the response comes in.

The payloads arrive from a single request as individual JSON objects roughly 100ms apart. Each object is small enough not to be split across multiple payloads, so each one is parsable on its own.

Example:

$ curl http://192.1.2.59:11434/api/generate -d '{"model": "tinyllama","prompt": "Why is the sky not red?","options":{"num_ctx":64,"top_k":1,"top_p":0.1,"temperature": 0.1,"mirostat_tau":1.0}}'
{"model":"tinyllama","created_at":"2024-02-04T11:16:21.232303888Z","response":"The","done":false}
{"model":"tinyllama","created_at":"2024-02-04T11:16:21.285452561Z","response":" sky","done":false}
{"model":"tinyllama","created_at":"2024-02-04T11:16:21.338583188Z","response":" is","done":false}
{"model":"tinyllama","created_at":"2024-02-04T11:16:21.392181616Z","response":" not","done":false}
{"model":"tinyllama","created_at":"2024-02-04T11:16:21.445267597Z","response":" red","done":false}
{"model":"tinyllama","created_at":"2024-02-04T11:16:21.498385753Z","response":",","done":false}
... etc
{"model":"tinyllama","created_at":"2024-02-04T11:16:26.559776889Z","response":"","done":true,"context":[529, ...etc],"total_duration":7110858147,"load_duration":5946278,"prompt_eval_count":38,"prompt_eval_duration":1825929000,"eval_count":93,"eval_duration":5274177000}

Unfortunately, the HTTP Request node only outputs once the connection has completed and closed. I was wondering if there is something in msg.whatever that could get it to output each payload as it is received?

I could probably do this with a Function node using fetch, ReadableStream.getReader(), and node.send(), but was hoping it was possible with the request node.
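
Something like this rough, untested sketch is what I had in mind. It assumes global fetch is reachable from inside the Function node (Node.js 18+); if it isn't, it could be exposed through functionGlobalContext instead.

(async () => {
    // Hypothetical streaming call to the same Ollama endpoint as above.
    const res = await fetch('http://192.1.2.59:11434/api/generate', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ model: 'tinyllama', prompt: msg.payload })
    });

    const reader = res.body.getReader();
    let buffered = '';

    for (;;) {
        const { done, value } = await reader.read();
        if (done) break;
        // Buffer is available in the Function node sandbox; this ignores the
        // rare case of a multi-byte character split across two chunks.
        buffered += Buffer.from(value).toString('utf8');

        // Each complete line is one NDJSON object; keep any partial line buffered.
        const lines = buffered.split('\n');
        buffered = lines.pop();
        for (const line of lines) {
            if (line.trim() === '') continue;
            node.send({ payload: JSON.parse(line) });
        }
    }
    node.done();
})().catch((err) => node.error(err, msg));

return null; // messages are emitted asynchronously via node.send()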

Hi @Slyke,

I don't believe the HTTP Request node supports streaming - which sounds like what this is.
There are nodes that support SSE (Server-Sent Events), but they may not be compatible with your target if it isn't SSE.

In the past I have achieved this with a TCP Request Node.

Send a formatted HTTP POST using the TCP Request node (set Close to "never - keep connection open").

Pass the raw HTTP request below in msg.payload to the TCP Request node, and make sure the node is connecting to 192.1.2.59:11434:

POST /api/generate HTTP/1.1
Host: 192.1.2.59:11434
Content-Type: application/json
Content-Length: 21

{"model":"tinyllama"}

The TCP node should continue to deliver payloads as the server streams them, even after the request has been sent.
What we are doing here is constructing our own HTTP request over raw TCP.

That worked!

Here's my solution:

Function Node

// https://github.com/jmorganca/ollama/blob/main/docs/api.md
// https://github.com/ollama/ollama/blob/main/docs/modelfile.md
const llmOptions = {
    // "num_keep": 5,
    // "seed": 42,
    // "num_predict": 100,
    "top_k": 1,
    "top_p": 0.1,
    // "tfs_z": 0.5,
    // "typical_p": 0.7,
    // "repeat_last_n": 33,
    "temperature": 0.1,
    // "repeat_penalty": 1.2,
    // "presence_penalty": 1.5,
    // "frequency_penalty": 1.0,
    // "mirostat": 1,
    "mirostat_tau": 0.1,
    // "mirostat_eta": 0.6,
    // "penalize_newline": true,
    // "stop": ["\n", "user:"],
    // "numa": false,
    "num_ctx": 16,
    // "num_batch": 2,
    // "num_gqa": 1,
    // "num_gpu": 1,
    // "main_gpu": 0,
    // "low_vram": false,
    // "f16_kv": true,
    // "vocab_only": false,
    // "use_mmap": true,
    // "use_mlock": false,
    // "embedding_only": false,
    // "rope_frequency_base": 1.1,
    // "rope_frequency_scale": 0.8,
    // "num_thread": 8
};

const llmRequest = {
    "model": "tinyllama",
    "prompt": msg.payload,
    // stream: false,
    options: llmOptions
};

// For debugging purposes.
msg.llmRequest = llmRequest;
msg.startTime = new Date().getTime();

const httpPayload = JSON.stringify(llmRequest);
const httpPayloadLength = Buffer.byteLength(httpPayload);

msg.host = '192.1.2.59';
msg.port = 11434;
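// Note: HTTP strictly expects CRLF ("\r\n") line endings between the request
// line and headers; the template literal below uses plain LF, which the
// Ollama server appears happy to accept.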
msg.payload = `POST /api/generate HTTP/1.1
Host: ${msg.host}:${msg.port}
Content-Type: application/json
Content-Length: ${httpPayloadLength}

${httpPayload}`;

return msg;


Just for reference, here's my implementation of parsing the NDJSON stream. It's just cobbled together, and I'm sure there are tonnes of edge cases, but it seems to work well with the Ollama LLM. You can connect this directly to the output of the TCP node.
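
For context, the raw payloads coming out of the TCP node look roughly like this (illustrative only; the hex chunk sizes are made up). The first message carries the HTTP status line, the headers, and usually the first chunk; every later message is just a hex chunk length followed by one JSON line, and the stream ends with a lone 0:

HTTP/1.1 200 OK
Content-Type: application/x-ndjson
Transfer-Encoding: chunked

62
{"model":"tinyllama","created_at":"2024-02-04T11:16:21.232303888Z","response":"The","done":false}

...and then each subsequent message:

63
{"model":"tinyllama","created_at":"2024-02-04T11:16:21.285452561Z","response":" sky","done":false}

The function below handles both shapes and strips the trailing 0 that terminates the chunked stream.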

const messageQueue = flow.get('tcpMessageQueue', 'memoryOnly') ?? {};

const parseRawHttpResponse = (rawData, processHeaders = true) => {
    let headers = {};
    let bodyData = [];
    let contentLength = 0;

    if (processHeaders) {
        // indexOf/substring rather than split with a limit, so nothing after a second blank line is dropped.
        const headerEnd = rawData.indexOf('\r\n\r\n');
        const rawHeaders = rawData.substring(0, headerEnd);
        const body = rawData.substring(headerEnd + 4);
        rawHeaders.split('\r\n').forEach((line, index) => {
            if (index === 0) {
                headers['status-line'] = line;
            } else {
                const [key, value] = line.split(': ');
                headers[key.toLowerCase()] = value;
            }
        });

        if (headers['content-type'] !== 'application/x-ndjson') {
            throw new Error('Unsupported content type: ' + headers['content-type']);
        }

        if (body) {
            const match = body.match(/^([a-fA-F0-9]+)(\r\n|\r|\n)/);
            if (match) {
                contentLength = parseInt(match[1], 16);
                bodyData = body.substring(match[0].length);
            } else {
                throw new Error(`Content length format error or missing in body: ${body}`);
            }
        }
    } else {
        const match = rawData.match(/^([a-fA-F0-9]+)(\r\n|\r|\n)/);
        if (match) {
            contentLength = parseInt(match[1], 16);
            bodyData = rawData.substring(match[0].length);
        } else {
            throw new Error(`Content length format error or missing in rawData: ${rawData}`);
        }
    }
    
    bodyData = bodyData.replace(/(\r\n|\r|\n)0(\r\n|\r|\n)*$/, ''); // Strip end of stream.
    return { headers, body: bodyData, contentLength };
};

// First message contains HTTP headers, every subsequent message just contains content length (in hex) and JSON.
let headerPayload = false;
if (!Array.isArray(messageQueue?.[msg._msgid] ?? false)) {
    headerPayload = true;
    messageQueue[msg._msgid] = [];
}

const responseObject = parseRawHttpResponse(msg?.payload, headerPayload);
let payloadJson = '';
try {
    payloadJson = JSON.parse(responseObject.body);
} catch(err) {
    throw new Error(`${err}: '${responseObject?.body}'`);
}

messageQueue[msg._msgid].push(payloadJson);

const llmMessageCompleted = payloadJson?.done ?? false;
const messageBuffer = messageQueue[msg._msgid].map((queuedMsg) => queuedMsg.response).join('');

msg.messageQueue = messageQueue;
flow.set('tcpMessageQueue', messageQueue, 'memoryOnly');
msg.payload = messageBuffer;

// Discord's API seems to rate limit to about 3 requests per second. So if you are editing the message as the stream comes in, you may want to combine multiple stream payloads into one message. This code outputs payloads as it gets them, so the messages will appear very slowly on Discord.
// Comment this block out if you don't want any messages combined.
const combineEveryXMessages = 16;
if (((messageQueue[msg._msgid].length % combineEveryXMessages) !== 0) && !llmMessageCompleted) {
    return null;
}
// It would be possible to buffer every message received after the first 3 in each second and then spit them out once the second has passed (see the rough sketch after this function), but I think that would complicate the process.


if (llmMessageCompleted) {
    delete messageQueue[msg._msgid];
    flow.set('tcpMessageQueue', messageQueue, 'memoryOnly');
}

return msg;
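
In case anyone wants the time-based batching mentioned in the comment above, here's a rough, untested sketch of a separate Function node that could sit after the parser. It assumes a single stream at a time (concurrent streams would need to be keyed by msg._msgid) and relies on each payload already containing the whole response so far:

// Keep only the most recent payload and forward it at most once per second,
// which should keep Discord message edits under the rate limit.
const BATCH_MS = 1000;

context.set('latest', msg);

if (!context.get('timerRunning')) {
    context.set('timerRunning', true);
    setTimeout(() => {
        context.set('timerRunning', false);
        const latest = context.get('latest');
        context.set('latest', null);
        if (latest) {
            node.send(latest);
        }
    }, BATCH_MS);
}

return null; // messages are emitted from the timer via node.send()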

Nice, I used this approach to play with a local Mistral model. Now there are some funny things happening with the D2 elements... maybe the model has control over my Node-RED session :scream: :rofl:

