test/throughput/main.c

/*******************************************************************************
* # License
* Copyright 2019 Silicon Laboratories Inc. www.silabs.com
*******************************************************************************
*
* The licensor of this software is Silicon Laboratories Inc. Your use of this
* software is governed by the terms of Silicon Labs Master Software License
* Agreement (MSLA) available at
* www.silabs.com/about-us/legal/master-software-license-agreement. This
* software is distributed to you in Source Code format and is governed by the
* sections of the MSLA applicable to Source Code.
*
******************************************************************************/
#include "gos.h"
#include "throughput_cli_parser.h"
#include "throughput_types.h"
/* Banner logged once from gos_app_init(). */
#define APPLICATION_START_LINE "\r\nThroughput test app started"
/* Argument-count limits handed to GOS_CMD_CREATE_COMMAND for the start command. */
#define THROUGHPUT_START_MIN_ARGS 5
#define THROUGHPUT_START_MAX_ARGS 10
/* Maximum argument count handed to GOS_CMD_CREATE_COMMAND for the result command. */
#define THROUGHPUT_RESULT_ARG_LENGTH 1
/* Reported to the user as (value / 1000) seconds, i.e. presumably milliseconds. */
#define TCP_CLIENT_CONNECTION_RETRY_PERIOD (1000)
/* Upper bound on gos_tcp_connect() attempts made by the client thread. */
#define MAX_TCP_CLIENT_CONNECTION_ATTEMPTS (3)
/* Number of generated payload bytes that follow the packet header. */
#define MAX_PAYLOAD_SIZE (1000)
/* Size of one packet buffer: header plus payload. Has type size_t because of sizeof. */
#define THROUGHPUT_PACKET_LENGTH (MAX_PAYLOAD_SIZE + sizeof(throughput_packet_header_t))
/* Stack size requested for both the server and the client thread. */
#define THREAD_STACK_SIZE (2048)
/* Conversion factors used when reporting bandwidth (bits) and volume (KBytes). */
#define NUM_BITS_IN_A_BYTE (8)
#define NUM_BYTES_IN_A_KBYTE (1024)
/* Not referenced in this file; presumably sized for a dotted-quad IPv4 string plus NUL. */
#define MAX_IP_ADDRESS_STRING (16)
/* Thread priorities passed in gos_thread_config_t. */
#define SERVER_PRIORITY 5
#define CLIENT_PRIORITY 5
// Commands created by this demo.
// "throughput_start" (short name "tstart") accepts between THROUGHPUT_START_MIN_ARGS
// and THROUGHPUT_START_MAX_ARGS arguments; its handler is GOS_DEFINE_COMMAND(throughput, start).
// NOTE(review): the meaning of the trailing `true` flag is defined by the command
// framework and is not visible in this file.
GOS_CMD_CREATE_COMMAND(throughput, start, "throughput_start", "tstart",
THROUGHPUT_START_MIN_ARGS, THROUGHPUT_START_MAX_ARGS, true);
// "throughput_result" (short name "tresult") accepts zero to THROUGHPUT_RESULT_ARG_LENGTH
// arguments; its handler is GOS_DEFINE_COMMAND(throughput, result).
GOS_CMD_CREATE_COMMAND(throughput, result, "throughput_result", "tresult",
0, THROUGHPUT_RESULT_ARG_LENGTH, true);
/* Session start-up helpers invoked by the start command. */
static gos_cmd_result_t throughput_server_start(throughput_context_t* context);
static gos_cmd_result_t throughput_client_start(throughput_context_t* context);
static gos_cmd_result_t create_server_thread(throughput_context_t* context);
/* On-demand report used by the result command. */
static gos_cmd_result_t get_report(throughput_context_t* context);
/* Handlers registered with gos_tcp_register_server_event_handlers(). */
static void tcp_client_connected_handler(gos_handle_t handle);
static void tcp_client_disconnected_handler(gos_handle_t handle);
/* Event handlers: periodic report and single-shot end-of-test timer. */
static void report_update_handler(void* arg);
static void timer_handler(void* arg);
/* Fills a packet payload with test data. */
static void generate_data(uint8_t* buffer, uint16_t length);
/* Resets a context back to UNINITIALISED. */
static void throughput_context_deinit(throughput_context_t* context);
/* Thread entry points; `arg` is a throughput_context_t pointer. */
void throughput_client_thread(void * arg);
void throughput_server_thread(void * arg);
/* One context per session type; indexed with context_type_t values (SERVER, CLIENT). */
throughput_context_t throughput[NUM_SYSTEMS];
GOS_DEFINE_COMMAND(throughput, start)
{
context_type_t context_type;
gos_result_t result;
context_type = process_session_type(argc, argv);
if (throughput[context_type].status != UNINITIALISED)
{
gos_cmd_format_response(GOS_CMD_FAILED, "Meter is already running");
}
// Add type
throughput[context_type].type = context_type;
result = throughput_process_args(argc, argv, &throughput[context_type]);
if (result != GOS_SUCCESS)
{
}
if (context_type == SERVER)
{
return throughput_server_start(&throughput[context_type]);
}
else
{
return throughput_client_start(&throughput[context_type]);
}
}
GOS_DEFINE_COMMAND(throughput, result)
{
context_type_t type;
if (strcmp(argv[0], "-c") == 0)
{
type = CLIENT;
}
else
{
type = SERVER;
}
if (throughput[type].status != TRANSFERRING)
{
}
if (throughput[type].settings.update_interval_ms != 0)
{
}
return get_report(&throughput[type]);
}
void gos_app_init(void)
{
GOS_LOG(APPLICATION_START_LINE);
// Init the client and server
for (uint8_t sys = 0; sys < NUM_SYSTEMS; sys++)
{
throughput[sys].status = UNINITIALISED;
throughput[sys].transferred = 0;
throughput[sys].reported = 0;
throughput[sys].thread_stack_ptr = NULL;
throughput[sys].interval_count = 0;
}
}
/**
 * Fills a buffer with the repeating ASCII digit sequence "0123456789".
 *
 * @param buffer  destination; a NULL pointer is ignored
 * @param length  number of bytes to write; 0 writes nothing
 *
 * The previous implementation wrapped when the character reached '9', so '9'
 * was never written and the pattern repeated every nine bytes.
 */
