【Websocket编程】基于libwebsockets实现客户端数据通信

导读:本篇文章讲解 【Websocket编程】基于libwebsockets实现客户端数据通信,希望对大家有帮助,欢迎收藏,转发!站点地址:www.bmabk.com

0、序

  本文基于libwebsockets实现了一个客户端数据上传demo,源码来自libwebsockets自带的minimal-examples示例。本篇仅使用client端示例进行数据通信,Server端已由后台同事搭建好。示例源码中vhd->i.path的值为"/publisher";该参数不能留空,也不能设为"/",应赋值为"/ws"。
★★★仅修改以下三部分:

vhd->i.port =  ;
vhd->i.address = " ";
vhd->i.path = "/ws";

在这里插入图片描述

1、Client端实现

1.1、minimal_ws_client.c

#include "minimal_ws_client.h"

/*
 * Set to nonzero by sigint_handler() and polled by the service loop in main().
 * Because it is written from a signal handler and read from normal code, it
 * is declared volatile sig_atomic_t (from <signal.h>, which
 * minimal_ws_client.h includes) rather than plain int.
 */
static volatile sig_atomic_t interrupted;

/*
 * Release the heap buffer owned by one struct msg and reset the element.
 *
 * Registered as the element destructor when the ring buffer is created, and
 * also called directly when a freshly built message could not be queued.
 *
 * _msg: pointer to the struct msg to clear.
 */
static void __minimal_destroy_message(void *_msg)
{
	struct msg *m = (struct msg *)_msg;
	void *buf = m->payload;

	/* leave the element empty so it cannot be released twice */
	m->len = 0;
	m->payload = NULL;

	free(buf);
}

/*
 * Producer thread body.
 *
 * Roughly every 100 ms, while the client connection is established and the
 * exchange state is RT_IDLE, builds a "hello websocket" text message (with
 * LWS_PRE bytes reserved in front of it), queues it on vhd->ring and wakes
 * the lws service loop with lws_cancel_service().
 *
 * d: the struct per_vhost_data__minimal shared with the protocol callback.
 * Returns NULL (through pthread_exit) once vhd->finished has been set.
 *
 * Fix: the RT_IDLE test used to be part of the loop condition, so a thread
 * that woke up while a reply was outstanding (RT_BUSY) left the loop and
 * terminated for good; nothing was produced afterwards even when the state
 * returned to RT_IDLE. The state now only gates message creation, and the
 * thread ends solely on vhd->finished.
 *
 * NOTE(review): finished, established and rt_ctl are read here without any
 * lock or atomic; that matches the rest of the file but is worth confirming.
 */
static void *thread_spam(void *d)
{
	struct per_vhost_data__minimal *vhd = (struct per_vhost_data__minimal *)d;
	struct msg amsg;
	int len = 128, n, whoami = 0;

	/* find our 1-based slot in pthread_spam[]; only used in the exit log */
	for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
		if (pthread_equal(pthread_self(), vhd->pthread_spam[n]))
			whoami = n + 1;

	while (!vhd->finished)
	{
		/*
		 * don't generate output if the client is not connected or a
		 * previously sent message has not been answered yet
		 */
		if (!vhd->established || rt_ctl != RT_IDLE)
			goto wait;

		pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */

		/* only create if space in ringbuffer */
		n = (int)lws_ring_get_count_free_elements(vhd->ring);
		if (!n)
		{
			lwsl_user("dropping!\n");
			goto wait_unlock;
		}

		/* LWS_PRE bytes of headroom precede the text, as lws_write() needs */
		amsg.payload = malloc((unsigned int)(LWS_PRE + len));
		if (!amsg.payload)
		{
			lwsl_user("OOM: dropping\n");
			goto wait_unlock;
		}
		n = lws_snprintf((char *)amsg.payload + LWS_PRE, (unsigned int)len, "hello websocket");
		amsg.len = (unsigned int)n;
		n = (int)lws_ring_insert(vhd->ring, &amsg, 1);
		if (n != 1)
		{
			/* the ring did not take the element, so we still own the buffer */
			__minimal_destroy_message(&amsg);
			lwsl_user("dropping!\n");
		}
		else
		{
			/* wake the service loop so it can request a writable callback */
			lws_cancel_service(vhd->context);
		}

	wait_unlock:
		pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */

	wait:
		usleep(100000);
	}

	lwsl_notice("thread_spam %d exiting\n", whoami);
	pthread_exit(NULL);
	return NULL;
}

