Monarch  v3.8.2
Project 8 Data File Format Library
M3MultithreadingTest.cc
Go to the documentation of this file.
1 /*
2  * Monarch3MultithreadingTest.cc
3  *
4  * Created on: Dec 22, 2016
5  * Author: N. Oblath
6  *
7  * Use: Monarch3MultithreadingTest [options]
8  *
9  * Options:
10  * filename=[string] -- default: multithreading_test.egg
11  */
12 
13 #include "M3DataInterface.hh"
14 #include "M3Monarch.hh"
15 
16 #include "application.hh"
17 #include "logger.hh"
18 #include "param.hh"
19 
20 #include <boost/filesystem.hpp>
21 
22 #include <atomic>
23 #include <chrono>
24 #include <condition_variable>
25 #include <mutex>
26 #include <stdlib.h>
27 #include <string>
28 #include <thread>
29 #include <vector>
30 
31 using namespace monarch3;
32 
33 LOGGER( mlog, "M3MultithreadingTest" );
34 
35 int main( int argc, char** argv )
36 {
37  try
38  {
39  scarab::main_app theMain;
40 
41  theMain.default_config().add( "filename", "multithreading_test.egg" );
42 
43  theMain.add_config_option< std::string >( "Filename", "filename", "Test output filename" );
44 
45  CLI11_PARSE( theMain, argc, argv );
46 
47  std::string tFilename = theMain.primary_config()[ "filename" ]().as_string();
48 
49  unsigned tNRecords = 5;
50  unsigned tNStreams = 10;
51  unsigned tArraySize = 1024;
52  unsigned tSampleSize = 1;
53  unsigned tDataTypeSize = 1;
54 
55  std::shared_ptr< Monarch3 > tWriteTest( Monarch3::OpenForWriting( tFilename ) );
56 
57  LINFO( mlog, "Preparing header" );
58  M3Header* tHeader = tWriteTest->GetHeader();
59  tHeader->Filename() = tFilename;
60  tHeader->SetRunDuration( 8675309 );
61  tHeader->Timestamp() = "Stardate 33515";
62  tHeader->Description() = "Bigger on the inside";
63 
64  LINFO( mlog, "Adding stream(s)" );
65 
66  std::vector< unsigned > tStreamNums( tNStreams );
67  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
68  {
69  std::stringstream tStr;
70  tStr << "Channel " << iStream;
71  unsigned tSingleStreamNum = tHeader->AddStream( tStr.str(), 100, tArraySize, tSampleSize, tDataTypeSize, sDigitizedUS, tDataTypeSize * 8, sBitsAlignedLeft );
72  tStreamNums[ iStream ] = tSingleStreamNum;
73  }
74 
75  tWriteTest->WriteHeader();
76 
77  LINFO( mlog, "Wrote header:\n" << *tHeader );
78 
79 
80  LINFO( mlog, "Creating fake data array" );
81 
82  unsigned tNBytes = tArraySize * tDataTypeSize * tSampleSize;
83  std::vector< byte_type > tDataMaster( tNBytes );
84 
85  M3DataWriter< uint8_t > tDMWriter( tDataMaster.data(), tDataTypeSize, sDigitizedUS );
86  for( unsigned iBin = 0; iBin < tArraySize; ++iBin )
87  {
88  tDMWriter.set_at( 42, iBin );
89  }
90 
91 
92  LINFO( mlog, "Getting stream pointers" );
93 
94  std::vector< M3Stream* > tStreams( tNStreams );
95  std::vector< byte_type* > tStreamData( tNStreams );
96  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
97  {
98  tStreams[ iStream ] = tWriteTest->GetStream( tStreamNums[ iStream ] );
99  tStreamData[ iStream ] = tStreams[ iStream ]->GetStreamRecord()->GetData();
100  }
101 
102  LINFO( mlog, "Writing data" );
103 
104 
105  std::mutex tRunMutex;
106  std::condition_variable tRunRelease;
107 
108  std::vector< std::thread > tThreads;
109  std::atomic< unsigned > tNThreadsReady( 0 );
110 
111  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
112  {
113  tThreads.push_back( std::thread( [&, iStream]{
114  //LDEBUG( mlog, "Starting thread " << iStream << "; waiting for start signal" );
115  bool tIsNewAcq = true;
116  std::unique_lock< std::mutex > tRunLock( tRunMutex );
117  tNThreadsReady++;
118  tRunRelease.wait( tRunLock );
119  for( unsigned iRecord = 0; iRecord < tNRecords; ++iRecord )
120  {
121  ::memcpy( tStreamData[ iStream ], tDataMaster.data(), tNBytes );
122  if( ! tStreams[ iStream ]->WriteRecord( tIsNewAcq ) )
123  {
124  LERROR( mlog, "Unable to write record <" << iRecord << "> for stream <" << iStream << ">" );
125  return;
126  }
127  tIsNewAcq = false;
128  }
129  //LDEBUG( mlog, "Thread " << iStream << " is finished" );
130  } ) );
131  }
132 
133  // Synchronize threads
134  LINFO( mlog, "Waiting for threads to be ready" );
135  while( tNThreadsReady.load() != tNStreams )
136  {
137  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
138  }
139  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
140 
141  LINFO( mlog, "Releasing threads" );
142  tRunRelease.notify_all();
143 
144  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
145  {
146  tThreads[ iStream ].join();
147  }
148 
149  LINFO( mlog, "Closing file" );
150 
151  tWriteTest->FinishWriting();
152 
153  LINFO( mlog, "Test finished" );
154 
155  }
156  catch( std::exception& e )
157  {
158  LERROR( mlog, "Exception thrown during write-speed test:\n" << e.what() );
159  return RETURN_ERROR;
160  }
161 
162  return RETURN_SUCCESS;
163 }
static const uint32_t sDigitizedUS
Definition: M3Constants.hh:38
Egg file header information.
Definition: M3Header.hh:152
void set_at(SetType value, unsigned index)
static scarab::logger mlog("M3Header")
int main(int argc, char **argv)
Interface class for a variety of data types.
static const uint32_t sBitsAlignedLeft
Definition: M3Constants.hh:44
static Monarch3 * OpenForWriting(const std::string &filename)
Definition: M3Monarch.cc:92
unsigned AddStream(const std::string &aSource, uint32_t anAcqRate, uint32_t aRecSize, uint32_t aSampleSize, uint32_t aDataTypeSize, uint32_t aDataFormat, uint32_t aBitDepth, uint32_t aBitAlignment, std::vector< unsigned > *aChanVec=NULL)
Definition: M3Header.cc:400