static void generate_data( uint8_t* buffer, uint16_t length )
{
    static const uint8_t digits[] = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9' };
    uint16_t i;

    if (buffer == NULL)
    {
        return;
    }

    for (i = 0; i < length; i++)
    {
        buffer[i] = digits[i % sizeof(digits)];
    }
}
/**
 * Single-shot timer callback marking the end of a test run.
 *
 * Raises the context's exit flag and, for a server context, force-wakes the
 * server thread.
 *
 * @param arg  the throughput_context_t the timer was registered with
 */
static void timer_handler(void* arg)
{
    throughput_context_t* const session = arg;

    session->exit = true;
    GOS_LOG("timer event");

    if (session->type != SERVER)
    {
        return;
    }
    gos_rtos_thread_force_awake(&session->thread);
}
/**
 * Server thread: receives packets and accumulates the byte count.
 *
 * @param arg  the throughput_context_t of the server session. The settings
 *             are assumed to have been validated already and the handle to be
 *             set (by the TCP connected handler or by gos_udp_listen()).
 *
 * TCP: status becomes TRANSFERRING immediately and the loop only ends when
 * gos_tcp_read() reports an error, which is how a client disconnect is seen.
 * UDP: status becomes TRANSFERRING on the first packet, which also arms the
 * single-shot timer that raises context->exit.
 *
 * The receive buffer is allocated once for the whole run and released on
 * every exit path; the allocation is verified before the buffer is used.
 */
