Monarch  v3.8.2
Project 8 Data File Format Library
M3WriteSpeedTest.cc
Go to the documentation of this file.
1 /*
2  * M3WriteSpeedTest.cc
3  *
4  * Created on: Dec 21, 2016
5  * Author: N. Oblath
6  *
7  * Description: Measures the speed of writing a specified amount of data to disk.
8  *
9  * Single- or multi-threaded operation; in single-threaded operation all streams are written sequentially for each record.
10  *
11  * Use: Monarch3WriteSpeedTest [options]
12  *
13  * Options:
14  * multithreaded=(true | false) -- default: false
15  * n-records=[unsigned int] -- default: 10000
16  * n-streams=[unsigned int] -- default: 1
17  * array-size=[unsigned int] -- default: 1024
18  * data-type-size=[unsigned int] -- default: 1
19  */
20 
21 #include "M3DataInterface.hh"
22 #include "M3Monarch.hh"
23 
24 #include "application.hh"
25 #include "logger.hh"
26 #include "param.hh"
27 
28 #include <boost/filesystem.hpp>
29 
30 #include <atomic>
31 #include <chrono>
32 #include <condition_variable>
33 #include <mutex>
34 #include <stdlib.h>
35 #include <string>
36 #include <thread>
37 #include <vector>
38 
39 using namespace monarch3; // lets this file use names from the monarch3 namespace unqualified
40 
41 LOGGER( mlog, "M3WriteSpeedTest" ); // declares `mlog`, the logger handed to every LINFO/LERROR call in main()
42 
43 int main( int argc, char** argv )
44 {
45  scarab::main_app theMain;
46 
47  theMain.default_config().add( "multithreaded", new scarab::param_value( false ) );
48  theMain.default_config().add( "n-records", new scarab::param_value( 10000U ) );
49  theMain.default_config().add( "n-streams", new scarab::param_value( 1U ) );
50  theMain.default_config().add( "array-size", new scarab::param_value( 1024U ) );
51  theMain.default_config().add( "data-type-size", new scarab::param_value( 1U ) );
52 
53  theMain.add_config_option< std::string >( "Filename", "filename", "Test output filename" );
54  theMain.add_config_flag< bool >( "-m,--multithreaded", "multithreaded", "Use multithreaded write" );
55  theMain.add_config_option< unsigned >( "-n,--n-records", "n-records", "Number of records to write" );
56  theMain.add_config_option< unsigned >( "-N,--n-streams", "n-streams", "Number of streams to write" );
57  theMain.add_config_option< unsigned >( "-a,--array-size", "array-size", "Array size" );
58  theMain.add_config_option< unsigned >( "-d,--data-type-size", "data-type-size", "Data-type size" );
59 
60  CLI11_PARSE( theMain, argc, argv );
61 
62  try
63  {
64  bool tMultithreaded = theMain.primary_config()[ "multithreaded" ]().as_bool();
65  unsigned tNRecords = theMain.primary_config()[ "n-records" ]().as_uint();
66  unsigned tNStreams = theMain.primary_config()[ "n-streams" ]().as_uint();
67  unsigned tArraySize = theMain.primary_config()[ "array-size" ]().as_uint();
68  unsigned tSampleSize = 1; // currently not configurable
69  unsigned tDataTypeSize = theMain.primary_config()[ "data-type-size" ]().as_uint();
70 
71  double tMBToWrite = (double)(tNRecords * tNStreams * tArraySize * tSampleSize * tDataTypeSize) * 10.e-6;
72 
73  if( tNStreams == 0 )
74  {
75  LERROR( mlog, "Please specify a number of streams > 0" );
76  return RETURN_ERROR;
77  }
78 
79 
80  boost::filesystem::path tFilePath = boost::filesystem::temp_directory_path() / boost::filesystem::unique_path();
81  std::shared_ptr< Monarch3 > tWriteTest( Monarch3::OpenForWriting( tFilePath.native() ) );
82  LINFO( mlog, "Temp file is " << tFilePath );
83 
84  LINFO( mlog, "Preparing header" );
85  M3Header* tHeader = tWriteTest->GetHeader();
86  tHeader->Filename() = "tempfile";
87  tHeader->SetRunDuration( 8675309 );
88  tHeader->Timestamp( ) ="Stardate 33515";
89  tHeader->Description() = "Bigger on the inside";
90 
91  LINFO( mlog, "Adding stream(s)" );
92 
93  std::vector< unsigned > tStreamNums( tNStreams );
94  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
95  {
96  std::stringstream tStr;
97  tStr << "Channel " << iStream;
98  unsigned tSingleStreamNum = tHeader->AddStream( tStr.str(), 100, tArraySize, tSampleSize, tDataTypeSize, sDigitizedUS, tDataTypeSize * 8, sBitsAlignedLeft );
99  tStreamNums[ iStream ] = tSingleStreamNum;
100  }
101 
102  tWriteTest->WriteHeader();
103 
104  LINFO( mlog, "Wrote header:\n" << *tHeader );
105 
106 
107  LINFO( mlog, "Creating fake data array" );
108 
109  unsigned tNBytes = tArraySize * tDataTypeSize * tSampleSize;
110  std::vector< byte_type > tDataMaster( tNBytes );
111 
112  M3DataWriter< uint8_t > tDMWriter( tDataMaster.data(), tDataTypeSize, sDigitizedUS );
113  for( unsigned iBin = 0; iBin < tArraySize; ++iBin )
114  {
115  tDMWriter.set_at( 42, iBin );
116  }
117 
118 
119  LINFO( mlog, "Getting stream pointers" );
120 
121  std::vector< M3Stream* > tStreams( tNStreams );
122  std::vector< byte_type* > tStreamData( tNStreams );
123  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
124  {
125  tStreams[ iStream ] = tWriteTest->GetStream( tStreamNums[ iStream ] );
126  tStreamData[ iStream ] = tStreams[ iStream ]->GetStreamRecord()->GetData();
127  }
128 
129  LINFO( mlog, "Writing data" );
130 
131  using std::chrono::steady_clock;
132  using std::chrono::duration;
133  using std::chrono::duration_cast;
134 
135  if( tMultithreaded )
136  {
137 
138  std::mutex tRunMutex;
139  std::condition_variable tRunRelease;
140 
141  std::vector< std::thread > tThreads;
142  std::atomic< unsigned > tNThreadsReady( 0 );
143 
144  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
145  {
146  tThreads.push_back( std::thread( [&, iStream]{
147  //LDEBUG( mlog, "Starting thread " << iStream << "; waiting for start signal" );
148  bool tIsNewAcq = true;
149  std::unique_lock< std::mutex > tRunLock( tRunMutex );
150  tNThreadsReady++;
151  tRunRelease.wait( tRunLock );
152  for( unsigned iRecord = 0; iRecord < tNRecords; ++iRecord )
153  {
154  ::memcpy( tStreamData[ iStream ], tDataMaster.data(), tNBytes );
155  if( ! tStreams[ iStream ]->WriteRecord( tIsNewAcq ) )
156  {
157  LERROR( mlog, "Unable to write record <" << iRecord << "> for stream <" << iStream << ">" );
158  return;
159  }
160  tIsNewAcq = false;
161  }
162  //LDEBUG( mlog, "Thread " << iStream << " is finished" );
163  } ) );
164  }
165 
166  // Synchronize threads
167  LINFO( mlog, "Waiting for threads to be ready" );
168  while( tNThreadsReady.load() != tNStreams )
169  {
170  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
171  }
172  std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
173 
174  LINFO( mlog, "Releasing threads" );
175  tRunRelease.notify_all();
176 
177  steady_clock::time_point tStartTime = steady_clock::now();
178 
179  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
180  {
181  tThreads[ iStream ].join();
182  }
183 
184  steady_clock::time_point tEndTime = steady_clock::now();
185 
186  duration< double > tDuration = duration_cast< duration< double > >( tEndTime - tStartTime );
187 
188  LINFO( mlog, "Processing time: " << tDuration.count() << " sec" );
189  LINFO( mlog, "Size of data written: " << tMBToWrite << " MB" );
190  LINFO( mlog, "Write speed: " << tMBToWrite / tDuration.count() << " MB/s" );
191 
192  }
193  else
194  {
195 
196  bool tIsNewAcq = true;
197 
198  steady_clock::time_point tStartTime = steady_clock::now();
199 
200  for( unsigned iRecord = 0; iRecord < tNRecords; ++iRecord )
201  {
202  for( unsigned iStream = 0; iStream < tNStreams; ++iStream )
203  {
204  ::memcpy( tStreamData[ iStream ], tDataMaster.data(), tNBytes );
205  if( ! tStreams[ iStream ]->WriteRecord( tIsNewAcq ) )
206  {
207  LERROR( mlog, "Unable to write record <" << iRecord << "> for stream <" << iStream << ">" );
208  return RETURN_ERROR;
209  }
210  }
211  tIsNewAcq = false;
212  }
213 
214  steady_clock::time_point tEndTime = steady_clock::now();
215 
216  duration< double > tDuration = duration_cast< duration< double > >( tEndTime - tStartTime );
217 
218  LINFO( mlog, "Processing time: " << tDuration.count() << " sec" );
219  LINFO( mlog, "Size of data written: " << tMBToWrite << " MB" );
220  LINFO( mlog, "Write speed: " << tMBToWrite / tDuration.count() << " MB/s" );
221 
222  }
223 
224  LINFO( mlog, "Closing file" );
225 
226  tWriteTest->FinishWriting();
227 
228  boost::filesystem::remove( tFilePath );
229 
230  LINFO( mlog, "Test finished" );
231 
232  }
233  catch( std::exception& e )
234  {
235  LERROR( mlog, "Exception thrown during write-speed test:\n" << e.what() );
236  return RETURN_ERROR;
237  }
238 
239  return RETURN_SUCCESS;
240 }
static const uint32_t sDigitizedUS
Definition: M3Constants.hh:38
Egg file header information.
Definition: M3Header.hh:152
void set_at(SetType value, unsigned index)
static scarab::logger mlog("M3Header")
Interface class for a variety of data types.
int main(int argc, char **argv)
static const uint32_t sBitsAlignedLeft
Definition: M3Constants.hh:44
static Monarch3 * OpenForWriting(const std::string &filename)
Definition: M3Monarch.cc:92
unsigned AddStream(const std::string &aSource, uint32_t anAcqRate, uint32_t aRecSize, uint32_t aSampleSize, uint32_t aDataTypeSize, uint32_t aDataFormat, uint32_t aBitDepth, uint32_t aBitAlignment, std::vector< unsigned > *aChanVec=NULL)
Definition: M3Header.cc:400