# Companion Computer Setup for Flight Controller 2024
def tracking():
    while True:
        objects = huskyLens.requestAll()
        for item in objects:
            x = item['x']
            y = item['y']
            # Initialize velocity components
            vx = 0
            vy = 0
            vz = 0
            # Adjust horizontal velocity based on x position
            if x > 165:
                vy = 1   # Move RIGHT
            elif x < 155:
                vy = -1  # Move LEFT
            # Adjust forward/backward velocity based on y position
            if y < 115:
                vx = 1   # Move FORWARD
            elif y > 125:
                vx = -1  # Move BACKWARD
            # If the object is within the desired range, stop the vehicle
            if 155 <= x <= 165 and 115 <= y <= 125:
                vx = 0
                vy = 0
                print("Target reached")
            # Send the velocity command
            send_ned_velocity(vx, vy, vz)
Update DroneKit's __init__.py so that MutableMapping is imported from collections.abc (required on Python 3.10+, where collections.MutableMapping was removed).
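If editing the installed package is awkward, the same effect can be had with a small compatibility shim executed before dronekit is imported. This is a sketch of that workaround, not part of the original setup:

# Compatibility shim for DroneKit on Python 3.10+, where collections.MutableMapping
# was removed. Run this BEFORE `from dronekit import connect`.
import collections
import collections.abc

if not hasattr(collections, "MutableMapping"):
    collections.MutableMapping = collections.abc.MutableMapping  # restore the old alias

Place this at the very top of the main script, ahead of the dronekit import.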
Start MAVProxy to forward the flight controller's serial link to a local UDP endpoint for DroneKit:
mavproxy.py --master=/dev/ttyAMA0 --baudrate 921600 --out 127.0.0.1:14550
# Description: This code is used to detect the victim using
# the TFLite model and stream the video with the bounding box
# around the victim. The drone will follow the victim by
# moving towards the victim's center.
## IMPORT LIBRARY
# FLASK SERVER TO STREAM VIDEO
from flask import Flask, Response, render_template_string
# LIBRARY TO RECORD VIDEO
from picamera2 import Picamera2, Preview
from libcamera import controls
from PIL import Image
# LIBRARY TO GIVE COORDINATES
import numpy as np
# LIBRARY TO DRAW BOUNDING BOXES
import cv2
# LIBRARY TO LOAD TFLITE MODEL
import tflite_runtime.interpreter as tflite
# DRONEKIT LIBRARY TO CONNECT TO DRONE
from dronekit import connect, VehicleMode, LocationGlobalRelative
from pymavlink import mavutil
# OTHER NECESSARY LIBRARIES
import logging
import os
import time
import signal
import sys
# CONNECT TO DRONE VEHICLE
vehicle = connect('127.0.0.1:14550', baud=921600, wait_ready=True)
vehicle.parameters.set('PLND_ENABLED', 1)
vehicle.parameters.set('PLND_TYPE', 1)
vehicle.parameters.set('PLND_EST_TYPE', 0)
# ARM AND TAKEOFF FUNCTION
def arm_and_takeoff(altitude):
    while not vehicle.is_armable:
        print("waiting to be armable")
        time.sleep(1)
    print("Arming motors")
    vehicle.mode = VehicleMode("GUIDED")
    vehicle.armed = True
    while not vehicle.armed:
        time.sleep(1)
    print("Taking Off")
    vehicle.simple_takeoff(altitude)
    while True:
        v_alt = vehicle.location.global_relative_frame.alt
        print(">> Altitude = %.1f m" % v_alt)
        if v_alt >= altitude - 1.0:
            print("Target altitude reached")
            break
        time.sleep(1)
# FUNCTION TO MOVE THE DRONE AT VELOCITY
def send_ned_velocity(vehicle, vx, vy, vz, duration, MV_Status):
    vehicle.mode = VehicleMode("GUIDED")
    msg = vehicle.message_factory.set_position_target_local_ned_encode(
        0,           # time_boot_ms (not used)
        0, 0,        # target system, target component
        9,           # frame: MAV_FRAME_BODY_OFFSET_NED
        1479,        # type_mask: ignore position, acceleration and yaw; use velocity
        0, 0, 0,     # POSITION m
        vx, vy, vz,  # VELOCITY m/s
        0, 0, 0,     # ACCELERATIONS m/s^2
        0, 0         # yaw, yaw_rate
    )
    for x in range(0, duration):
        vehicle.send_mavlink(msg)
        print(MV_Status)
        time.sleep(1)
def condition_yaw(vehicle, radian, duration, MV_Status):
    vehicle.mode = VehicleMode("GUIDED")
    msg = vehicle.message_factory.set_position_target_local_ned_encode(
        0,           # time_boot_ms (not used)
        0, 0,        # target system, target component
        9,           # frame: MAV_FRAME_BODY_OFFSET_NED
        2503,        # type_mask: ignore position, acceleration and yaw rate; use yaw
        0, 0, 0,     # POSITION m
        0, 0, 0,     # VELOCITY m/s
        0, 0, 0,     # ACCELERATIONS m/s^2
        radian, 0    # yaw (rad), yaw_rate
    )
    for x in range(0, duration):
        vehicle.send_mavlink(msg)
        print(MV_Status)
        time.sleep(1)
def square_movement():
    """Move the drone in a square pattern."""
    print("Executing square movement pattern.")
    for _ in range(4):  # Repeat 4 times for a square
        # Move forward for 3 seconds
        send_ned_velocity(vehicle, 1, 0, 0, duration=3, MV_Status="MOVE FORWARD")
        # Turn right (yaw) by 90 degrees
        radian = 1.5708
        condition_yaw(vehicle, radian, 1, MV_Status="YAW TO THE RIGHT")
        time.sleep(5)  # Wait for the yaw to complete
def close_drone_connection(signum, frame):
    """Handles closing the drone connection gracefully."""
    vehicle.mode = VehicleMode("RTL")
    print("Closing drone connection...")
    if vehicle.armed:
        print("Disarming vehicle...")
        vehicle.armed = False
        while vehicle.armed:
            time.sleep(1)
    vehicle.close()
    print("Drone connection closed.")
    sys.exit(0)
app = Flask(__name__)
# Load TensorFlow Lite model with specified number of threads
print("AI MODEL IS INITIATING")
model_path = "/home/sedna/Downloads/PROJECT_IRIS_V1/IRIS.tflite"   # DIRECTORY PATH OF TFLITE MODEL
label_path = "/home/sedna/Downloads/PROJECT_IRIS_V1/labelmap.txt"  # DIRECTORY PATH OF LABELMAP FILE
num_threads = 4  # Modify this value to control the number of threads
print("Loading model:", model_path)
print("Loading labels:", label_path)
print("Using", num_threads, "threads")
if not os.path.isfile(model_path):
    print("Model not found!")
    exit()
elif not os.path.isfile(label_path):
    print("Label map not found!")
    exit()
else:
    print("AI MODEL IS SUCCESSFULLY INITIATED")
# Use 'Interpreter' with 'num_threads' option
interpreter = tflite.Interpreter(model_path=model_path, num_threads=num_threads)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
# Load labels
with open(label_path, 'r') as f:
    labels = [line.strip() for line in f.readlines()]
# Setup PiCamera2 with autofocus enabled
print("CAMERA IS INITIATING")
picam2 = Picamera2()
picam2.set_controls({
    "AfMode": controls.AfModeEnum.Continuous,
    "AfSpeed": controls.AfSpeedEnum.Fast,
    "AfRange": controls.AfRangeEnum.Full,
    "AeExposureMode": controls.AeExposureModeEnum.Short
})
camera_config = picam2.create_preview_configuration(main={"format": "RGB888", "size": (1280, 720)})
picam2.configure(camera_config)
picam2.start()
print("CAMERA IS SUCCESSFULLY INITIATED")
# ARM AND TAKEOFF THE DRONE
print("DRONE IS ARMING AND TAKING OFF")
arm_and_takeoff(10)
vehicle.flush()
'''
### THIS CODE IS READY FOR MOVING TO GPS POINT
print("Going towards first point for 30 seconds ...")
point1 = LocationGlobalRelative(5.147264589053249, 100.49392067963265, 20)
vehicle.simple_goto(point1)
'''
# Register the signal handler for keyboard interrupt
signal.signal(signal.SIGINT, close_drone_connection)
# Set verbosity
verbose = True # Change this to False to reduce logging output
if verbose:
    logging.basicConfig(level=logging.INFO)
else:
    logging.basicConfig(level=logging.WARNING)
def preprocess_image(image):
    """Resize and preprocess image for the TFLite model."""
    # Ensure the input shape matches the model's input dimensions
    input_shape = input_details[0]['shape'][1:3]  # e.g. [300, 300]
    # Convert the image to RGB format (Pillow's default is RGB)
    image = Image.fromarray(image).convert('RGB')
    # Resize the image to the expected input size
    image = image.resize(input_shape, Image.LANCZOS)  # Pillow 10 uses LANCZOS for high-quality downsampling
    # Convert image to numpy array and expand dimensions to match [1, 300, 300, 3]
    input_tensor = np.expand_dims(np.array(image), axis=0)
    # Cast to float32 (use uint8 instead if the model expects quantized input)
    #input_tensor = input_tensor.astype(np.uint8)
    input_tensor = input_tensor.astype(np.float32)
    return input_tensor
# Configure logging (note: basicConfig above has already set the level, so this call has no effect)
logging.basicConfig(level=logging.DEBUG)
def detect_objects(image, overlap_thresh=0.5):
    """Run object detection on the input image with thresholding and NMS."""
    input_tensor = preprocess_image(image)
    interpreter.set_tensor(input_details[0]['index'], input_tensor)
    interpreter.invoke()
    # Check output layer name to determine if this model was created with TF2 or TF1,
    # because outputs are ordered differently for TF2 and TF1 models
    outname = output_details[0]['name']
    if 'StatefulPartitionedCall' in outname:  # This is a TF2 model
        boxes_idx, classes_idx, scores_idx = 1, 3, 0
    else:  # This is a TF1 model
        boxes_idx, classes_idx, scores_idx = 0, 1, 2
    # Retrieve and copy the output tensors immediately to avoid the reference issue
    boxes = interpreter.get_tensor(output_details[boxes_idx]['index'])[0].copy()      # Bounding boxes
    classes = interpreter.get_tensor(output_details[classes_idx]['index'])[0].copy()  # Class IDs
    scores = interpreter.get_tensor(output_details[scores_idx]['index'])[0].copy()    # Confidence scores
    # Set a confidence threshold
    confidence_threshold = 0.3
    selected_indices = np.where(scores > confidence_threshold)
    # If no objects exceed the confidence threshold, skip detection
    if len(selected_indices[0]) == 0:
        return [], [], []
    # Filter boxes, scores, and classes
    boxes = boxes[selected_indices]
    scores = scores[selected_indices]
    classes = classes[selected_indices]
    # Convert bounding boxes to pixel coordinates
    bounding_boxes = []
    for box in boxes:
        ymin, xmin, ymax, xmax = box
        x = int(xmin * image.shape[1])
        y = int(ymin * image.shape[0])
        width = int((xmax - xmin) * image.shape[1])
        height = int((ymax - ymin) * image.shape[0])
        bounding_boxes.append([x, y, width, height])
    # Apply NMS
    indices = cv2.dnn.NMSBoxes(
        bounding_boxes, scores.tolist(),
        confidence_threshold, overlap_thresh
    )
    if len(indices) > 0:
        indices = indices.flatten()
        bounding_boxes = np.array([bounding_boxes[i] for i in indices])
        scores = np.array([scores[i] for i in indices])
        classes = np.array([classes[i] for i in indices])
    return bounding_boxes, scores, classes
def annotate_image(image, boxes, scores, classes):
    """Annotate the image with detection results."""
    centers = []
    highest_score = -1
    highest_score_index = -1
    for i in range(len(boxes)):
        x, y, w, h = boxes[i]
        start_point = (x, y)
        end_point = (x + w, y + h)
        # Draw bounding boxes for all detections
        cv2.rectangle(image, start_point, end_point, (0, 255, 0), 2)
        object_name = labels[int(classes[i])]  # Look up object name from "labels" array using class index
        label = f"{object_name}: {scores[i]:.2f}"
        cv2.putText(image, label, (start_point[0], start_point[1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        # Calculate the center of the bounding box
        center_x = x + w // 2
        center_y = y + h // 2
        centers.append((center_x, center_y))
        # Track the highest scoring detection
        if scores[i] > highest_score:
            highest_score = scores[i]
            highest_score_index = i
    # Return the modified image, all centers, and the center of the highest-scoring detection
    return image, centers, centers[highest_score_index] if highest_score_index != -1 else None
# Define the target framerate (e.g., 10 frames per second)
target_fps = 10
frame_time = 1.0 / target_fps
# Global variable to store the latest coordinates
latest_centers = []
def follow_target(centers, scores, last_detection_time):
    """Function to follow the target with the highest score."""
    current_time = time.time()
    if len(centers) > 0:
        # Reset the detection time since we have a detection
        last_detection_time = current_time
        # Find the index of the highest-scoring detection and use its center
        max_score_idx = np.argmax(scores)
        center_x, center_y = centers[max_score_idx]
        # Set default velocities
        vx = 0
        vy = 0
        vz = 0
        # DEFINE THE IMAGE WIDTH AND HEIGHT
        image_width = 1280
        image_height = 720
        # DEFINE THE CENTER TOLERANCE RANGES FOR X AND Y
        x_min = image_width // 2 - 10
        x_max = image_width // 2 + 10
        y_min = image_height // 2 - 10
        y_max = image_height // 2 + 10
        '''center_x is the x-coordinate of the center of the bounding box,
        center_y is the y-coordinate of the center of the bounding box'''
        # Adjust horizontal velocity based on the x position (left/right)
        if center_x > x_max:      # MOVE RIGHT
            vy = 1
            vz = 0
        elif center_x < x_min:    # MOVE LEFT
            vy = -1
            vz = 0
        # Adjust forward/backward velocity based on the y position
        if center_y < y_min:      # MOVE FORWARD
            vx = 1
            vz = 0
        elif center_y > y_max:    # MOVE BACKWARD
            vx = -1
            vz = 0
        # Target centered: stop and hold position
        if (x_min < center_x < x_max) and (y_min < center_y < y_max):
            vx = 0
            vy = 0
            vz = 0
            vehicle.mode = VehicleMode("LOITER")
        # Send the velocity command to follow the target
        send_ned_velocity(vehicle, vx, vy, vz, duration=1, MV_Status="Following Target")
    '''
    else:
        # IF NO TARGET IS DETECTED FOR 15 SECONDS, SWITCH TO SQUARE MOVEMENT
        if current_time - last_detection_time > 15:
            square_movement()
        else:
            # If no target is detected, switch the drone to Loiter mode
            print("No victim detected, switching to Loiter mode.")
            vehicle.mode = VehicleMode("LOITER")
    '''
    return last_detection_time
def generate_frames():
    """Generate frames from the camera and run object detection."""
    global latest_centers  # Track the latest centers for coordinate updates
    last_detection_time = time.time()  # Initialize detection time to current time
    while True:
        frame = picam2.capture_array()
        # Detect objects in the frame
        boxes, scores, classes = detect_objects(frame)
        # Annotate the frame with detection results
        annotated_frame, centers, _ = annotate_image(frame, boxes, scores, classes)
        # Update the global centers list for coordinate display
        latest_centers = centers
        # Follow the highest-scoring target or execute square movement
        last_detection_time = follow_target(centers, scores, last_detection_time)
        # Encode the frame in JPEG format
        ret, buffer = cv2.imencode('.jpg', annotated_frame)
        if not ret:
            continue  # Skip this frame if encoding fails
        frame = buffer.tobytes()
        # Yield the frame in multipart/x-mixed-replace format
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
# Load the HTML template text (render_template_string expects the template string, not a file path)
with open("/home/sedna/Drone_code/HTML_TEMPLATE.txt", 'r') as f:
    HTML_TEMPLATE = f.read()
@app.route('/video_feed')
def video_feed():
    return render_template_string(HTML_TEMPLATE)

@app.route('/video_feed_stream')
def video_feed_stream():
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/')
def index():
    return "Video Streaming with Object Detection. Go to /video_feed to view the stream."

@app.route('/get_coordinates')
def get_coordinates():
    global latest_centers
    centers_str = ', '.join([f"({x},{y})" for x, y in latest_centers])
    return centers_str
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)
## CODE REVIEW NOTES
follow_target() appears simple but lacks robustness. It only adjusts velocity based on whether the target is slightly off-center, which could result in excessive drone movement if the target frequently drifts slightly off-center due to noise in object detection. Adding a damping factor or proportional control (PID control) would improve the stability of the drone's movements.
In the follow_target() function, the code uses a fixed boundary of 10 pixels around the center of the frame. This could be problematic because, depending on the resolution, 10 pixels may not provide sufficient tolerance. A more dynamic range based on the image dimensions (e.g., a percentage of the frame width) would be more adaptive.
close_drone_connection is registered to handle interrupts (e.g., Ctrl+C). However, vehicle.Mode = VehicleMode("RTL") should be corrected to vehicle.mode = VehicleMode("RTL").
The connection string ('127.0.0.1:14550') indicates that the code expects a simulated or local drone. For real drones, ensure that the correct port and connection settings are used, and that connection timeouts are properly handled.
The code switches between flight modes (LOITER, GUIDED). The drone's current mode should be checked before attempting to change it, to avoid unnecessary transitions and potential failures.
The code is well-structured and should function as expected, given that the correct model and labels are provided. However, improvements in movement control, mode handling, error handling, and multi-threading would enhance reliability and performance.
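To make the proportional-control and dynamic-tolerance suggestions concrete, here is a minimal sketch (not part of the original code); the gain kp, the max_speed limit, and the 5% deadband are assumed values that would need tuning on the actual airframe:

import numpy as np

def follow_target_p(center_x, center_y, image_width=1280, image_height=720,
                    kp=0.004, max_speed=1.0, deadband_frac=0.05):
    """Proportional-control sketch: velocity scales with pixel error.

    deadband_frac: tolerance expressed as a fraction of the frame size instead of
    a fixed 10-pixel box, so it adapts to the camera resolution.
    """
    # Pixel error of the target centre relative to the frame centre
    err_x = center_x - image_width / 2    # positive -> target is to the right
    err_y = center_y - image_height / 2   # positive -> target is below centre
    # Resolution-relative deadband (e.g. 5% of each dimension)
    if abs(err_x) < deadband_frac * image_width:
        err_x = 0
    if abs(err_y) < deadband_frac * image_height:
        err_y = 0
    # Proportional velocities, clamped to a safe maximum
    vy = float(np.clip(kp * err_x, -max_speed, max_speed))   # right/left
    vx = float(np.clip(kp * -err_y, -max_speed, max_speed))  # forward/backward
    return vx, vy

# Usage sketch: replace the fixed +/-1 m/s commands inside follow_target()
# vx, vy = follow_target_p(center_x, center_y)
# if vx == 0 and vy == 0 and vehicle.mode.name != "LOITER":  # check current mode before switching
#     vehicle.mode = VehicleMode("LOITER")
# else:
#     send_ned_velocity(vehicle, vx, vy, 0, duration=1, MV_Status="Following Target (P-control)")

Because the commanded velocity shrinks as the target approaches the frame centre, small detection jitter produces only small corrections, which addresses the oscillation concern noted above.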
## IMPORT LIBRARY
from flask import Flask, Response, render_template_string
from picamera2 import Picamera2, Preview
from libcamera import controls
from PIL import Image
import numpy as np
import cv2
import tflite_runtime.interpreter as tflite
from dronekit import connect, VehicleMode, LocationGlobalRelative
from pymavlink import mavutil
import logging
import time
import signal
import sys
import threading
import queue
import keyboard # For detecting emergency stop
# Set up logging with timestamps for better debugging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s [%(levelname)s] %(message)s')
# GLOBAL VARIABLES
vehicle = None
latest_centers = []
emergency_stop = False # Global flag for emergency stop
# THREAD-SAFE QUEUE FOR FRAME PROCESSING
frame_queue = queue.Queue(maxsize=5)
# FLASK APP HTML TEMPLATE
HTML_TEMPLATE = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SKYSCANNER</title>
<style>
@import url('https://fonts.googleapis.com/css2?family=Orbitron:wght@400;700&display=swap');
body {
background: url('https://wallpapers.com/images/high/4k-anime-space-jrdma2osexe3xo8v.webp') no-repeat center center fixed;
background-size: cover;
color: #00ff00;
font-family: 'Orbitron', sans-serif;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
height: 100vh;
margin: 0;
overflow: hidden;
}
h1 {
font-size: 3em;
margin-bottom: 20px;
z-index: 1;
text-shadow: 2px 2px 4px #000000;
}
.container {
display: flex;
flex-direction: row;
z-index: 1;
}
.video-container {
border: 5px solid #00ff00;
padding: 10px;
background-color: rgba(0, 0, 0, 0.5);
}
.video-container img {
width: 100%;
height: auto;
}
.coordinates {
margin-left: 20px;
color: #00ff00;
font-size: 1.5em;
background-color: rgba(0, 0, 0, 0.7);
padding: 10px;
border-radius: 10px;
overflow-y: auto;
max-height: 500px; /* Adjust as needed */
}
.footer {
margin-top: 20px;
font-size: 1.2em;
text-shadow: 2px 2px 4px #000000;
}
</style>
</head>
<body>
<h1>SKYSCANNER LIVE STREAM</h1>
<div class="container">
<div class="video-container">
<img src="{{ url_for('video_feed_stream') }}" alt="Video Stream">
</div>
<div class="coordinates" id="coordinates"></div>
</div>
<div class="footer">
<p>Video Streaming with Victim Detection</p>
</div>
<script>
function fetchCoordinates() {
fetch('{{ url_for('get_coordinates') }}')
.then(response => response.text())
.then(data => {
document.getElementById('coordinates').innerText = data;
});
}
setInterval(fetchCoordinates, 1000); // Fetch coordinates every 1 second
</script>
</body>
</html>
"""
# CONNECT TO DRONE VEHICLE (WITH ERROR HANDLING)
def connect_to_vehicle():
    global vehicle
    try:
        vehicle = connect('127.0.0.1:14550', baud=921600, wait_ready=True)
        vehicle.parameters.set('PLND_ENABLED', 1)
        vehicle.parameters.set('PLND_TYPE', 1)
        vehicle.parameters.set('PLND_EST_TYPE', 0)
        logging.info("Drone connected successfully.")
    except Exception as e:
        logging.error(f"Failed to connect to vehicle: {e}")
        sys.exit(1)
# ARM AND TAKEOFF FUNCTION (WITH ERROR HANDLING)
def arm_and_takeoff(altitude):
    try:
        while not vehicle.is_armable:
            logging.info("Waiting for vehicle to become armable...")
            time.sleep(1)
        logging.info("Arming motors")
        vehicle.mode = VehicleMode("GUIDED")
        vehicle.armed = True
        while not vehicle.armed:
            time.sleep(1)
        logging.info("Taking off")
        vehicle.simple_takeoff(altitude)
        while True:
            v_alt = vehicle.location.global_relative_frame.alt
            logging.info(f">> Altitude = {v_alt:.1f} m")
            if v_alt >= altitude - 1.0:
                logging.info("Target altitude reached")
                break
            time.sleep(1)
    except Exception as e:
        logging.error(f"Error during takeoff: {e}")
# FUNCTION TO HANDLE RTL MODE IN CASE OF EMERGENCY
def emergency_rtl():
    global emergency_stop
    logging.info("Emergency RTL activated")
    emergency_stop = True
    vehicle.mode = VehicleMode("RTL")
    while vehicle.armed:
        vehicle.armed = False
        time.sleep(1)
    logging.info("Drone safely landed and disarmed.")

# DETECT EMERGENCY KEY PRESS ('r' FOR RETURN-TO-LAUNCH)
def check_emergency_key():
    global emergency_stop
    while True:
        if keyboard.is_pressed('r'):
            logging.info("Emergency stop triggered (Key 'r' detected). Switching to RTL.")
            emergency_rtl()
            break
        time.sleep(0.1)
# FUNCTION TO MOVE THE DRONE (WITH ERROR HANDLING AND THREADING SUPPORT)
def send_ned_velocity(vehicle, vx, vy, vz, duration, MV_Status):
    if emergency_stop:
        return  # Stop all movement if emergency is triggered
    try:
        vehicle.mode = VehicleMode("GUIDED")
        msg = vehicle.message_factory.set_position_target_local_ned_encode(
            0,           # time_boot_ms (not used)
            0, 0,        # target system, target component
            9,           # frame: MAV_FRAME_BODY_OFFSET_NED
            1479,        # type_mask: ignore position, acceleration and yaw; use velocity
            0, 0, 0,     # POSITION m
            vx, vy, vz,  # VELOCITY m/s
            0, 0, 0,     # ACCELERATIONS m/s^2
            0, 0         # yaw, yaw_rate
        )
        for _ in range(duration):
            if emergency_stop:
                break  # Stop movement if emergency occurs
            vehicle.send_mavlink(msg)
            logging.info(MV_Status)
            time.sleep(1)
    except Exception as e:
        logging.error(f"Error sending NED velocity: {e}")
# CONTROL YAW FUNCTION (WITH ERROR HANDLING)
def condition_yaw(vehicle, radian, duration, MV_Status):
    vehicle.mode = VehicleMode("GUIDED")
    msg = vehicle.message_factory.set_position_target_local_ned_encode(
        0,           # time_boot_ms (not used)
        0, 0,        # target system, target component
        9,           # frame: MAV_FRAME_BODY_OFFSET_NED
        2503,        # type_mask: ignore position, acceleration and yaw rate; use yaw
        0, 0, 0,     # POSITION m
        0, 0, 0,     # VELOCITY m/s
        0, 0, 0,     # ACCELERATIONS m/s^2
        radian, 0    # yaw (rad), yaw_rate
    )
    for x in range(0, duration):
        vehicle.send_mavlink(msg)
        print(MV_Status)
        time.sleep(1)
# SQUARE MOVEMENT FUNCTION (FOR NO TARGET DETECTION)
def square_movement():
    """Move the drone in a square pattern."""
    print("Executing square movement pattern.")
    for _ in range(4):  # Repeat 4 times for a square
        # Move forward for 3 seconds
        send_ned_velocity(vehicle, 1, 0, 0, duration=3, MV_Status="MOVE FORWARD")
        # Turn right (yaw) by 90 degrees
        radian = 1.5708
        condition_yaw(vehicle, radian, 1, MV_Status="YAW TO THE RIGHT")
        time.sleep(5)  # Wait for the yaw to complete
# CAMERA INITIALIZATION FUNCTION (WITH ERROR HANDLING)
def start_camera():
    try:
        picam2 = Picamera2()
        picam2.set_controls({"AfMode": controls.AfModeEnum.Continuous, "AfSpeed": controls.AfSpeedEnum.Fast})
        camera_config = picam2.create_preview_configuration(main={"format": "RGB888", "size": (1280, 720)})
        picam2.configure(camera_config)
        picam2.start()
        return picam2
    except Exception as e:
        logging.error(f"Failed to initialize camera: {e}")
        sys.exit(1)
# FRAME PROCESSING THREAD FUNCTION (PUT FRAMES INTO QUEUE)
def frame_producer(picam2):
    while True:
        if emergency_stop:
            break
        frame = picam2.capture_array()
        if not frame_queue.full():
            frame_queue.put(frame)
def preprocess_image(image):
    """Resize and preprocess image for the TFLite model."""
    # Ensure the input shape matches the model's input dimensions
    input_shape = input_details[0]['shape'][1:3]  # e.g. [300, 300]
    # Convert the image to RGB format (Pillow's default is RGB)
    image = Image.fromarray(image).convert('RGB')
    # Resize the image to the expected input size
    image = image.resize(input_shape, Image.LANCZOS)  # Pillow 10 uses LANCZOS for high-quality downsampling
    # Convert image to numpy array and expand dimensions to match [1, 300, 300, 3]
    input_tensor = np.expand_dims(np.array(image), axis=0)
    # Cast to float32 (use uint8 instead if the model expects quantized input)
    #input_tensor = input_tensor.astype(np.uint8)
    input_tensor = input_tensor.astype(np.float32)
    return input_tensor
# DETECT OBJECTS FUNCTION
def detect_objects(image, overlap_thresh=0.5):
    try:
        input_tensor = preprocess_image(image)
        interpreter.set_tensor(input_details[0]['index'], input_tensor)
        interpreter.invoke()
        # Check output layer name to determine if this model was created with TF2 or TF1,
        # because outputs are ordered differently for TF2 and TF1 models
        outname = output_details[0]['name']
        if 'StatefulPartitionedCall' in outname:  # This is a TF2 model
            boxes_idx, classes_idx, scores_idx = 1, 3, 0
        else:  # This is a TF1 model
            boxes_idx, classes_idx, scores_idx = 0, 1, 2
        boxes = interpreter.get_tensor(output_details[boxes_idx]['index'])[0].copy()
        classes = interpreter.get_tensor(output_details[classes_idx]['index'])[0].copy()
        scores = interpreter.get_tensor(output_details[scores_idx]['index'])[0].copy()
        # Keep only detections above the confidence threshold
        confidence_threshold = 0.5
        selected_indices = np.where(scores > confidence_threshold)
        if len(selected_indices[0]) == 0:
            return [], [], []
        boxes = boxes[selected_indices]
        scores = scores[selected_indices]
        classes = classes[selected_indices]
        # Convert bounding boxes to pixel coordinates
        bounding_boxes = []
        for box in boxes:
            ymin, xmin, ymax, xmax = box
            x = int(xmin * image.shape[1])
            y = int(ymin * image.shape[0])
            width = int((xmax - xmin) * image.shape[1])
            height = int((ymax - ymin) * image.shape[0])
            bounding_boxes.append([x, y, width, height])
        # Apply NMS
        indices = cv2.dnn.NMSBoxes(
            bounding_boxes, scores.tolist(),
            confidence_threshold, overlap_thresh
        )
        if len(indices) > 0:
            indices = indices.flatten()
            bounding_boxes = np.array([bounding_boxes[i] for i in indices])
            scores = np.array([scores[i] for i in indices])
            classes = np.array([classes[i] for i in indices])
        # Return pixel-coordinate boxes so annotate_image() can draw them directly
        return bounding_boxes, scores, classes
    except Exception as e:
        logging.error(f"Error during object detection: {e}")
        return [], [], []
def annotate_image(image, boxes, scores, classes):
    """Annotate the image with detection results."""
    centers = []
    highest_score = -1
    highest_score_index = -1
    for i in range(len(boxes)):
        x, y, w, h = boxes[i]
        start_point = (x, y)
        end_point = (x + w, y + h)
        # Draw bounding boxes for all detections
        cv2.rectangle(image, start_point, end_point, (0, 255, 0), 2)
        object_name = labels[int(classes[i])]  # Look up object name from "labels" array using class index
        label = f"{object_name}: {scores[i]:.2f}"
        cv2.putText(image, label, (start_point[0], start_point[1] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        # Calculate the center of the bounding box
        center_x = x + w // 2
        center_y = y + h // 2
        centers.append((center_x, center_y))
        # Track the highest scoring detection
        if scores[i] > highest_score:
            highest_score = scores[i]
            highest_score_index = i
    # Return the modified image, all centers, and the center of the highest-scoring detection
    return image, centers, centers[highest_score_index] if highest_score_index != -1 else None
# FRAME CONSUMER THREAD (DETECT OBJECTS AND FOLLOW TARGET)
def frame_consumer():
    global latest_centers
    last_detection_time = time.time()
    while True:
        if emergency_stop:
            break
        if not frame_queue.empty():
            frame = frame_queue.get()
            boxes, scores, classes = detect_objects(frame)
            annotated_frame, centers, _ = annotate_image(frame, boxes, scores, classes)
            latest_centers = centers  # Update global center coordinates
            last_detection_time = follow_target(centers, scores, last_detection_time)
        time.sleep(1.0 / 10)  # Limit FPS
# MAIN FLASK APP FOR STREAMING VIDEO
app = Flask(__name__)

@app.route('/video_feed')
def video_feed():
    return render_template_string(HTML_TEMPLATE)

@app.route('/video_feed_stream')
def video_feed_stream():
    return Response(generate_frames(), mimetype='multipart/x-mixed-replace; boundary=frame')

@app.route('/')
def index():
    return "Video Streaming with Object Detection. Go to /video_feed to view the stream."

@app.route('/get_coordinates')
def get_coordinates():
    global latest_centers
    centers_str = ', '.join([f"({x},{y})" for x, y in latest_centers])
    return centers_str
def follow_target(centers, scores, last_detection_time):
    """Function to follow the target with the highest score."""
    current_time = time.time()
    follow_duration = 10  # Duration to follow the target
    if len(centers) > 0:
        # Find the index of the highest-scoring detection and use its center
        max_score_idx = np.argmax(scores)
        center_x, center_y = centers[max_score_idx]
        # Set default velocities
        vx = 0
        vy = 0
        vz = 0
        # DEFINE THE IMAGE WIDTH AND HEIGHT
        image_width = 1280
        image_height = 720
        # DEFINE THE CENTER TOLERANCE RANGES FOR X AND Y
        x_min = image_width // 2 - 10
        x_max = image_width // 2 + 10
        y_min = image_height // 2 - 10
        y_max = image_height // 2 + 10
        '''center_x is the x-coordinate of the center of the bounding box,
        center_y is the y-coordinate of the center of the bounding box'''
        # Adjust horizontal velocity based on the x position (left/right)
        if center_x > x_max:      # MOVE RIGHT
            vy = 1
            vz = 0
        elif center_x < x_min:    # MOVE LEFT
            vy = -1
            vz = 0
        # Adjust forward/backward velocity based on the y position
        if center_y < y_min:      # MOVE FORWARD
            vx = 1
            vz = 0
        elif center_y > y_max:    # MOVE BACKWARD
            vx = -1
            vz = 0
        # Target centered: stop and hold position
        if (x_min < center_x < x_max) and (y_min < center_y < y_max):
            vx = 0
            vy = 0
            vz = 0
            vehicle.mode = VehicleMode("LOITER")
        # Send the velocity command to follow the target
        send_ned_velocity(vehicle, vx, vy, vz, duration=1, MV_Status="Following Target")
        # CHECK IF THE FOLLOW DURATION HAS BEEN EXCEEDED
        if current_time - last_detection_time >= follow_duration:
            vehicle.mode = VehicleMode("LOITER")
            print("Target lost, switching to Loiter mode.")
            vehicle.close()
            return current_time
    '''
    else:
        # IF NO TARGET IS DETECTED FOR 15 SECONDS, SWITCH TO SQUARE MOVEMENT
        if current_time - last_detection_time > 15:
            square_movement()
        else:
            # If no target is detected, switch the drone to Loiter mode
            print("No victim detected, switching to Loiter mode.")
            vehicle.mode = VehicleMode("LOITER")
    '''
    return last_detection_time
# STREAM VIDEO FRAMES TO FLASK (UPDATED)
def generate_frames():
    """Generate frames from the camera and run object detection."""
    global latest_centers  # Track the latest centers for coordinate updates
    last_detection_time = time.time()  # Initialize detection time to current time
    while True:
        frame = picam2.capture_array()
        # Detect objects in the frame
        boxes, scores, classes = detect_objects(frame)
        # Annotate the frame with detection results
        annotated_frame, centers, _ = annotate_image(frame, boxes, scores, classes)
        # Update the global centers list for coordinate display
        latest_centers = centers
        # Follow the highest-scoring target or execute square movement
        last_detection_time = follow_target(centers, scores, last_detection_time)
        # Encode the frame in JPEG format
        ret, buffer = cv2.imencode('.jpg', annotated_frame)
        if not ret:
            continue  # Skip this frame if encoding fails
        frame = buffer.tobytes()
        # Yield the frame in multipart/x-mixed-replace format
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
def run_flask():
    app.run(host='0.0.0.0', port=5000, debug=False)
# MAIN EXECUTION FUNCTION
if __name__ == '__main__':
    # Load the TFLite model
    model_path = "/home/sedna/Downloads/PROJECT_IRIS_V1/IRIS.tflite"
    label_path = "/home/sedna/Downloads/PROJECT_IRIS_V1/labelmap.txt"  # DIRECTORY PATH OF LABELMAP FILE
    interpreter = tflite.Interpreter(model_path=model_path, num_threads=4)
    interpreter.allocate_tensors()
    input_details = interpreter.get_input_details()
    output_details = interpreter.get_output_details()
    # Load labels
    with open(label_path, 'r') as f:
        labels = [line.strip() for line in f.readlines()]
    # Start the camera
    picam2 = start_camera()
    # Start frame processing threads
    threading.Thread(target=frame_producer, args=(picam2,), daemon=True).start()
    threading.Thread(target=frame_consumer, daemon=True).start()
    threading.Thread(target=check_emergency_key, daemon=True).start()  # Start listening for emergency key press
    print("PRESS ('r' FOR RETURN-TO-LAUNCH)")
    # Start the Flask app in a background thread; app.run() blocks, so it must not
    # occupy the main thread if the vehicle setup below is to execute
    threading.Thread(target=run_flask, daemon=True).start()
    time.sleep(15)
    # Connect, arm and take off
    connect_to_vehicle()
    arm_and_takeoff(10)
    # Keep the main thread alive so the daemon threads keep running
    while not emergency_stop:
        time.sleep(1)