static void sul_connect_attempt(struct lws_sorted_usec_list *sul)
{
	struct per_vhost_data__minimal *vhd =
		lws_container_of(sul, struct per_vhost_data__minimal, sul);

	vhd->i.context = vhd->context;
	vhd->i.port = 6088;
	vhd->i.address = "xxx.xx.xx.xx";
	vhd->i.path = "/ws";
	vhd->i.host = vhd->i.address;
	vhd->i.origin = vhd->i.address;
	vhd->i.ssl_connection = 0;

	vhd->i.protocol = "lws-minimal-demo";
	vhd->i.pwsi = &vhd->client_wsi;

	//if connect failed, retry to connect
	if (!lws_client_connect_via_info(&vhd->i))
		lws_sul_schedule(vhd->context, 0, &vhd->sul, sul_connect_attempt, 10 * LWS_US_PER_SEC);
}

/*
 * Stop and join the first nthreads producer threads, then release the ring
 * buffer, the reconnect timer and the ring mutex.
 *
 * vhd->ring is reset to NULL afterwards so that a later PROTOCOL_DESTROY can
 * tell that there is nothing left to release.
 */
static void minimal_vhd_teardown(struct per_vhost_data__minimal *vhd, int nthreads)
{
	void *retval;
	int n;

	vhd->finished = 1;
	for (n = 0; n < nthreads; n++)
		pthread_join(vhd->pthread_spam[n], &retval);

	lws_ring_destroy(vhd->ring);
	vhd->ring = NULL;

	lws_sul_cancel(&vhd->sul);
	pthread_mutex_destroy(&vhd->lock_ring);
}

/*
 * Protocol callback for "lws-minimal-demo".
 *
 * Handles the protocol lifecycle (allocating the per-vhost data, the ring
 * buffer and the producer threads; tearing them down again) and the client
 * connection events (connect error, established, writeable, receive, closed,
 * service-loop wakeup).
 *
 * wsi:    the connection (or a vhost-level placeholder) the event is for.
 * reason: which event occurred.
 * user:   per-session data; unused here, forwarded to the default handler.
 * in:     event-specific data; for CLIENT_RECEIVE the received bytes.
 * len:    length of the data at in.
 *
 * Returns 1 if protocol initialisation fails, 0 from PROTOCOL_DESTROY, -1 if
 * a websocket write comes up short, and otherwise whatever
 * lws_callback_http_dummy() returns.
 *
 * Changes from the previous version:
 *  - the result of lws_protocol_vh_priv_zalloc() is checked before use;
 *  - when a thread cannot be created, only the threads that were actually
 *    started are joined (joining never-created pthread_t slots is undefined);
 *  - PROTOCOL_DESTROY tolerates a missing or already released vhd;
 *  - received data is printed with an explicit length instead of being
 *    treated as a NUL-terminated string.
 */
static int callback_minimal_handler(struct lws *wsi, enum lws_callback_reasons reason,
									void *user, void *in, size_t len)
{
	struct per_vhost_data__minimal *vhd =
		(struct per_vhost_data__minimal *)
			lws_protocol_vh_priv_get(lws_get_vhost(wsi),
									 lws_get_protocol(wsi));
	const struct msg *pmsg;
	int n, m;

	switch (reason)
	{
	/* --- protocol lifecycle callbacks --- */
	case LWS_CALLBACK_PROTOCOL_INIT:
		vhd = lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi), lws_get_protocol(wsi),
										  sizeof(struct per_vhost_data__minimal));
		if (!vhd)
			return 1;

		vhd->context = lws_get_context(wsi);
		vhd->protocol = lws_get_protocol(wsi);
		vhd->vhost = lws_get_vhost(wsi);

		vhd->ring = lws_ring_create(sizeof(struct msg), 8, __minimal_destroy_message);
		if (!vhd->ring)
			return 1;

		pthread_mutex_init(&vhd->lock_ring, NULL);

		/* start the content-creating threads */

		for (n = 0; n < (int)LWS_ARRAY_SIZE(vhd->pthread_spam); n++)
			if (pthread_create(&vhd->pthread_spam[n], NULL, thread_spam, vhd))
			{
				lwsl_err("thread creation failed\n");
				/* exactly n threads were started before the failure */
				minimal_vhd_teardown(vhd, n);
				return 1;
			}

		sul_connect_attempt(&vhd->sul);
		break;

	case LWS_CALLBACK_PROTOCOL_DESTROY:
		/* nothing to release if init never completed or was already unwound */
		if (!vhd || !vhd->ring)
			return 0;

		minimal_vhd_teardown(vhd, (int)LWS_ARRAY_SIZE(vhd->pthread_spam));
		return 0;

	case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
		lwsl_err("CLIENT_CONNECTION_ERROR: %s\n", in ? (char *)in : "(null)");
		vhd->client_wsi = NULL;
		/* retry the connection one second from now */
		lws_sul_schedule(vhd->context, 0, &vhd->sul, sul_connect_attempt, LWS_US_PER_SEC);
		break;

		/* --- client callbacks --- */

	case LWS_CALLBACK_CLIENT_ESTABLISHED:
		lwsl_user("%s: established\n", __func__);
		vhd->established = 1;
		break;

	case LWS_CALLBACK_CLIENT_WRITEABLE:
		lwsl_user("Send a msg to Server.\n");
		/*
		 * NOTE(review): the state becomes RT_BUSY even when the ring turns
		 * out to be empty below and nothing is sent - confirm that is wanted.
		 */
		rt_ctl = RT_BUSY;
		pthread_mutex_lock(&vhd->lock_ring); /* --------- ring lock { */
		pmsg = lws_ring_get_element(vhd->ring, &vhd->tail);
		if (!pmsg)
			goto skip;

		/* notice we allowed for LWS_PRE in the payload already */
		m = lws_write(wsi, ((unsigned char *)pmsg->payload) + LWS_PRE,
					  pmsg->len, LWS_WRITE_TEXT);
		if (m < (int)pmsg->len)
		{
			pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock */
			lwsl_err("ERROR %d writing to ws socket\n", m);
			return -1;
		}

		lws_ring_consume_single_tail(vhd->ring, &vhd->tail, 1);

		/* more to do for us? */
		if (lws_ring_get_element(vhd->ring, &vhd->tail))
			/* come back as soon as we can write more */
			lws_callback_on_writable(wsi);

	skip:
		pthread_mutex_unlock(&vhd->lock_ring); /* } ring lock ------- */
		break;

	case LWS_CALLBACK_CLIENT_RECEIVE:
		/* the payload comes with an explicit length; never read past it */
		printf("GOT: %.*s\n", (int)len, in ? (const char *)in : "");
		rt_ctl = RT_IDLE;
		break;

	case LWS_CALLBACK_CLIENT_CLOSED:
		vhd->client_wsi = NULL;
		vhd->established = 0;
		/* reconnect one second from now */
		lws_sul_schedule(vhd->context, 0, &vhd->sul, sul_connect_attempt, LWS_US_PER_SEC);
		break;

	case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
		/* a producer thread woke the service loop: ask for a writable callback */
		if (vhd && vhd->client_wsi && vhd->established)
			lws_callback_on_writable(vhd->client_wsi);
		break;

	default:
		break;
	}

	return lws_callback_http_dummy(wsi, reason, user, in, len);
}

