#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# SPDX-License-Identifier: GPL-3.0
#
# GNU Radio Python Flow Graph
# Title: FFT Rate Tester
# Author: Marcus Müller
# GNU Radio version: 3.9.0.0

from gnuradio import blocks
from gnuradio import fft
from gnuradio.fft import window
from gnuradio import gr
from gnuradio.filter import firdes
import sys
import signal
from argparse import ArgumentParser
from gnuradio.eng_arg import eng_float, intx
from gnuradio import eng_notation


class fft_rate(gr.top_block):
    """Flow graph that reports the item rate of a vector stream feeding an FFT.

    A repeating vector source feeds both an FFT block (whose output is
    discarded in a null sink) and a rate probe. The probe's ``rate``
    messages are printed by a message-debug block.

    Args:
        fftlen: FFT size; also the vector length of every stream in the graph.
        threads: Number of threads handed to the FFT block.
    """

    def __init__(self, fftlen=2**20, threads=1):
        gr.top_block.__init__(self, "FFT Rate Tester", catch_exceptions=True)

        ##################################################
        # Parameters
        ##################################################
        self.fftlen = fftlen
        self.threads = threads

        ##################################################
        # Blocks
        ##################################################
        # The last positional argument of fft_vcc is its thread count (see
        # the GNU Radio fft_vcc API). It used to be hard-coded to 1, which
        # made the ``threads`` parameter and the -n/--threads option no-ops.
        self.fft_vxx_0 = fft.fft_vcc(fftlen, True, [], True, threads)
        # Four vectors' worth of ramp samples, replayed endlessly.
        self.blocks_vector_source_x_0 = blocks.vector_source_c(
            list(range(fftlen)) * 4, True, fftlen, [])
        self.blocks_probe_rate_0 = blocks.probe_rate(
            gr.sizeof_gr_complex * fftlen, 250, 0.025)
        self.blocks_null_sink_0 = blocks.null_sink(
            gr.sizeof_gr_complex * fftlen)
        self.blocks_message_debug_0 = blocks.message_debug(True)

        ##################################################
        # Connections
        ##################################################
        self.msg_connect((self.blocks_probe_rate_0, 'rate'),
                         (self.blocks_message_debug_0, 'print'))
        # The probe and the FFT both read the same source output port.
        # NOTE(review): presumably back-pressure from the FFT is what makes
        # the probed source rate reflect FFT throughput -- confirm.
        self.connect((self.blocks_vector_source_x_0, 0),
                     (self.blocks_probe_rate_0, 0))
        self.connect((self.blocks_vector_source_x_0, 0),
                     (self.fft_vxx_0, 0))
        self.connect((self.fft_vxx_0, 0), (self.blocks_null_sink_0, 0))

    def get_fftlen(self):
        """Return the stored FFT length."""
        return self.fftlen

    def set_fftlen(self, fftlen):
        """Store a new FFT length and regenerate the source data.

        Only the vector source's data is updated here; the vector length of
        the already-constructed blocks is not touched by this method.
        """
        self.fftlen = fftlen
        self.blocks_vector_source_x_0.set_data(
            list(range(self.fftlen)) * 4, [])

    def get_threads(self):
        """Return the stored FFT thread count."""
        return self.threads

    def set_threads(self, threads):
        """Store a new thread count and forward it to the FFT block."""
        self.threads = threads
        # Previously only the attribute was updated, so the FFT block never
        # saw the new value.
        self.fft_vxx_0.set_nthreads(self.threads)


def argument_parser():
    """Build the command-line parser exposing the flow graph parameters.

    Returns:
        An ``ArgumentParser`` with ``-l/--fftlen`` and ``-n/--threads``.
    """
    parser = ArgumentParser()
    parser.add_argument(
        "-l", "--fftlen", dest="fftlen", type=intx, default=2**20,
        help="Set FFT Length [default=%(default)r]")
    parser.add_argument(
        "-n", "--threads", dest="threads", type=intx, default=1,
        help="Set Number of Threads [default=%(default)r]")
    return parser


def main(top_block_cls=fft_rate, options=None):
    """Run the flow graph until it finishes or a SIGINT/SIGTERM arrives.

    Args:
        top_block_cls: Top-block class to instantiate; it is called with
            ``fftlen`` and ``threads`` keyword arguments.
        options: Parsed options providing ``fftlen`` and ``threads``. When
            ``None``, they are parsed from the command line.
    """
    if options is None:
        options = argument_parser().parse_args()
    tb = top_block_cls(fftlen=options.fftlen, threads=options.threads)

    def sig_handler(sig=None, frame=None):
        # Stop the graph, wait for it to finish, then exit with status 0.
        tb.stop()
        tb.wait()

        sys.exit(0)

    signal.signal(signal.SIGINT, sig_handler)
    signal.signal(signal.SIGTERM, sig_handler)

    tb.start()

    tb.wait()


if __name__ == '__main__':
    main()