root/test/test-shunt-process.c

Revision f74aea30e4dfbad4e94ef339ae38bf0bb3800dde, 10.3 kB (checked in by hansp <hansp>, 4 years ago)

Initial revision

  • Property mode set to 100644
Line 
1 /* -*- Mode: C; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
3 /* test-shunt-process.c - FlowShunt process test.
4  *
5  * Copyright (C) 2006 Hans Petter Jansson
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  *
22  * Authors: Hans Petter Jansson <hpj@copyleft.no>
23  */
24
25 #define TEST_UNIT_NAME "FlowShunt (sub-process)"
26 #define TEST_TIMEOUT_S 60
27
28 /* Test variables; adjustable */
29
30 #define BUFFER_SIZE            50000000  /* Amount of data to transfer */
31 #define PACKET_MAX_SIZE        8192      /* Max transfer unit */
32 #define PACKET_MIN_SIZE        1         /* Min transfer unit */
33
34 #define TOTAL_PAUSE_TIME_MS    3000      /* Total time to spend *not* reading or writing */
35 #define PAUSE_MIN_LENGTH_MS    10        /* Min pause unit */
36 #define PAUSE_MAX_LENGTH_MS    200       /* Max pause unit */
37
38 /* Calculations to determine the probability of pausing for
39  * each packet processed. No user serviceable parts inside. */
40
41 #define PROBABILITY_MULTIPLIER 1000000   /* For fixed-point fractions */
42 #define PACKET_AVG_SIZE        (PACKET_MIN_SIZE + ((PACKET_MAX_SIZE - PACKET_MIN_SIZE) / 2))
43 #define PAUSE_AVG_LENGTH_MS    (PAUSE_MIN_LENGTH_MS + ((PAUSE_MAX_LENGTH_MS - PAUSE_MIN_LENGTH_MS) / 2))
44 #define NUM_PAUSES             (TOTAL_PAUSE_TIME_MS / PAUSE_AVG_LENGTH_MS)
45 #define TOTAL_EXPECTED_PACKETS (BUFFER_SIZE / PACKET_AVG_SIZE)
46 #define TOTAL_EXPECTED_EVENTS  (TOTAL_EXPECTED_PACKETS * 2)  /* Account for both reads and writes */
47 #define PAUSE_PROBABILITY      ((NUM_PAUSES * PROBABILITY_MULTIPLIER) / TOTAL_EXPECTED_EVENTS)
48
49 #include "test-common.c"
50
51 static guchar    *buffer;
52 static guint      src_index;
53 static guint      dest_index;
54
55 static gboolean   finished_writing;
56 static gboolean   writes_are_blocked;
57
58 static gboolean   started_reading;
59 static gboolean   finished_reading;
60 static gboolean   in_segment;
61 static gboolean   reads_are_blocked;
62
63 static FlowShunt *worker_shunt;
64
65 static guint
66 get_pause_interval_ms (void)
67 {
68   guint n;
69
70   n = g_random_int_range (0, PROBABILITY_MULTIPLIER + 1);
71   if (n < PAUSE_PROBABILITY)
72   {
73     n = g_random_int_range (PAUSE_MIN_LENGTH_MS, PAUSE_MAX_LENGTH_MS + 1);
74     return n;
75   }
76
77   return 0;
78 }
79
80 static gboolean
81 read_pause_ended (FlowShunt *shunt)
82 {
83   test_print ("Resuming reads\n");
84   reads_are_blocked = FALSE;
85   flow_shunt_unblock_reads (shunt);
86   return FALSE;
87 }
88
89 static void
90 read_from_shunt (FlowShunt *shunt, FlowPacket *packet, gpointer data)
91 {
92   guint    packet_size;
93   gpointer packet_data;
94   guint    pause_ms;
95
96   if (reads_are_blocked)
97     test_end (TEST_RESULT_FAILED, "got read while blocked");
98
99   if (data != shunt)
100     test_end (TEST_RESULT_FAILED, "read callback user_data does not match");
101
102   if (finished_reading)
103     test_end (TEST_RESULT_FAILED, "got read callback after end-of-stream");
104
105   if (!packet)
106     test_end (TEST_RESULT_FAILED, "got read with NULL packet");
107
108   packet_data = flow_packet_get_data (packet);
109   if (!packet_data)
110     test_end (TEST_RESULT_FAILED, "got NULL packet data");
111
112   if (flow_packet_get_format (packet) == FLOW_PACKET_FORMAT_OBJECT)
113   {
114     FlowDetailedEvent *detailed_event = packet_data;
115
116     if (FLOW_IS_DETAILED_EVENT (detailed_event))
117     {
118       if (flow_detailed_event_matches (detailed_event, FLOW_STREAM_DOMAIN, FLOW_STREAM_BEGIN))
119       {
120         test_print ("Read: Beginning of stream marker\n");
121
122         if (started_reading)
123           test_end (TEST_RESULT_FAILED, "got multiple beginning-of-stream markers");
124
125         started_reading = TRUE;
126       }
127       else if (flow_detailed_event_matches (detailed_event, FLOW_STREAM_DOMAIN, FLOW_STREAM_SEGMENT_BEGIN))
128       {
129         test_print ("Read: Beginning of segment marker\n");
130
131         if (!started_reading)
132           test_end (TEST_RESULT_FAILED, "segment started before stream");
133
134         if (finished_reading)
135           test_end (TEST_RESULT_FAILED, "segment started after stream end");
136
137         if (in_segment)
138           test_end (TEST_RESULT_FAILED, "got nested beginning-of-segment markers");
139
140         in_segment = TRUE;
141       }
142       else if (flow_detailed_event_matches (detailed_event, FLOW_STREAM_DOMAIN, FLOW_STREAM_END))
143       {
144         test_print ("Read: End of stream marker\n");
145
146         if (!started_reading)
147           test_end (TEST_RESULT_FAILED, "stream ended without starting");
148
149         if (finished_reading)
150           test_end (TEST_RESULT_FAILED, "got multiple end-of-stream markers");
151
152         if (in_segment)
153           test_end (TEST_RESULT_FAILED, "stream ended inside an open segment");
154
155         if (dest_index != BUFFER_SIZE)
156           test_end (TEST_RESULT_FAILED, "did not pass through all the data");
157
158         finished_reading = TRUE;
159
160         /* Wait a bit before quitting, so shunts have a chance to generate invalid events */
161         g_timeout_add (1000, (GSourceFunc) test_quit_main_loop, NULL);
162       }
163       else if (flow_detailed_event_matches (detailed_event, FLOW_STREAM_DOMAIN, FLOW_STREAM_SEGMENT_END))
164       {
165         test_print ("Read: End of segment marker\n");
166
167         if (!started_reading)
168           test_end (TEST_RESULT_FAILED, "segment end before stream start");
169
170         if (finished_reading)
171           test_end (TEST_RESULT_FAILED, "segment end after stream end");
172
173         if (!in_segment)
174           test_end (TEST_RESULT_FAILED, "end of segment, but no segment open");
175
176         in_segment = FALSE;
177       }
178     }
179     else if (!FLOW_IS_EVENT (detailed_event))
180     {
181       test_end (TEST_RESULT_FAILED, "got a weird object from read shunt");
182     }
183   }
184   else if (flow_packet_get_format (packet) == FLOW_PACKET_FORMAT_BUFFER)
185   {
186     packet_size = flow_packet_get_size (packet);
187     if (packet_size == 0)
188       test_end (TEST_RESULT_FAILED, "got zero-size buffer packet");
189
190     test_print ("Read: %d byte packet at offset %d\n", packet_size, dest_index);
191
192     if (!started_reading)
193       test_end (TEST_RESULT_FAILED, "got data before start of stream");
194
195     if (finished_reading)
196       test_end (TEST_RESULT_FAILED, "got data after end of stream");
197
198     if (!in_segment)
199       test_end (TEST_RESULT_FAILED, "got data outside segment");
200
201     if (dest_index + packet_size > BUFFER_SIZE)
202       test_end (TEST_RESULT_FAILED, "read too much data");
203
204     if (memcmp (packet_data, buffer + dest_index, packet_size))
205       test_end (TEST_RESULT_FAILED, "output data did not match input");
206
207     dest_index += packet_size;
208
209     if (dest_index == BUFFER_SIZE)
210       test_print ("Read: Complete at %d bytes\n", dest_index);
211   }
212   else
213   {
214     test_end (TEST_RESULT_FAILED, "got unknown packet format");
215   }
216
217   flow_packet_free (packet);
218
219   pause_ms = get_pause_interval_ms ();
220   if (pause_ms > 0)
221   {
222     test_print ("Blocking reads for %.2fs\n", (float) pause_ms / 1000.0);
223     reads_are_blocked = TRUE;
224     flow_shunt_block_reads (shunt);
225     g_timeout_add (pause_ms, (GSourceFunc) read_pause_ended, shunt);
226   }
227 }
228
229 static gboolean
230 write_pause_ended (FlowShunt *shunt)
231 {
232   test_print ("Resuming writes\n");
233   writes_are_blocked = FALSE;
234   flow_shunt_unblock_writes (shunt);
235   return FALSE;
236 }
237
238 static FlowPacket *
239 write_to_shunt (FlowShunt *shunt, gpointer data)
240 {
241   FlowPacket *packet;
242   guint       pause_ms;
243
244   if (writes_are_blocked)
245     test_end (TEST_RESULT_FAILED, "got write while blocked");
246
247   if (data != shunt)
248     test_end (TEST_RESULT_FAILED, "write callback user_data does not match");
249
250   if (finished_writing)
251     test_end (TEST_RESULT_FAILED, "got write callback after sending end-of-stream");
252
253   if (src_index == BUFFER_SIZE)
254   {
255     packet = flow_create_simple_event_packet (FLOW_STREAM_DOMAIN, FLOW_STREAM_END);
256     finished_writing = TRUE;
257     flow_shunt_block_writes (shunt);
258
259     test_print ("Write: End of stream marker\n");
260   }
261   else
262   {
263     guint len;
264
265     len = g_random_int_range (PACKET_MIN_SIZE, PACKET_MAX_SIZE + 1);
266     if (src_index + len > BUFFER_SIZE)
267       len = BUFFER_SIZE - src_index;
268
269     packet = flow_packet_new (FLOW_PACKET_FORMAT_BUFFER, buffer + src_index, len);
270
271     test_print ("Write: %d byte packet at offset %d\n", len, src_index);
272
273     src_index += len;
274   }
275
276   if (!packet)
277     test_end (TEST_RESULT_FAILED, "failed to create a packet");
278
279   pause_ms = get_pause_interval_ms ();
280   if (pause_ms > 0)
281   {
282     test_print ("Blocking writes for %.2fs\n", (float) pause_ms / 1000.0);
283     writes_are_blocked = TRUE;
284     flow_shunt_block_writes (shunt);
285     g_timeout_add (pause_ms, (GSourceFunc) write_pause_ended, shunt);
286   }
287
288   return packet;
289 }
290
291 static void
292 worker_func (FlowSyncShunt *sync_shunt, gpointer user_data)
293 {
294   gboolean running = TRUE;
295
296   while (running)
297   {
298     FlowPacket *packet;
299
300     packet = flow_sync_shunt_read (sync_shunt);
301
302     if (packet)
303     {
304       test_print ("Child: Processing %d byte packet.\n", flow_packet_get_size (packet));
305       flow_sync_shunt_write (sync_shunt, packet);
306     }
307     else
308     {
309       test_print ("Child: Read failed - exiting.\n");
310       running = FALSE;
311     }
312   }
313 }
314
315 static void
316 test_run (void)
317 {
318   gint i;
319
320   g_random_set_seed (time (NULL));
321
322   /* Set up a buffer with random data */
323
324   buffer = g_malloc (BUFFER_SIZE);
325
326   for (i = 0; i < BUFFER_SIZE; )
327   {
328     guchar *p = buffer + i;
329
330     if (i < BUFFER_SIZE - 4)
331     {
332       *((guint32 *) p) = g_random_int ();
333       i += 4;
334     }
335     else
336     {
337       *p = (guchar) g_random_int ();
338       i++;
339     }
340   }
341
342   test_print ("Probability of pause is %d out of %d\n", PAUSE_PROBABILITY, PROBABILITY_MULTIPLIER);
343
344   src_index          = 0;
345   dest_index         = 0;
346   finished_reading   = FALSE;
347   finished_writing   = FALSE;
348   reads_are_blocked  = FALSE;
349   writes_are_blocked = FALSE;
350
351   worker_shunt = flow_spawn_process (worker_func, NULL);
352
353   flow_shunt_set_read_func (worker_shunt, read_from_shunt, worker_shunt);
354   flow_shunt_set_write_func (worker_shunt, write_to_shunt, worker_shunt);
355
356   /* Run */
357
358   test_run_main_loop ();
359
360   /* Cleanup */
361
362   flow_shunt_destroy (worker_shunt);
363   g_free (buffer);
364 }
Note: See TracBrowser for help on using the browser.