void throughput_server_thread(void* arg)
{
    throughput_context_t* context = (throughput_context_t*)arg;
    throughput_packet_header_t* packet_ptr = NULL;
    uint32_t read_bytes = 0;
    gos_result_t result;

    context->last_get_report_time = gos_rtos_get_time();
    context->transferred = 0;

    gos_malloc("throughput_packet", (uint8_t**)&packet_ptr, THROUGHPUT_PACKET_LENGTH);
    if (packet_ptr == NULL)
    {
        /* Never hand a NULL buffer to the read calls. */
        throughput_context_deinit(context);
        GOS_LOG("Failed to allocate receive buffer");
        return;
    }

    if ( context->settings.proto == THROUGHPUT_TCP )
    {
        context->status = TRANSFERRING;
        for (;;)
        {
            result = gos_tcp_read( context->handle, packet_ptr, THROUGHPUT_PACKET_LENGTH, &read_bytes );
            if ( result == GOS_TIMEOUT )
            {
                continue;
            }
            if ( result != GOS_SUCCESS )
            {
                /* The server learns that it must stop receiving from the read
                 * failing once the client has disconnected. */
                gos_free(packet_ptr);
                throughput_context_deinit(context);
                GOS_LOG("gos_tcp_read failed! %d", result);
                return;
            }
            context->transferred += read_bytes;
        }
    }

    while (context->exit == false)
    {
        result = gos_udp_read(context->handle, packet_ptr, THROUGHPUT_PACKET_LENGTH, &read_bytes);
        if ( result == GOS_TIMEOUT )
        {
            continue;
        }
        if ( result != GOS_SUCCESS )
        {
            gos_free(packet_ptr);
            throughput_context_deinit(context);
            GOS_LOG("gos_udp_read failed! %d", result);
            return;
        }
        if (context->transferred == 0)
        {
            /* only update status to transferring after we receive first packet */
            context->status = TRANSFERRING;
            gos_event_register_timed(timer_handler, context, context->settings.time_s, GOS_EVENT_FLAG_SINGLE_SHOT);
        }
        context->transferred += read_bytes;
    }

    gos_free(packet_ptr);
    throughput_context_deinit(context);
    /* NOTE(review): only the UDP path reaches this line, yet the message says
     * "TCP"; text left unchanged in case log consumers match on it. */
    GOS_LOG("TCP server transfer is completed.");
}
/**
 * (Re)creates the server thread for the given context.
 *
 * Any thread previously stored in context->thread is deleted first, then a
 * new one running throughput_server_thread() is created.
 *
 * @param context  server session context, passed to the thread as its argument
 * @return GOS_CMD_SUCCESS when the thread was created, GOS_CMD_FAILED otherwise
 */
static gos_cmd_result_t create_server_thread(throughput_context_t* context)
{
    gos_result_t result;
    gos_thread_config_t config =
    {
        .function = throughput_server_thread,
        .stack_size = THREAD_STACK_SIZE,
        .stack_buffer = context->thread_stack_ptr,
        .arg = context,
        .name = "server thread",
        .priority = SERVER_PRIORITY
    };

    gos_rtos_thread_delete(&context->thread);
    result = gos_rtos_thread_create(&context->thread, &config);
    if ( result != GOS_SUCCESS )
    {
        GOS_LOG("Server Thread initialization failed.");
        return GOS_CMD_FAILED;
    }

    GOS_LOG("Server thread started.");
    return GOS_CMD_SUCCESS;
}
/**
 * TCP server event: a client has connected.
 *
 * Stores the connection handle in the server context and starts the server
 * thread that reads from it.
 *
 * @param handle  handle of the accepted connection
 */
static void tcp_client_connected_handler(gos_handle_t handle)
{
    throughput_context_t* const server = &throughput[SERVER];

    GOS_LOG("TCP client connected");
    server->handle = handle;
    (void)create_server_thread(server);
}
/**
 * TCP server event: a client has disconnected. Only logs the event.
 *
 * @param handle  handle of the closed connection (not used)
 */
