Rotary phone, RPi5, STT & Ollama for an offline quirky assistant with TTS output - part 3

As shown HERE (Part 1) & HERE (Part 2), I've been fiddling around with an old rotary phone, adding some switches and a servo and hooking it up to a Raspberry Pi 5 to do some Speech to Text processing, some local / offline LLM processing and some Text to Speech output.


Well, I've been faffing around with this on & off for a short while and I've now finally got the hardware & the software doing what they need to do.  This is version 1.0, as in, it is written in Python; however, I do have it all set up so that I can drop down to C and see if that actually makes things faster / more efficient.


So, what was the plan?

I wanted to take an old rotary phone, attempt to leave the externals as standard as possible, but make it so that a person can pick up the handset, dial a number, "ask a question", put the handset back down and then have the phone ring when an answer is ready.  The person then picks the handset back up and the answer is spoken to them through the speaker in the handset.  Simplez.
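
That flow can be read as a small state machine. Here is a minimal sketch of it, purely for illustration: the state names, event names and helper functions are all made up for this example and do not appear in the actual script further down.

```python
from enum import Enum, auto


class Phone(Enum):
    IDLE = auto()        # handset down, nothing pending
    OFF_HOOK = auto()    # handset lifted, waiting for the dial
    RECORDING = auto()   # dial triggered, question being recorded
    THINKING = auto()    # handset back down, STT + LLM running
    RINGING = auto()     # answer ready, bell ringing
    SPEAKING = auto()    # handset lifted again, answer being spoken


# (current state, event) -> next state; event names are illustrative only
TRANSITIONS = {
    (Phone.IDLE, "lift"): Phone.OFF_HOOK,
    (Phone.OFF_HOOK, "dial"): Phone.RECORDING,
    (Phone.OFF_HOOK, "hang_up"): Phone.IDLE,
    (Phone.RECORDING, "hang_up"): Phone.THINKING,
    (Phone.THINKING, "answer_ready"): Phone.RINGING,
    (Phone.RINGING, "lift"): Phone.SPEAKING,
    (Phone.SPEAKING, "hang_up"): Phone.IDLE,
}


def next_state(state, event):
    """Return the next state, or stay put if the event makes no sense here."""
    return TRANSITIONS.get((state, event), state)


def run(events, state=Phone.IDLE):
    """Feed a sequence of events through the machine and return the final state."""
    for event in events:
        state = next_state(state, event)
    return state
```

Events that do not fit the current state (dialling with the handset down, for example) are simply ignored, which is one cheap way of surviving a user who does things out of order.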

Hardware:







I decided to hook up a microswitch to the rocker switch to detect when the handset was picked up / placed back down.  The placement went through about 5 different positions, but I think this final one worked out the best and was the most consistent.
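
Mechanical microswitches tend to bounce for a few milliseconds as they open or close, which is one possible reason a reading can look inconsistent. Below is a minimal sketch of a software debounce. It assumes a hypothetical `read_pin` callable standing in for something like `GPIO.input(23)`, and the sample count and delay are guesses rather than measured values.

```python
import time


def read_debounced(read_pin, samples=5, delay_s=0.01, max_reads=200):
    """Return the pin level once it has read the same value `samples` times in a row.

    read_pin: zero-argument callable returning 0 or 1 (hypothetical stand-in
              for something like GPIO.input(23)).
    Falls back to the last value seen if the line never settles.
    """
    last = read_pin()
    stable = 1  # how many consecutive identical readings we have seen
    for _ in range(max_reads):
        if stable >= samples:
            break
        time.sleep(delay_s)
        value = read_pin()
        if value == last:
            stable += 1
        else:
            # the level changed, so start counting again from this reading
            last, stable = value, 1
    return last
```

In the real script this would be called as something like `read_debounced(lambda: GPIO.input(23))` wherever the handset state is checked.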

I took THIS microphone out of its casing and stripped it right down to the basics; with a bit of Blu Tack it fitted straight into the handset.  I just had to desolder the connectors to get the wire fed through and then resolder them, job done.



The speaker was a bonus find, it was in a drawer from a previous robot project, wouldn't you know it was a perfect fit!  It just needed soldering up and wires fed through the handset - again, fortunately I had a 3.5mm cable that I could nicely cut up and re-purpose.

To hook the speaker up to the Raspberry Pi, I did have to purchase a small USB device to convert the 3.5mm jack into USB as the RPi5 does not have a 3.5mm socket anymore.  The first device I used (in photos above) was way too chunky and prevented other USBs from being connected properly, I ended up getting THIS ONE instead - worked great and gave more room / space.






Software:

The RPi 5 itself is running Ubuntu 24.04, with whisper.cpp installed as explained in an earlier posting.

As I was investigating talk-llama from llama.cpp I installed that and its dependencies too.  However, when I compiled & ran the code, the STT step and then the LLM step took a long time to execute - I think that might have been down to the way the models were being used rather than the code itself.  I'll investigate at some point.
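
One way to start that investigation is to time the STT and LLM stages separately, so it is clear which one is eating the time. This is a rough sketch only: the `timed` helper and the stage names are invented for the example, and the `time.sleep` calls are placeholders for the real model calls.

```python
import time
from contextlib import contextmanager

timings = {}  # stage name -> elapsed seconds


@contextmanager
def timed(stage):
    """Record how long the enclosed block takes under the given stage name."""
    start = time.perf_counter()
    try:
        yield
    finally:
        timings[stage] = time.perf_counter() - start


# Placeholders: swap the sleeps for the real speech-to-text and LLM calls.
with timed("stt"):
    time.sleep(0.01)
with timed("llm"):
    time.sleep(0.02)

for stage, seconds in timings.items():
    print(f"{stage}: {seconds:.2f}s")
```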

I ended up just using basic Python code with off-the-shelf libraries - I did however cheekily cheat when it came to controlling the servo (that dings the bell), I just lifted the servo.py from this github REPO.


import RPi.GPIO as GPIO
import servo #local lib file in same folder as this code
import time
import subprocess
from subprocess import call
import sys
import os
import sounddevice as sd #pip3 install sounddevice | sudo apt-get install libportaudio2
import tempfile
import wave              #part of the Python standard library, no pip install needed
from langchain_community.llms import Ollama


GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM) #Use GPIO pin number

#The pins are only ever read, so set them up as inputs rather than outputs.
#Assumption: each switch connects its pin to ground when closed, so the
#internal pull-up gives 1 when the switch is open and 0 when it is closed.
#HANDSET
GPIO.setup(23, GPIO.IN, pull_up_down=GPIO.PUD_UP)
#DIAL
GPIO.setup(16, GPIO.IN, pull_up_down=GPIO.PUD_UP)

start_state1 = GPIO.input(23)
print('initialise GPIO pin HANDSET')
print(start_state1) #1 = handset lifted, 0 = handset down (as used in main)
start_state2 = GPIO.input(16)
print('initialise GPIO pin DIAL')
print(start_state2) #1 = dial not triggered, 0 = dial triggered


def process_audio(wav_file, model_name="base.en"): #small.en
    """
    Transcribes a WAV file with whisper.cpp, sends the text to the LLM,
    rings the bell until the handset is lifted and then speaks the answer.

    :param wav_file: Path to the WAV file
    :param model_name: Name of the whisper.cpp model to use
    :raises: Exception if an error occurs during processing
    """
    model = f"/home/tony/dev/whisper.cpp/models/ggml-{model_name}.bin"

    # Check that the model and the recording both exist
    if not os.path.exists(model):
        raise FileNotFoundError(f"Model file not found: {model} \n\nDownload a model with this command:\n\n> bash ./models/download-ggml-model.sh {model_name}\n\n")
    if not os.path.exists(wav_file):
        raise FileNotFoundError(f"WAV file not found: {wav_file}")

    # Run whisper.cpp; passing an argument list (no shell) avoids quoting problems with file names
    full_command = ["/home/tony/dev/whisper.cpp/main", "-m", model, "-f", wav_file, "-np", "-nt"]
    process = subprocess.Popen(full_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Get the output and error (if any)
    output, error = process.communicate()
    if error:
        raise Exception(f"Error processing audio: {error.decode('utf-8')}")

    # Tidy up the transcription
    decoded_str = output.decode('utf-8').strip()
    processed_str = decoded_str.replace('[BLANK_AUDIO]', '').strip()
    print('this is what we believed you asked:')
    print(processed_str)

    # Ask the local LLM
    llm = Ollama(model="tinyllama")
    # llm = Ollama(model="phi3:mini") #much more detailed responses, will take a few minutes to process though - does hit the limits of RAM for the RPi5 and spikes the 4xCPUs to 100%
    llmresult = llm.invoke(processed_str)
    print('and the llm answered your question with:')
    print(llmresult)

    #here we do the Bell ringing; when the person picks up the phone
    #we stop the bell ringing and invoke the espeak
    s = servo.AngularServo(18,min_us=200,max_us=2200,max_angle=200)
    try:
        while True:
            #swing the servo out and back to ding the bell
            s.angle(90)
            s.angle(0)
            #check GPIO pin for handset lift up
            state1 = GPIO.input(23)
            if state1 == 1:
                print('handset picked up')
                break
    except Exception as e:
        print(f"Error: {e}")

    print('about to do espeak')
    # -v <voice-name>
    # -s <speed in words per minute>
    # -p <pitch adjustment>
    # -a <amplitude>
    # -g <word gap>
    # -k <indicate capital letters with a pitch increase>
    call(["espeak", "-v","en-uk-north+m","-s","225","-p","50","-a","200","-g","5","-k","5", llmresult])
    print('finished the espeak')
    return



def callback(indata, frames, time_info, status):
    #third argument is named time_info so it does not shadow the time module
    #report the stream status if there is one
    if status:
        print(status)
    #create a tempfile to save the audio to, with autodeletion
    with tempfile.NamedTemporaryFile(delete=True, suffix='.wav', prefix='audio_', dir='.') as tmpfile:
        #save the 5 second audio to a .wav file
        with wave.open(tmpfile.name, 'wb') as wav_file:
            wav_file.setnchannels(1)     #mono-audio
            wav_file.setsampwidth(2)     #16-bit audio
            wav_file.setframerate(16000) #sample rate
            wav_file.writeframes(indata)
        #now convert wav to text, ask the LLM and speak the answer
        try:
            print('about to process_audio')
            process_audio(tmpfile.name, "base.en")
        except Exception as e:
            print(f"Error: {e}")


def main():
    #loop and use the handset up and down to trigger recordings
    while True: #loop forever
        #get current states
        state1 = GPIO.input(23)
        state2 = GPIO.input(16)
        if state1 == 1:
            #can comment next line
            print('handset picked up')
            if state2 == 0: #1 = not dialed, 0 = dialed
                #can comment next line
                print('dial triggered - now start recording')
                #record in 5-second blocks; each full block is handed to callback()
                with sd.InputStream(callback=callback, dtype='int16', channels=1, samplerate=16000, blocksize=16000*5):
                    print("Recording...to stop hang up!")
                    while True:
                        #do a check within this loop to determine the handset state
                        state1 = GPIO.input(23)
                        if state1 == 0:
                            #can comment next lines
                            print('handset put back down')
                            print('therefore stop recording')
                            print(state2)
                            break
                        #short pause so this loop does not spin the CPU while waiting
                        time.sleep(0.1)
        else:
            #can comment next line
            print('handset is down')
        #wait 1 second before reading state again
        time.sleep(1)


if __name__ == "__main__":
    main()
#    GPIO.cleanup()


As I said before, this was "prove it works" code, it's a bit clunky, but it does the job.

I've not catered for non-happy path, so you can screw it up.  I've also not set the code to auto-start up when the RPi starts up, so you still have to plug in a keyboard/mouse & screen to start the code and verify the output.  That can be for version 2.0.
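
One easy non-happy path to guard against is an empty recording. The script already strips the `[BLANK_AUDIO]` marker from the transcription, which can leave an empty prompt being sent to the LLM. Here is a minimal sketch of a guard for that case; `clean_transcript`, `answer_for` and the fallback message are hypothetical names for illustration, and `ask_llm` stands in for `llm.invoke`.

```python
def clean_transcript(raw):
    """Strip the blank-audio marker and surrounding whitespace."""
    return raw.replace("[BLANK_AUDIO]", "").strip()


def answer_for(raw, ask_llm, fallback="Sorry, I did not catch that."):
    """Only call the (slow) LLM when there is actually something to ask.

    ask_llm: callable taking the question text and returning the answer text
             (hypothetical stand-in for llm.invoke).
    """
    question = clean_transcript(raw)
    if not question:
        return fallback
    return ask_llm(question)
```

The fallback text can then go to espeak in place of an LLM answer, so the phone still rings and says something rather than leaving the caller waiting on an empty question.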

Here are some example debug outputs:



and a video of me testing the STT and using espeak as an output:



I'll try and do a better video that actually shows the whole thing working, matching the use case described earlier - I assure you it does do everything, I just need to find a way to record it from my brick of a phone!


Novelty points = 100. Am happy.

