// Headers used by the test, followed by the file-scope logger `mlog` tagged "M3MultithreadingTest".
// (LOGGER is a project macro; presumably it declares a scarab logger object -- confirm against its definition.)
16 #include "application.hh" 20 #include <boost/filesystem.hpp> 24 #include <condition_variable> 33 LOGGER(
mlog,
"M3MultithreadingTest" );
// Entry point of the multithreading write test.
// NOTE: this listing is an extracted excerpt -- braces and several statements (e.g. the declarations of
// tWriteTest, tSingleStreamNum and tDMWriter, and the `try` matching the `catch` below) are not visible here.
// Visible flow: read the output filename from the configuration, fill in the header, record one stream
// number per stream, write the header, build a master data buffer, start one writer thread per stream,
// release the threads together, join them, and finish writing.
35 int main(
int argc,
char** argv )
// Application object used for configuration defaults and command-line parsing.
39 scarab::main_app theMain;
// Default value for the "filename" configuration entry.
41 theMain.default_config().add(
"filename",
"multithreading_test.egg" );
// Registers a string option; presumably "Filename" is the CLI name mapped to config key "filename" -- confirm.
43 theMain.add_config_option< std::string >(
"Filename",
"filename",
"Test output filename" );
45 CLI11_PARSE( theMain, argc, argv );
// Resolved output filename taken from the primary configuration.
47 std::string tFilename = theMain.primary_config()[
"filename" ]().as_string();
// Test dimensions: records written per stream, number of streams (one thread each), and the three
// factors whose product gives the per-record byte count (see tNBytes below).
49 unsigned tNRecords = 5;
50 unsigned tNStreams = 10;
51 unsigned tArraySize = 1024;
52 unsigned tSampleSize = 1;
53 unsigned tDataTypeSize = 1;
57 LINFO(
mlog,
"Preparing header" );
// tWriteTest is declared in a statement not visible in this excerpt.
58 M3Header* tHeader = tWriteTest->GetHeader();
// Fill the header with the filename and placeholder metadata values.
59 tHeader->Filename() = tFilename;
60 tHeader->SetRunDuration( 8675309 );
61 tHeader->Timestamp() =
"Stardate 33515";
62 tHeader->Description() =
"Bigger on the inside";
64 LINFO(
mlog,
"Adding stream(s)" );
// One stream number is recorded per stream.
66 std::vector< unsigned > tStreamNums( tNStreams );
67 for(
unsigned iStream = 0; iStream < tNStreams; ++iStream )
// Builds the label "Channel <iStream>"; the statement that consumes it is not visible here.
69 std::stringstream tStr;
70 tStr <<
"Channel " << iStream;
// tSingleStreamNum is assigned in a statement not visible here (presumably the stream-creation call -- confirm).
72 tStreamNums[ iStream ] = tSingleStreamNum;
75 tWriteTest->WriteHeader();
77 LINFO(
mlog,
"Wrote header:\n" << *tHeader );
80 LINFO(
mlog,
"Creating fake data array" );
// Size in bytes of the master buffer that is copied into each stream's record buffer.
82 unsigned tNBytes = tArraySize * tDataTypeSize * tSampleSize;
83 std::vector< byte_type > tDataMaster( tNBytes );
// Sets the value 42 at every index in [0, tArraySize) through tDMWriter.
// tDMWriter's declaration is not visible; presumably it is a typed view over tDataMaster -- confirm.
86 for(
unsigned iBin = 0; iBin < tArraySize; ++iBin )
88 tDMWriter.
set_at( 42, iBin );
92 LINFO(
mlog,
"Getting stream pointers" );
// Cache each stream pointer and the data pointer of its stream record for use by the writer threads.
94 std::vector< M3Stream* > tStreams( tNStreams );
95 std::vector< byte_type* > tStreamData( tNStreams );
96 for(
unsigned iStream = 0; iStream < tNStreams; ++iStream )
98 tStreams[ iStream ] = tWriteTest->GetStream( tStreamNums[ iStream ] );
99 tStreamData[ iStream ] = tStreams[ iStream ]->GetStreamRecord()->GetData();
102 LINFO(
mlog,
"Writing data" );
// Mutex and condition variable on which the writer threads block until the main thread releases them.
105 std::mutex tRunMutex;
106 std::condition_variable tRunRelease;
108 std::vector< std::thread > tThreads;
// Counter polled by the main thread below; the statement that increments it is not visible in this excerpt.
109 std::atomic< unsigned > tNThreadsReady( 0 );
// Start one thread per stream. The lambda captures locals by reference and iStream by value.
111 for(
unsigned iStream = 0; iStream < tNStreams; ++iStream )
113 tThreads.push_back( std::thread( [&, iStream]{
// Flag passed to WriteRecord; any later reset of it is not visible here.
115 bool tIsNewAcq =
true;
116 std::unique_lock< std::mutex > tRunLock( tRunMutex );
// NOTE(review): wait() is called without a predicate, so a spurious wakeup would let this thread start
// early, and a notification issued before the thread is actually waiting would be missed -- confirm
// whether a guarded flag should be used.
// NOTE(review): as listed, tRunLock appears to stay held after wait() returns, which would serialize
// the record loops of all threads on tRunMutex -- confirm against the full source.
118 tRunRelease.wait( tRunLock );
119 for(
unsigned iRecord = 0; iRecord < tNRecords; ++iRecord )
// Copy the master buffer into this stream's record buffer, then write the record.
121 ::memcpy( tStreamData[ iStream ], tDataMaster.data(), tNBytes );
// A false result from WriteRecord is reported as an error for this record and stream.
122 if( ! tStreams[ iStream ]->WriteRecord( tIsNewAcq ) )
124 LERROR(
mlog,
"Unable to write record <" << iRecord <<
"> for stream <" << iStream <<
">" );
134 LINFO(
mlog,
"Waiting for threads to be ready" );
// Poll every 100 ms until the ready counter equals the number of streams.
135 while( tNThreadsReady.load() != tNStreams )
137 std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
// Additional 100 ms pause; presumably gives the last thread time to reach wait() before notifying -- confirm.
139 std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
141 LINFO(
mlog,
"Releasing threads" );
// Wake every thread blocked on tRunRelease.
142 tRunRelease.notify_all();
// Wait for all writer threads to finish.
144 for(
unsigned iStream = 0; iStream < tNStreams; ++iStream )
146 tThreads[ iStream ].join();
149 LINFO(
mlog,
"Closing file" );
151 tWriteTest->FinishWriting();
153 LINFO(
mlog,
"Test finished" );
// Handler for standard exceptions; the matching `try` is not visible in this excerpt.
// NOTE(review): the message mentions a "write-speed test" although the logger tag is "M3MultithreadingTest".
156 catch( std::exception& e )
158 LERROR(
mlog,
"Exception thrown during write-speed test:\n" << e.what() );
// NOTE(review): no failure return is visible inside the handler, so as listed the program appears to
// return RETURN_SUCCESS even after an exception was caught -- confirm against the full source.
162 return RETURN_SUCCESS;
static const uint32_t sDigitizedUS
void set_at(SetType value, unsigned index)
static scarab::logger mlog("M3Header")
int main(int argc, char **argv)
Interface class for a variety of data types.
static const uint32_t sBitsAlignedLeft
static Monarch3 * OpenForWriting(const std::string &filename)