static void tcp_client_disconnected_handler(gos_handle_t handle)
{
    (void)handle;
    GOS_LOG("TCP client disconnected");
}
/**
 * Periodic event handler that logs the throughput of the interval just ended.
 *
 * Does nothing unless the session is TRANSFERRING. Otherwise it logs the
 * interval boundaries in seconds, the volume moved since the previous report
 * in KBytes and the resulting bandwidth in bits per second, then marks those
 * bytes as reported.
 *
 * @param arg  the throughput_context_t the event was registered with
 */
static void report_update_handler(void* arg)
{
    /* NOTE(review): float_to_str() is given no buffer length here. The former
     * 10-byte buffers hold at most 9 characters, which a bandwidth of
     * 10 Mbit/s rendered with two decimals ("10000000.00") presumably
     * exceeds, so the buffers are sized generously. */
    char start_str[32];
    char end_str[32];
    char bandwidth_str[32];
    throughput_context_t* context = (throughput_context_t*)arg;
    uint32_t not_reported;
    uint32_t interval_kbytes;
    float interval_length_s;
    float interval_start;
    float interval_end;
    float bandwidth;

    if ( context->status != TRANSFERRING )
    {
        return;
    }

    interval_length_s = context->settings.update_interval_ms / (float)1000;
    context->interval_count++;
    interval_start = (context->interval_count - 1) * interval_length_s;
    interval_end = context->interval_count * interval_length_s;

    not_reported = context->transferred - context->reported;
    /* Multiply in floating point: not_reported * 8 can overflow uint32_t. */
    bandwidth = ((float)not_reported * NUM_BITS_IN_A_BYTE) / interval_length_s;
    interval_kbytes = not_reported / NUM_BYTES_IN_A_KBYTE;

    /* The KByte count is cast so that the argument matches the %d conversion. */
    if ( context->type == CLIENT )
    {
        GOS_LOG("[c][%s - %s] total_sent: %d Kbytes, Bandwidth: %s", float_to_str(interval_start, start_str, 2),
                float_to_str(interval_end, end_str, 2),
                (int)interval_kbytes,
                float_to_str(bandwidth, bandwidth_str, 2));
    }
    else
    {
        GOS_LOG("[s][%s - %s] total_received: %d KBytes, Bandwidth: %s", float_to_str(interval_start, start_str, 2),
                float_to_str(interval_end, end_str, 2),
                (int)interval_kbytes,
                float_to_str(bandwidth, bandwidth_str, 2));
    }

    context->reported += not_reported;
}
/**
 * Produces an on-demand report as the command response.
 *
 * Reports the time since the previous on-demand report (or since the
 * transfer started), the volume moved in that time in KBytes and the
 * resulting bandwidth in bits per second, then marks those bytes as reported.
 *
 * @param context  session to report on
 * @return GOS_CMD_SUCCESS after a report was written, GOS_CMD_FAILED when the
 *         session is not TRANSFERRING
 *
 * NOTE(review): confirm against the command framework whether a different
 * return code is expected once gos_cmd_format_response() has been called.
 */
