root/test/test-tcp-io.c

Revision 7fc7cc43795e2dc3eca7e924d74101d441bafb67, 12.4 kB (checked in by Hans Petter Jansson <hpj@gong.(none)>, 2 years ago)

Implement GError error reporting in high-level I/O functions.

  • Property mode set to 100644
Line 
1 /* -*- Mode: C; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
3 /* test-tcp-io.c - FlowTcpIO test.
4  *
5  * Copyright (C) 2006 Hans Petter Jansson
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  *
22  * Authors: Hans Petter Jansson <hpj@copyleft.no>
23  */
24
25 #define TEST_UNIT_NAME "FlowTcpIO"
26 #define TEST_TIMEOUT_S 180
27
28 /* Test variables; adjustable */
29
30 #define LOCAL_PORT             2533
31
32 #define SOCKETS_NUM            15
33 #define SOCKETS_CONCURRENT_MAX 5
34
35 #define BUFFER_SIZE            5000000   /* Amount of data to transfer */
36 #define PACKET_MAX_SIZE        8192      /* Max transfer unit */
37 #define PACKET_MIN_SIZE        1         /* Min transfer unit */
38
39 #define TOTAL_PAUSE_TIME_MS    3000      /* Total time to spend *not* reading or writing */
40 #define PAUSE_MIN_LENGTH_MS    10        /* Min pause unit */
41 #define PAUSE_MAX_LENGTH_MS    200       /* Max pause unit */
42
43 /* Calculations to determine the probability of pausing for
44  * each packet processed. No user serviceable parts inside. */
45
46 #define PROBABILITY_MULTIPLIER 1000000   /* For fixed-point fractions */
47 #define PACKET_AVG_SIZE        (PACKET_MIN_SIZE + ((PACKET_MAX_SIZE - PACKET_MIN_SIZE) / 2))
48 #define PAUSE_AVG_LENGTH_MS    (PAUSE_MIN_LENGTH_MS + ((PAUSE_MAX_LENGTH_MS - PAUSE_MIN_LENGTH_MS) / 2))
49 #define NUM_PAUSES             (TOTAL_PAUSE_TIME_MS / PAUSE_AVG_LENGTH_MS)
50 #define TOTAL_EXPECTED_PACKETS (BUFFER_SIZE / PACKET_AVG_SIZE)
51 #define TOTAL_EXPECTED_EVENTS  (TOTAL_EXPECTED_PACKETS * 2)  /* Account for both reads and writes */
52 #define PAUSE_PROBABILITY      ((NUM_PAUSES * PROBABILITY_MULTIPLIER) / TOTAL_EXPECTED_EVENTS)
53
54 #include "test-common.c"
55
56 typedef struct
57 {
58   gint read_offset;
59   gint write_offset;
60 }
61 TransferInfo;
62
63 static guchar            *buffer                 = NULL;
64
65 static FlowIPService     *loopback_service       = NULL;
66 static FlowIPService     *bad_loopback_service   = NULL;
67 static FlowTcpIOListener *tcp_listener           = NULL;
68 static FlowTcpIO         *tcp_reader             = NULL;
69 static FlowTcpIO         *tcp_writer             = NULL;
70
71 static GHashTable        *transfer_info_table    = NULL;
72
73 static gint               sockets_done           = 0;
74 static gint               sockets_running        = 0;
75
76 static FlowTcpIO         *main_tcp_io            = NULL;
77
78 static void
79 transfer_info_free (TransferInfo *transfer_info)
80 {
81   g_slice_free (TransferInfo, transfer_info);
82 }
83
84 static void
85 subthread_main (void)
86 {
87   FlowTcpIO    *tcp_io;
88   TransferInfo  transfer_info;
89   guchar        temp_buffer [PACKET_MAX_SIZE];
90
91   test_print ("Subthread connecting to main thread\n");
92
93   tcp_io = flow_tcp_io_new ();
94
95   if (g_random_boolean ())
96   {
97     if (!flow_tcp_io_sync_connect (tcp_io, loopback_service, NULL))
98       test_end (TEST_RESULT_FAILED, "loopback connect failed");
99   }
100   else
101   {
102     if (!flow_tcp_io_sync_connect_by_name (tcp_io, "localhost", LOCAL_PORT, NULL))
103       test_end (TEST_RESULT_FAILED, "loopback connect by name failed");
104   }
105
106   test_print ("Subthread connected to main thread\n");
107
108   transfer_info.read_offset  = 0;
109   transfer_info.write_offset = 0;
110
111   while (transfer_info.read_offset  < BUFFER_SIZE ||
112          transfer_info.write_offset < BUFFER_SIZE)
113   {
114     if (transfer_info.write_offset < BUFFER_SIZE)
115     {
116       gint len;
117
118       len = g_random_int_range (PACKET_MIN_SIZE, PACKET_MAX_SIZE);
119       len = MIN (len, BUFFER_SIZE - transfer_info.write_offset);
120
121       if (g_random_int_range (0, 2) == 0)
122         flow_io_sync_write (FLOW_IO (tcp_io), buffer + transfer_info.write_offset, len, NULL);
123       else
124         flow_io_write (FLOW_IO (tcp_io), buffer + transfer_info.write_offset, len);
125
126       test_print ("Subthread wrote %d bytes\n", len);
127
128       transfer_info.write_offset += len;
129     }
130
131     if (transfer_info.read_offset < BUFFER_SIZE)
132     {
133       gint len;
134
135       len = g_random_int_range (PACKET_MIN_SIZE, PACKET_MAX_SIZE);
136       len = MIN (len, BUFFER_SIZE - transfer_info.read_offset);
137
138       if (!flow_io_sync_read_exact (FLOW_IO (tcp_io), temp_buffer, len, NULL))
139         test_end (TEST_RESULT_FAILED, "subthread short read");
140
141       test_print ("Subthread read %d bytes\n", len);
142
143       if (memcmp (buffer + transfer_info.read_offset, temp_buffer, len))
144         test_end (TEST_RESULT_FAILED, "subthread read mismatch");
145
146       transfer_info.read_offset += len;
147     }
148   }
149
150   test_print ("Subthread disconnecting\n");
151   flow_tcp_io_sync_disconnect (tcp_io, NULL);
152   test_print ("Subthread disconnected\n");
153
154   g_object_unref (tcp_io);
155   test_print ("Subthread cleaned up\n");
156 }
157
158 static gboolean
159 spawn_subthread (void)
160 {
161   if (sockets_running >= SOCKETS_CONCURRENT_MAX || sockets_done >= SOCKETS_NUM)
162     return TRUE;
163
164   test_print ("Spawning new subthread\n");
165   g_thread_create ((GThreadFunc) subthread_main, NULL, FALSE, NULL);
166
167   sockets_running++;
168   return TRUE;
169 }
170
171 static void
172 read_notify (FlowTcpIO *tcp_io)
173 {
174   TransferInfo *transfer_info;
175   guchar        temp_buffer [PACKET_MAX_SIZE];
176   gint          desired_len;
177   gint          len;
178
179   test_print ("Main thread read notify (%p)\n", tcp_io);
180
181   transfer_info = g_hash_table_lookup (transfer_info_table, tcp_io);
182   g_assert (transfer_info != NULL);
183
184   do
185   {
186     desired_len = g_random_int_range (1, PACKET_MAX_SIZE + 1);
187     len = flow_io_read (FLOW_IO (tcp_io), temp_buffer, desired_len);
188     test_print ("Main thread read %d bytes\n", len);
189
190     if (memcmp (buffer + transfer_info->read_offset, temp_buffer, len))
191       test_end (TEST_RESULT_FAILED, "main thread read mismatch");
192
193     transfer_info->read_offset += len;
194
195     if (transfer_info->read_offset > BUFFER_SIZE)
196       test_end (TEST_RESULT_FAILED, "main thread read past buffer length");
197   }
198   while (len == desired_len);
199 }
200
201 static void
202 write_notify (FlowTcpIO *tcp_io)
203 {
204   TransferInfo *transfer_info;
205   gint          len;
206
207   test_print ("Main thread write notify (%p)\n", tcp_io);
208
209   transfer_info = g_hash_table_lookup (transfer_info_table, tcp_io);
210   g_assert (transfer_info != NULL);
211
212   len = g_random_int_range (PACKET_MIN_SIZE, PACKET_MAX_SIZE);
213   len = MIN (len, BUFFER_SIZE - transfer_info->write_offset);
214
215   flow_io_write (FLOW_IO (tcp_io), buffer + transfer_info->write_offset, len);
216   test_print ("Main thread wrote %d bytes\n", len);
217
218   transfer_info->write_offset += len;
219
220   if (transfer_info->write_offset == BUFFER_SIZE)
221   {
222     flow_io_set_write_notify (FLOW_IO (tcp_io), NULL, NULL);
223     test_print ("Main thread read done\n");
224   }
225 }
226
227 static void
228 print_running_socket (gpointer key, gpointer value, gpointer data)
229 {
230   test_print ("%p, ", key);
231 }
232
233 static void
234 lost_connection (FlowTcpIO *tcp_io)
235 {
236   TransferInfo *transfer_info;
237
238   if (flow_tcp_io_get_connectivity (tcp_io) != FLOW_CONNECTIVITY_DISCONNECTED)
239     return;
240
241   transfer_info = g_hash_table_lookup (transfer_info_table, tcp_io);
242   g_assert (transfer_info != NULL);
243
244   if (transfer_info->read_offset < BUFFER_SIZE)
245     test_end (TEST_RESULT_FAILED, "main thread did not read all data");
246
247   if (transfer_info->write_offset < BUFFER_SIZE)
248     test_end (TEST_RESULT_FAILED, "main thread did not write all data");
249
250   test_print ("Main thread lost connection\n");
251   g_hash_table_remove (transfer_info_table, tcp_io);
252   g_object_unref (tcp_io);
253
254   sockets_done++;
255   sockets_running--;
256
257   test_print ("sockets_done == %d, sockets_running == %d: ", sockets_done, sockets_running);
258   g_hash_table_foreach (transfer_info_table, print_running_socket, NULL);
259   test_print ("\n");
260
261   if (sockets_done >= SOCKETS_NUM && sockets_running == 0)
262   {
263     test_print ("quitting main loop!\n");
264     test_quit_main_loop ();
265   }
266 }
267
268 static void
269 new_connection (void)
270 {
271   FlowTcpIO    *tcp_io;
272   TransferInfo *transfer_info;
273
274   tcp_io = flow_tcp_io_listener_pop_connection (tcp_listener);
275   if (!tcp_io)
276     return;
277
278   main_tcp_io = tcp_io;
279
280   test_print ("Main thread received connection\n");
281
282   transfer_info = g_slice_new0 (TransferInfo);
283   g_hash_table_insert (transfer_info_table, tcp_io, transfer_info);
284
285   flow_io_set_read_notify  (FLOW_IO (tcp_io), (FlowNotifyFunc) read_notify, tcp_io);
286   flow_io_set_write_notify (FLOW_IO (tcp_io), (FlowNotifyFunc) write_notify, tcp_io);
287
288   g_signal_connect (tcp_io, "connectivity-changed", (GCallback) lost_connection, NULL);
289 }
290
291 static void
292 long_test (void)
293 {
294   guint id;
295
296   test_print ("Long test begin\n");
297
298   id = g_timeout_add (250, (GSourceFunc) spawn_subthread, NULL);
299   g_signal_connect (tcp_listener, "new-connection", (GCallback) new_connection, NULL);
300
301   test_run_main_loop ();
302
303   g_source_remove (id);
304
305   test_print ("Long test end\n");
306 }
307
308 static void
309 short_tests (void)
310 {
311   guchar read_buffer [2048];
312
313   test_print ("Short tests begin\n");
314
315   tcp_writer = flow_tcp_io_new ();
316
317   if (flow_tcp_io_sync_connect (tcp_writer, bad_loopback_service, NULL))
318     test_end (TEST_RESULT_FAILED, "bad connect did not fail as expected");
319
320   if (!flow_tcp_io_sync_connect (tcp_writer, loopback_service, NULL))
321     test_end (TEST_RESULT_FAILED, "could not connect to short-test listener");
322
323   tcp_reader = flow_tcp_io_listener_sync_pop_connection (tcp_listener);
324   if (!tcp_reader)
325     test_end (TEST_RESULT_FAILED, "missed connection on listener end");
326
327   flow_io_write (FLOW_IO (tcp_writer), buffer,        512);
328   flow_io_write (FLOW_IO (tcp_writer), buffer +  512, 512);
329   flow_io_write (FLOW_IO (tcp_writer), buffer + 1024, 512);
330   flow_io_write (FLOW_IO (tcp_writer), buffer + 1536, 512);
331
332   /* Partial read */
333
334   flow_io_sync_read_exact (FLOW_IO (tcp_reader), read_buffer, 2000, NULL);
335
336   /* Check read */
337
338   if (memcmp (buffer, read_buffer, 2000))
339     test_end (TEST_RESULT_FAILED, "data mismatch in short transfer (1)");
340
341   /* Read remaining bytes with an oversized request */
342
343   if (flow_io_sync_read (FLOW_IO (tcp_reader), read_buffer, 100, NULL) != 48)
344     test_end (TEST_RESULT_FAILED, "oversized read request returned wrong count");
345
346   /* Make sure remaining bytes match */
347
348   if (memcmp (buffer + 2000, read_buffer, 48))
349     test_end (TEST_RESULT_FAILED, "data mismatch in short transfer (2)");
350
351   /* Make sure nothing got clobbered */
352
353   if (memcmp (buffer + 48, read_buffer + 48, 1000))
354     test_end (TEST_RESULT_FAILED, "data mismatch in short transfer (3)");
355
356   /* Disconnect */
357
358   flow_tcp_io_sync_disconnect (tcp_reader, NULL);
359   flow_tcp_io_sync_disconnect (tcp_writer, NULL);
360
361   g_object_unref (tcp_reader);
362   g_object_unref (tcp_writer);
363
364   test_print ("Short tests end\n");
365 }
366
367 static void
368 test_run (void)
369 {
370   FlowIPAddr *ip_addr;
371   gint        i;
372
373   g_random_set_seed (time (NULL));
374
375   /* Set up a buffer with random data */
376
377   buffer = g_malloc (BUFFER_SIZE);
378
379   for (i = 0; i < BUFFER_SIZE; )
380   {
381     guchar *p = buffer + i;
382
383     if (i < BUFFER_SIZE - 4)
384     {
385       *((guint32 *) p) = g_random_int ();
386       i += 4;
387     }
388     else
389     {
390       *p = (guchar) g_random_int ();
391       i++;
392     }
393   }
394
395   transfer_info_table = g_hash_table_new_full (g_direct_hash, g_direct_equal,
396                                                NULL, (GDestroyNotify) transfer_info_free);
397
398   loopback_service = flow_ip_service_new ();
399   flow_ip_service_set_name (loopback_service, "localhost");
400   flow_ip_service_set_port (loopback_service, LOCAL_PORT);
401
402   /* */
403
404   bad_loopback_service = flow_ip_service_new ();
405   flow_ip_service_set_port (bad_loopback_service, 12505);
406
407   ip_addr = flow_ip_addr_new ();
408   flow_ip_addr_set_string (ip_addr, "127.0.0.1");
409   flow_ip_service_add_address (bad_loopback_service, ip_addr);
410   g_object_unref (ip_addr);
411
412   tcp_listener = flow_tcp_io_listener_new ();
413   if (!flow_tcp_listener_set_local_service (FLOW_TCP_LISTENER (tcp_listener), loopback_service, NULL))
414     test_end (TEST_RESULT_FAILED, "could not bind short-test listener");
415
416   short_tests ();
417   long_test ();
418
419   g_hash_table_destroy (transfer_info_table);
420   transfer_info_table = NULL;
421
422   g_object_unref (tcp_listener);
423   tcp_listener = NULL;
424
425   g_object_unref (loopback_service);
426   loopback_service = NULL;
427
428   g_object_unref (bad_loopback_service);
429   bad_loopback_service = NULL;
430
431   g_free (buffer);
432   buffer = NULL;
433 }
Note: See TracBrowser for help on using the browser.