/*
 * Protocol table given to lws through info.protocols in main(): one entry
 * for "lws-minimal-demo" followed by the all-NULL terminator entry.
 */
static const struct lws_protocols protocols[] = {
	{
		.name = "lws-minimal-demo",
		.callback = callback_minimal_handler,
		.per_session_data_size = 0,
		.rx_buffer_size = 0,
	},
	{ .name = NULL, .callback = NULL, .per_session_data_size = 0, .rx_buffer_size = 0 },
};

/*
 * SIGINT handler: raises the flag that makes the service loop in main() stop.
 *
 * sig: the delivered signal number (not used).
 */
static void sigint_handler(int sig)
{
	(void)sig;

	interrupted = 1;
}

/*
 * Program entry point: configures logging, creates a client-only lws context
 * and runs the service loop until SIGINT arrives or lws_service() reports an
 * error.
 *
 * argc/argv: command line; "-d <n>" replaces the default log-level bitmask.
 * Returns 1 if the lws context cannot be created, 0 otherwise.
 */
int main(int argc, const char **argv)
{
	struct lws_context_creation_info info;
	struct lws_context *context;
	const char *opt;
	int rc = 0;
	/*
	 * Default log mask. For LLL_ verbosity above NOTICE (LLL_INFO,
	 * LLL_PARSER, LLL_HEADER, LLL_EXT, LLL_CLIENT, LLL_LATENCY, LLL_DEBUG)
	 * to be built into lws, lws must have been configured and built with
	 * -DCMAKE_BUILD_TYPE=DEBUG instead of =RELEASE.
	 */
	int logs = LLL_USER | LLL_ERR | LLL_WARN | LLL_NOTICE;

	signal(SIGINT, sigint_handler);

	opt = lws_cmdline_option(argc, argv, "-d");
	if (opt)
		logs = atoi(opt);

	lws_set_log_level(logs, NULL);
	lwsl_user("LWS minimal ws client \n");

	/* start from an all-zero info struct, then set only what we need */
	memset(&info, 0, sizeof info);
	info.port = CONTEXT_PORT_NO_LISTEN; /* client only: no listening socket */
	info.protocols = protocols;
	info.fd_limit_per_thread = 1 + 1 + 1;

	context = lws_create_context(&info);
	if (!context)
	{
		lwsl_err("lws init failed\n");
		return 1;
	}

	/* service events until interrupted or lws_service() fails */
	for (;;)
	{
		if (rc < 0 || interrupted)
			break;
		rc = lws_service(context, 0);
	}

	lws_context_destroy(context);
	lwsl_user("Completed\n");

	return 0;
}