static gos_cmd_result_t get_report(throughput_context_t* context)
{
    /* NOTE(review): float_to_str() is given no buffer length here; the former
     * 10-byte buffers are presumably too small for large bandwidth values. */
    char interval_str[32];
    char bandwidth_str[32];
    uint32_t now;
    uint32_t not_reported;
    uint32_t total_transferred;
    float interval;
    float bandwidth;

    if ( context->status != TRANSFERRING )
    {
        gos_cmd_format_response(GOS_CMD_FAILED, "Meter is not running");
        return GOS_CMD_FAILED;
    }

    /* Take a single timestamp so the interval and the new reference agree.
     * The tick value is divided by 1000 to obtain seconds. */
    now = gos_rtos_get_time();
    interval = (now - context->last_get_report_time) / (float)1000.0;
    context->last_get_report_time = now;

    not_reported = context->transferred - context->reported;
    /* Two reports within the same tick would otherwise divide by zero;
     * multiplying in floating point avoids uint32_t overflow. */
    bandwidth = (interval > 0.0f) ? (((float)not_reported * NUM_BITS_IN_A_BYTE) / interval) : 0.0f;
    total_transferred = not_reported / NUM_BYTES_IN_A_KBYTE;

    /* The KByte count is cast so that the argument matches the %d conversion. */
    if ( context->type == CLIENT )
    {
        gos_cmd_format_response(GOS_CMD_SUCCESS, "[c][%s] sent: %d Kbytes, Bandwidth: %s", float_to_str(interval, interval_str, 2),
                                (int)total_transferred,
                                float_to_str(bandwidth, bandwidth_str, 2));
    }
    else
    {
        gos_cmd_format_response(GOS_CMD_SUCCESS, "[s][%s] received: %d Kbytes, Bandwidth: %s", float_to_str(interval, interval_str, 2),
                                (int)total_transferred,
                                float_to_str(bandwidth, bandwidth_str, 2));
    }

    context->reported += not_reported;
    return GOS_CMD_SUCCESS;
}
/**
 * Starts a server session.
 *
 * Registers the periodic report event when an update interval is configured.
 * TCP: registers the connected/disconnected handlers and starts listening;
 * the server thread is created later by the connected handler.
 * UDP: opens the listening socket and creates the server thread right away.
 *
 * @param context  server session context with parsed settings
 * @return GOS_CMD_SUCCESS when the server is up, GOS_CMD_FAILED otherwise.
 *         On failure the periodic report event is unregistered again.
 */
static gos_cmd_result_t throughput_server_start(throughput_context_t* context)
{
    const bool periodic_report = (context->settings.update_interval_ms != 0);
    gos_result_t result;

    /* register periodic event for throughput reporting */
    if (periodic_report)
    {
        gos_event_register_periodic(report_update_handler, context, context->settings.update_interval_ms, GOS_EVENT_FLAG_NONE);
    }

    if ( context->settings.proto == THROUGHPUT_TCP )
    {
        /* For TCP the connected and disconnected event handlers are registered
         * first. The thread is created as soon as a client establishes a
         * connection with our server. */
        gos_tcp_register_server_event_handlers(tcp_client_connected_handler,
                                               tcp_client_disconnected_handler,
                                               NULL);
        result = gos_tcp_listen(GOS_INTERFACE_DEFAULT, context->settings.port);
        if ( result == GOS_SUCCESS )
        {
            return GOS_CMD_SUCCESS;
        }
        GOS_LOG("Failed to start TCP server, %d", result);
    }
    else
    {
        result = gos_udp_listen(GOS_INTERFACE_DEFAULT, context->settings.port, &context->handle);
        if ( result == GOS_SUCCESS )
        {
            if ( create_server_thread(context) == GOS_CMD_SUCCESS )
            {
                return GOS_CMD_SUCCESS;
            }
            /* Nobody will read from the socket; release it again. */
            gos_udp_close(context->handle);
            context->handle = 0;
        }
        else
        {
            GOS_LOG("Failed to start UDP server, %d", result);
        }
    }

    /* Start-up failed: do not leave the reporter running for a dead session. */
    if (periodic_report)
    {
        gos_event_unregister(report_update_handler, context);
    }
    return GOS_CMD_FAILED;
}
/**
 * Starts a client session.
 *
 * Registers the periodic report event when an update interval is configured,
 * deletes any thread previously stored in the context and creates a new
 * thread running throughput_client_thread().
 *
 * @param context  client session context with parsed settings
 * @return GOS_CMD_SUCCESS when the thread was created, GOS_CMD_FAILED
 *         otherwise. On failure the periodic report event is unregistered.
 */
