Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion example/01_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
print(dev_list)

# Replace 'COMX' with the appropriate COM port for Model2450
sw1 = model2450.Model2450('COM10')
sw1 = model2450.Model2450('COM6')
# print("switch 2450 connected:", sw1)
# Connect the USB Switch
sw1.connect()
Expand All @@ -26,6 +26,26 @@
gread = sw1.get_read() # cmd for read the ambient light sensor
print(gread)

time.sleep(1)
gread = sw1.get_read() # cmd for read the ambient light sensor
print(gread)

time.sleep(1)
gread = sw1.get_read() # cmd for read the ambient light sensor
print(gread)

time.sleep(1)
gcolor = sw1.get_color() # display the color reading
print(gcolor)

time.sleep(1)
gcolor = sw1.get_color() # display the color reading
print(gcolor)

time.sleep(1)
gcolor = sw1.get_color() # display the color reading
print(gcolor)

time.sleep(1)
glevel = sw1.get_level() # cmd for read the light level for detecting blank frames
print(glevel)
Expand Down
2 changes: 1 addition & 1 deletion example/02_blackframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
print(dev_list)

# Replace 'COMX' with the appropriate COM port for Model2450
sw1 = model2450.Model2450('COM10')
sw1 = model2450.Model2450('COM6')
# print("switch 2450 connected:", sw1)
# Connect the USB Switch
sw1.connect()
Expand Down
1 change: 1 addition & 0 deletions model2450lib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# __init__.py
55 changes: 0 additions & 55 deletions model2450lib/model.py

This file was deleted.

196 changes: 128 additions & 68 deletions model2450lib/model2450.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

##############################################################################
#
# Module: model2450.py
Expand All @@ -8,90 +9,149 @@
# Released under the MCCI Corporation.
#
# Author:
# Vinay N, MCCI Corporation August 2024
# Vinay N, MCCI Corporation May 2025
#
# Revision history:
# V1.0.0 Wed Aug 2024 12:05:00 Vinay N
# V1.0.1 Wed May 2025 12:05:00 Vinay N
# Module created
##############################################################################
from model2450lib import model
from model2450lib.serialmodel import SerialDevice
from model2450lib.packetutils import decode_packet, read_packet_from_serial, read_block_frames
import time

class Model2450(model.Model):
def __init__(self, cport):
model.Model.__init__(self, cport, 115200)

def read_sn(self):
cmd = 'sn\r\n'
rc, rstr = self.send_cmd(cmd)
# print(rc,rstr)
return (rstr)
class Model2450(SerialDevice):
def __init__(self, port):
super().__init__(port)
self.r_data = []
self.g_data = []
self.b_data = []
self.light_data = []
self.time_data = []
self.keep_running = True

def read_sn(self):
return self.send_cmd('sn\r\n')

def read_sn(self):
return self.send_cmd('sn\r\n')

def get_version(self):
cmd = 'version\r\n'
rc, rstr = self.send_cmd(cmd)
# print("rstr-version:", rstr)
return (rc, rstr)

def do_reset(self):
cmd = 'reset\r\n'
rc, rstr = self.send_cmd(cmd)
# print("rstr-version:", rstr)
return (rc, rstr)
return self.send_cmd('version\r\n')

def get_color(self):
return self.send_cmd('color\r\n')


def get_read(self):
cmd = 'read\r\n'
rc, rstr = self.send_cmd(cmd)
# print("read->:",rstr)
return (rstr)
return self.send_cmd('read\r\n')


def get_level(self):
cmd = 'level\r\n'
rc, rstr = self.send_cmd(cmd)
# print("level->", rstr)
return rstr
return self.send_cmd('level\r\n')

def set_level(self, value):
cmd = self.set_level_cmd(value)
return self.send_cmd(cmd)

def set_level_cmd(self, value):
return 'level '+str(value)+'\r\n'


def get_color(self):
cmd = 'color\r\n'
rc, rstr = self.send_command(cmd)
# print("color->", rstr)
return rstr

def set_red(self):
cmd = 'set red\r\n'
rc, rstr = self.send_cmd(cmd)
return rstr
return self.send_cmd('set red\r\n')

def set_blue(self):
cmd = 'set blue\r\n'
rc, rstr = self.send_cmd(cmd)
return rstr

return self.send_cmd('set blue\r\n')

def set_green(self):
cmd = 'set green\r\n'
rc, rstr = self.send_cmd(cmd)
return rstr
return self.send_cmd('set green\r\n')

# def set_th_light(self):
# cmd = "level"

def set_run(self):
cmd = 'run\r\n'
rc, rstr = self.send_blinkcommand(cmd)
# print(f"run command output:\n{rstr}")
return rstr

return self.send_text_command('run\r\n')

def set_stop(self):
cmd = 'stop\r\n'
rc, rstr = self.send_blinkcommand(cmd)
# print(f"stop command output:\n{rstr}")
return rstr
return self.send_text_command('stop\r\n')

def do_reset(self):
try:
self.send_command('reset -b\r\n')
time.sleep(0.1) # Give time for device to reset
self.disconnect() # Close serial port cleanly
except Exception as e:
print(f"Ignoring expected error during reset: {e}")

def reset_mode(self):
try:
self.send_command('reset\r\n')
time.sleep(0.1) # Give time for device to reset
self.disconnect() # Close serial port cleanly
except Exception as e:
print(f"Ignoring expected error during reset: {e}")

def set_level(self, value):
cmd = f'level {value}\r\n'
return self.send_cmd(cmd)

def get_stream3(self, callback=None):
"""
Continuously stream data from the device and call the callback with each piece of data.
"""
self.send_stream_cmd("stream 3\r\n") # or whatever command starts the stream

while self.ser and self.ser.is_open:
packet = read_packet_from_serial(self.ser)
if packet:
try:
decoded = decode_packet(packet)
payload = decoded.get("payload", b"")
ascii_payload = payload.decode("ascii", errors="ignore").strip()

if ascii_payload:
print(f"[get_stream3] Received: {ascii_payload}")
if callback:
callback(ascii_payload)

except Exception as e:
print(f"[get_stream3] Decode error: {e}")


def run_blank_frame_sequence(self, duration=10):
"""
Sends 'run' command to start blank frames, and automatically sends 'stop' after a specified duration.
"""
self.ser.write(b"run\r\n") # Use self.ser instead of ser

start_time = time.time() # Track the start time
buffered_payload = b""
blank_frame_count = 0 # Initialize a counter for blank frames

while self.ser and self.ser.is_open:
# Check if the elapsed time has passed the duration
if time.time() - start_time >= duration:
self.stop_blank_frame_sequence() # Stop the sequence after the specified duration
break

packet = read_block_frames(self.ser)
if packet:
try:
decoded = decode_packet(packet)
payload = decoded.get("payload", b"")
start_bit = decoded["start_bit"]
end_bit = decoded["end_bit"]
if start_bit:
buffered_payload = payload
else:
buffered_payload += payload

if end_bit or len(payload) < decoded["length"] - 2:
try:
ascii_payload = buffered_payload.decode("ascii").strip()
if not ascii_payload: # Consider empty payload as blank frame
blank_frame_count += 1
except UnicodeDecodeError:
print(f"payload: {buffered_payload.hex()} (non-ascii)")
buffered_payload = b""

except Exception as decode_err:
print("Decode error:", decode_err)

time.sleep(0.0006)

return blank_frame_count # Return the count of blank frames detected

def stop_blank_frame_sequence(self):
"""
Sends 'stop' command to stop the blank frame sequence.
"""
self.ser.write(b"stop\r\n") # Use self.ser instead of ser
print("Sent: stop")
Loading