1.2、minimal_ws_client.h

#ifndef _MINIMAL_WS_CLIENT_H_
#define _MINIMAL_WS_CLIENT_H_

#include <libwebsockets.h>
#include <string.h>
#include <signal.h>
#include <pthread.h>

/*
 * State of the message exchange, shared between the producer threads and the
 * protocol callback in minimal_ws_client.c.
 */
typedef enum {
	RT_IDLE = 0, /* initial state; restored when data is received */
	RT_BUSY,     /* set when the writeable callback runs */
	RT_TIMEOUT   /* not referenced by minimal_ws_client.c */
} MSG_STATUS;

/*
 * Current exchange state.
 * NOTE(review): this is a definition, not a declaration, so including this
 * header from more than one .c file would produce duplicate symbols; it is
 * also accessed from several threads without synchronisation.
 */
MSG_STATUS rt_ctl = RT_IDLE;

/* one of these created for each message */
/* One queued outgoing message; the ring buffer stores elements of this type. */
struct msg {
	void *payload; /* malloc'd; the text starts LWS_PRE bytes into the buffer */
	size_t len;    /* number of text bytes, not counting the LWS_PRE headroom */
};

/*
 * Fixed-size buffer with LWS_SEND_BUFFER_PRE_PADDING bytes plus 128 further
 * bytes, and a length field.
 * NOTE(review): minimal_ws_client.c only mentions this type in a
 * commented-out declaration, so it appears to be unused - confirm before
 * relying on it.
 */
struct rec_payload
{
    unsigned char data[LWS_SEND_BUFFER_PRE_PADDING + 128];
    size_t len;
};

/*
 * Per-vhost state of the "lws-minimal-demo" protocol. It is allocated zeroed
 * in the PROTOCOL_INIT callback and shared between the lws service thread and
 * the producer threads.
 */
struct per_vhost_data__minimal {
	struct lws_context *context; /* lws context the protocol runs in */
	struct lws_vhost *vhost; /* vhost this data belongs to */
	const struct lws_protocols *protocol; /* protocol entry this data belongs to */
	pthread_t pthread_spam[2]; /* the producer threads running thread_spam() */

	lws_sorted_usec_list_t sul; /* scheduler entry used to run sul_connect_attempt() */

	pthread_mutex_t lock_ring; /* serialize access to the ring buffer */
	struct lws_ring *ring; /* ringbuffer holding unsent messages */
	uint32_t tail; /* consumer position used when reading and consuming ring elements */

	struct lws_client_connect_info i; /* connection parameters, filled by sul_connect_attempt() */
	struct lws *client_wsi; /* its address is passed to lws as i.pwsi; reset to NULL on error/close */

	int counter; /* not referenced by minimal_ws_client.c */
	char finished; /* set to 1 at teardown to make the producer threads exit */
	char established; /* 1 while the client connection is established */
};

#endif

1.3、Makefile

# Compiler used for both compiling and linking
CC = gcc

# Name of the executable to build
TARGET=client_demo

# Header search path
INCLUDE_DIR=-I./
 
# Libraries to link (-L would add a library search path; -l names a library)
LIBS=-lpthread -lwebsockets
 
# Compile options (-Wall enables warnings; -O3 selects the optimization level)
CFLAGS = -Wall -O3
 
# Collect every *.c file in this directory into SOURCE_C
SOURCE_C=$(wildcard ./*.c)

# Derive the object file list OBJECT_O from SOURCE_C by replacing .c with .o
OBJECT_O=$(SOURCE_C:.c=.o)
 
# Pattern rule: compile each .c file into the matching .o file
# (written as a pattern rule instead of the old-style ".c.o:" suffix rule).
%.o: %.c
	$(CC) -c $(CFLAGS) $(INCLUDE_DIR) $< -o $@
 
# Link all object files into the executable, then print a banner.
$(TARGET): $(OBJECT_O)
	$(CC) $(CFLAGS) $(OBJECT_O) $(LIBS) -o $@
	@echo "********************"
	@echo "***Build Finished***"
	@echo "********************"
 
# Remove the executable and all object files, then print a banner.
# "clean" is declared phony so that a file named "clean" cannot mask the rule.
# The rm options are placed before the operands so the command does not rely
# on GNU option permutation, and the executable is named through $(TARGET).
.PHONY: clean
clean:
	rm -rf $(TARGET) ./*.o
	@echo "********************"
	@echo "***Clean Finished***"
	@echo "********************"

2、调试

  使用Wireshark工具进行抓包分析如下图所示:
在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/116951.html

(0)
seven_的头像seven_bm

相关推荐

发表回复

登录后才能评论
极客之音——专业性很强的中文编程技术网站,欢迎收藏到浏览器,订阅我们!