static gos_cmd_result_t throughput_client_start(throughput_context_t* context)
{
    const bool periodic_report = (context->settings.update_interval_ms != 0);
    gos_cmd_result_t cmd_result = GOS_CMD_SUCCESS;
    gos_result_t result;
    gos_thread_config_t config =
    {
        .function = throughput_client_thread,
        .stack_size = THREAD_STACK_SIZE,
        .stack_buffer = context->thread_stack_ptr,
        .arg = context,
        .name = "client thread",
        .priority = CLIENT_PRIORITY
    };

    if (periodic_report)
    {
        gos_event_register_periodic(report_update_handler, context, context->settings.update_interval_ms, GOS_EVENT_FLAG_NONE);
    }

    gos_rtos_thread_delete(&context->thread);
    result = gos_rtos_thread_create(&context->thread, &config);
    if ( result != GOS_SUCCESS )
    {
        GOS_LOG("Thread initialization failed");
        /* No thread will ever tear this session down, so undo the
         * registration made above. */
        if (periodic_report)
        {
            gos_event_unregister(report_update_handler, context);
        }
        cmd_result = GOS_CMD_FAILED;
    }
    return cmd_result;
}
/**
 * Sends one packet, retrying while the write times out.
 *
 * TCP: at most five write attempts. UDP: retries until the write stops timing
 * out or the session's exit flag is raised. The write is always attempted at
 * least once, so the returned result always belongs to this packet.
 *
 * @return GOS_SUCCESS when the packet was written, otherwise the result of
 *         the last write attempt
 */
static gos_result_t send_throughput_packet(throughput_context_t* context, bool is_tcp, throughput_packet_header_t* packet)
{
    enum { TCP_WRITE_MAX_ATTEMPTS = 5 };
    gos_result_t result = GOS_TIMEOUT;
    int attempt;

    if (is_tcp)
    {
        for ( attempt = 0; attempt < TCP_WRITE_MAX_ATTEMPTS; attempt++ )
        {
            result = gos_tcp_write(context->handle, (uint8_t*)packet, THROUGHPUT_PACKET_LENGTH, 0);
            if (result != GOS_TIMEOUT)
            {
                break;
            }
        }
        return result;
    }

    do
    {
        result = gos_udp_write(context->handle, (uint8_t*)packet, THROUGHPUT_PACKET_LENGTH);
    }
    while ((result == GOS_TIMEOUT) && (context->exit == false));

    return result;
}

/**
 * Client thread: connects to the receiver and sends packets until the
 * end-of-test timer raises context->exit or a write fails.
 *
 * @param arg  the throughput_context_t of the client session. The settings
 *             are assumed to have been validated already.
 *
 * Each packet carries an incrementing 16-bit sequence number in network byte
 * order followed by generated test data. The single-shot timer is armed after
 * the first successful write. The context is de-initialised on every exit
 * path.
 */
void throughput_client_thread(void * arg)
{
    throughput_context_t* context = (throughput_context_t*)arg;
    throughput_packet_header_t* packet_ptr = NULL;
    /* captured up front: de-initialising the context clears the settings */
    const bool is_tcp = (context->settings.proto == THROUGHPUT_TCP);
    gos_result_t result = GOS_SUCCESS;
    uint16_t seq_number = 0;
    bool send_failed = false;
    int attempt;

    if (is_tcp)
    {
        context->status = IDLE;
        for ( attempt = 0; attempt < MAX_TCP_CLIENT_CONNECTION_ATTEMPTS; attempt++ )
        {
            /* connecting on the default interface which must be set prior to running the application */
            result = gos_tcp_connect(GOS_INTERFACE_DEFAULT, context->settings.ip, context->settings.port, &context->handle);
            if ( result == GOS_SUCCESS )
            {
                break;
            }
            if ( (attempt + 1) < MAX_TCP_CLIENT_CONNECTION_ATTEMPTS )
            {
                GOS_LOG("Failed to connect. Trying again in %d seconds", TCP_CLIENT_CONNECTION_RETRY_PERIOD/1000);
                /* Actually wait for the announced period before retrying. */
                gos_rtos_delay_milliseconds(TCP_CLIENT_CONNECTION_RETRY_PERIOD);
            }
        }
        if ( result != GOS_SUCCESS )
        {
            GOS_LOG("TCP client initialization failed. Exit");
            throughput_context_deinit(context);
            return;
        }
        GOS_LOG("Meter TCP client is connected to a receiver. Start sending data");
    }
    else
    {
        GOS_LOG("Before gos_udp_connect");
        result = gos_udp_connect(context->settings.ip, context->settings.port, 0, &context->handle);
        if ( result != GOS_SUCCESS )
        {
            GOS_LOG("Udp client initialization failed");
            throughput_context_deinit(context);
            return;
        }
        GOS_LOG("After gos_udp_connect");
    }

    /* One buffer serves the whole run; verify the allocation before use. */
    gos_malloc("throughput_packet", (uint8_t**)&packet_ptr, THROUGHPUT_PACKET_LENGTH);
    if (packet_ptr == NULL)
    {
        GOS_LOG("Failed to allocate packet buffer. Exit");
        throughput_context_deinit(context);
        return;
    }
    /* The payload is identical for every packet, so generate it once. */
    generate_data((uint8_t*)packet_ptr + sizeof(throughput_packet_header_t), THROUGHPUT_PACKET_LENGTH - sizeof(throughput_packet_header_t));

    context->last_get_report_time = gos_rtos_get_time();
    context->status = TRANSFERRING;
    context->transferred = 0;

    while (context->exit == false)
    {
        /* sequence number followed by the test data */
        packet_ptr->seq_number = htons(seq_number);
        seq_number++;

        result = send_throughput_packet(context, is_tcp, packet_ptr);
        if (result != GOS_SUCCESS)
        {
            GOS_LOG("Failed to send, %d. Exit", result);
            send_failed = true;
            break;
        }

        if (context->transferred == 0)
        {
            gos_event_register_timed(timer_handler, context, context->settings.time_s, GOS_EVENT_FLAG_SINGLE_SHOT);
        }
        context->transferred += THROUGHPUT_PACKET_LENGTH;
    }

    gos_free(packet_ptr);
    if (!send_failed)
    {
        GOS_LOG("Client transfer is completed.");
    }
    throughput_context_deinit(context);
}
/**
 * Returns a session context to the UNINITIALISED state.
 *
 * Clears the counters and the exit flag, unregisters the periodic report
 * event (when one was configured) and the end-of-test timer, releases the
 * network handle where this file owns it, wipes the settings and finally
 * marks the context UNINITIALISED so that a new session may be started.
 *
 * @param context  session context to reset; context->type is left unchanged
 */
static void throughput_context_deinit(throughput_context_t* context)
{
    context->last_get_report_time = 0;
    context->thread_stack_ptr = NULL;
    context->interval_count = 0;
    context->reported = 0;
    context->transferred = 0;
    context->exit = false;

    /* settings are still intact here; they are wiped at the end */
    if (context->settings.update_interval_ms != 0)
    {
        gos_event_unregister(report_update_handler, context);
    }
    gos_event_unregister(timer_handler, context);

    if (context->type == SERVER)
    {
        if (context->settings.proto != THROUGHPUT_TCP)
        {
            /* The handle from gos_udp_listen() is about to be zeroed; close
             * it first so the socket is not leaked. */
            gos_udp_close(context->handle);
        }
        /* NOTE(review): nothing is released for a TCP server; confirm whether
         * the accepted connection or the listener needs closing here. */
    }
    else if (context->type == CLIENT && context->settings.proto == THROUGHPUT_TCP)
    {
        /* Disconnect the client so the tcp server knows when to exit */
        gos_tcp_disconnect(context->handle);
    }
    else if (context->type == CLIENT && context->settings.proto == THROUGHPUT_UDP)
    {
        gos_udp_close(context->handle);
    }

    context->handle = 0;
    memset(&context->settings, 0x00, sizeof(context->settings));
    context->status = UNINITIALISED;
}