Monarch  v3.8.2
Project 8 Data File Format Library
M3Stream.cc
Go to the documentation of this file.
1 /*
2  * M3Stream.cc
3  *
4  * Created on: Dec 26, 2014
5  * Author: nsoblath
6  */
7 
8 #define M3_API_EXPORTS
9 
10 #include "M3Stream.hh"
11 
12 #include "M3IToA.hh"
13 #include "logger.hh"
14 
15 #include <cstdlib> // for abs
16 
17 /* Notes on reading:
18  *
19  * During reading, both fNRecordsInAcq and fRecordCountInAcq are used, as well as both fAcquisitionId and fNAcquisitions;
20  * All four variables will always be valid.
21  *
22  */
23 
24 /* Notes on writing:
25  *
26  * As writing progresses, fRecordCountInAcq is incremented, and fNRecordsInAcq is ignored.
27  * When the acquisition is finalized with FinalizeCurrentAcq(), fNRecordsInAcq is updated to be fRecordCountInAcq + 1.
28  * Therefore fNRecordsInAcq is only valid for the last completed acquisition.
29  *
30  * As writing progresses, fAcquisitionId is incremented and fNAcquisitions is ignored.
31  * When the stream is finalized with FinalizeStream(), fNAcquisitions is updated to be fAcquisitionId + 1.
32  * Therefore fNAcquisitions is only valid after the stream is finalized.
33  *
34  */
35 
36 namespace monarch3
37 {
38  LOGGER( mlog, "M3Stream" );
39 
40  M3Stream::M3Stream( const M3StreamHeader& aHeader, HAS_GRP_IFC* aH5StreamsLoc, uint32_t aAccessFormat ) :
41  fMode( kRead ),
42  fDoReadRecord( NULL ),
43  fDoWriteRecord( NULL ),
44  fIsInitialized( false ),
45  fRecordsAccessed( false ),
46  fDataTypeSize( aHeader.GetDataTypeSize() ),
47  fSampleSize( aHeader.GetSampleSize() ),
48  fStrRecNBytes( aHeader.GetNChannels() * aHeader.GetRecordSize() * aHeader.GetSampleSize() * aHeader.GetDataTypeSize() ),
49  fStrRecSize( aHeader.GetNChannels() * aHeader.GetRecordSize() ),
50  fChanRecNBytes( aHeader.GetRecordSize() * aHeader.GetSampleSize() * aHeader.GetDataTypeSize() ),
51  fChanRecSize( aHeader.GetRecordSize() ),
52  fChanRecLength( (double)aHeader.GetRecordSize() / ((double)aHeader.GetAcquisitionRate() * 1.e-3) ),
53  fStreamRecord(),
54  fNChannels( aHeader.GetNChannels() ),
55  fChannelRecords( new M3Record[ aHeader.GetNChannels() ] ),
56  fNAcquisitions( 0 ),
57  fAcquisitionId( 0 ),
58  fRecordCountInAcq( 0 ),
59  fNRecordsInAcq( 0 ),
60  fAcqFirstRecTime( 0 ),
61  fAcqFirstRecId( 0 ),
62  fAcqFirstRecTimes( NULL ),
63  fAcqFirstRecIds( NULL ),
64  fDataInterleaved( aHeader.GetChannelFormat() == sInterleaved ),
65  fAccessFormat( aAccessFormat ),
66  fRecordIndex(),
67  fRecordCountInFile( 0 ),
68  fNRecordsInFile( 0 ),
69  fFirstRecordInFile( 0 ),
70  fH5StreamParentLoc( new H5::Group( aH5StreamsLoc->openGroup( aHeader.GetLabel() ) ) ),
71  fH5AcqLoc( NULL ),
72  fH5CurrentAcqDataSet( NULL ),
73  fH5DataSpaceUser( NULL ),
74  fMutexPtr( new std::mutex() )
75  {
76  LDEBUG( mlog, "Creating stream for <" << aHeader.GetLabel() << ">" );
77 
78  if( aHeader.GetDataFormat() == sDigitizedUS )
79  {
80  switch( fDataTypeSize )
81  {
82  case 1:
83  fDataTypeInFile = H5::PredType::STD_U8LE;
84  fDataTypeUser = H5::PredType::NATIVE_UINT8;
85  break;
86  case 2:
87  fDataTypeInFile = H5::PredType::STD_U16LE;
88  fDataTypeUser = H5::PredType::NATIVE_UINT16;
89  break;
90  case 4:
91  fDataTypeInFile = H5::PredType::STD_U32LE;
92  fDataTypeUser = H5::PredType::NATIVE_UINT32;
93  break;
94  case 8:
95  fDataTypeInFile = H5::PredType::STD_U64LE;
96  fDataTypeUser = H5::PredType::NATIVE_UINT64;
97  break;
98  default:
99  throw M3Exception() << "Unknown unsigned integer data type size: " << fDataTypeSize;
100  }
101  }
102  else if( aHeader.GetDataFormat() == sDigitizedS )
103  {
104  switch( fDataTypeSize )
105  {
106  case 1:
107  fDataTypeInFile = H5::PredType::STD_I8LE;
108  fDataTypeUser = H5::PredType::NATIVE_INT8;
109  break;
110  case 2:
111  fDataTypeInFile = H5::PredType::STD_I16LE;
112  fDataTypeUser = H5::PredType::NATIVE_INT16;
113  break;
114  case 4:
115  fDataTypeInFile = H5::PredType::STD_I32LE;
116  fDataTypeUser = H5::PredType::NATIVE_INT32;
117  break;
118  case 8:
119  fDataTypeInFile = H5::PredType::STD_I64LE;
120  fDataTypeUser = H5::PredType::NATIVE_INT64;
121  break;
122  default:
123  throw M3Exception() << "Unknown signed integer data type size: " << fDataTypeSize;
124  }
125  }
126  else // aHeader.GetDataFormat() == sAnalog
127  {
128  switch( fDataTypeSize )
129  {
130  case 4:
131  fDataTypeInFile = H5::PredType::IEEE_F32LE;
132  fDataTypeUser = H5::PredType::NATIVE_FLOAT;
133  break;
134  case 8:
135  fDataTypeInFile = H5::PredType::IEEE_F64LE ;
136  fDataTypeUser = H5::PredType::NATIVE_DOUBLE;
137  break;
138  default:
139  throw M3Exception() << "Unknown floating-point data type size: " << fDataTypeSize;
140  }
141  }
142 
143  // variables to store the HDF5 error printing state
144  H5E_auto2_t tAutoPrintFunc;
145  void* tClientData;
146 
147  // Determine if we're in read or write mode
148  // and get/create the acquisitions group
149  // Nested exceptions are used so that the outer try block can be used to determine whether we're reading or writing
150  try
151  {
152  // turn off HDF5 error printing because the throwing of an exception here means we're writing instead of reading
153  H5::Exception::getAutoPrint( tAutoPrintFunc, &tClientData );
154  H5::Exception::dontPrint();
155 
156  fH5AcqLoc = new H5::Group( fH5StreamParentLoc->openGroup( "acquisitions" ) );
157  LDEBUG( mlog, "Opened acquisition group in <read> mode" );
158 
159  // turn HDF5 error printing back on
160  H5::Exception::setAutoPrint( tAutoPrintFunc, tClientData );
161 
162  try
163  {
164  H5::Attribute tAttrNAcq( fH5StreamParentLoc->openAttribute( "n_acquisitions" ) );
165  tAttrNAcq.read( tAttrNAcq.getDataType(), &fNAcquisitions );
166  H5::Attribute tAttrNRec( fH5StreamParentLoc->openAttribute( "n_records" ) );
167  tAttrNRec.read( tAttrNRec.getDataType(), &fNRecordsInFile );
168  BuildIndex();
169  }
170  catch( H5::Exception& )
171  {
172  throw M3Exception() << "Acquisitions group is not properly setup for reading\n";
173  }
174 
175  LDEBUG( mlog, "Number of acquisitions found: " << fNAcquisitions << "; Number of records found: " << fNRecordsInFile );
176  fMode = kRead;
177  }
178  catch( H5::Exception& )
179  {
180  // if we ended up here, the acquisitions group doesn't exist, so we must be in write mode
181 
182  // turn HDF5 error printing back on
183  H5::Exception::setAutoPrint( tAutoPrintFunc, tClientData );
184 
185  try
186  {
187  fH5AcqLoc = new H5::Group( fH5StreamParentLoc->createGroup( "acquisitions" ) );
188  LDEBUG( mlog, "Opened acquisition group in <write> mode" );
189  fMode = kWrite;
190  }
191  catch( H5::Exception& )
192  {
193  throw M3Exception() << "Unable to open new acquisitions group for writing\n";
194  }
195  }
196 
197  Initialize();
198  }
199 
201  {
202  delete fH5DataSpaceUser; fH5DataSpaceUser = NULL;
204  delete fH5AcqLoc; fH5AcqLoc = NULL;
205  delete fH5StreamParentLoc; fH5StreamParentLoc = NULL;
206 
207  delete [] fChannelRecords;
208  }
209 
210  void M3Stream::Initialize() const
211  {
212  LDEBUG( mlog, "Initializing stream" );
213  fIsInitialized = false;
214 
215  // The case where the access format is separate, but the data in the file is interleaved is special.
216  // In this case, the stream record memory is not used.
217  // Reading and writing is done directly from the channel records using HDF5's interleaving capabilities.
219  {
220  // no memory is allocated for the stream record
222 
223  // allocate memory for each channel record
224  for( unsigned iChan = 0; iChan < fNChannels; ++iChan )
225  {
227  }
228 
229  // set the read/write functions to the special versions
232 
233  // Arrays for HDF5 file reading/writing
235  fStrMaxDataDims[ 0 ] = H5S_UNLIMITED; fStrMaxDataDims[ 1 ] = fStrRecSize * fSampleSize;
238  fDataOffset[ 0 ] = 0; fDataOffset[ 1 ] = 0;
239  fDataStride[ 0 ] = 1; fDataStride[ 1 ] = fNChannels;
240  fDataBlock[ 0 ] = 1; fDataBlock[ 1 ] = fSampleSize;
241  /*
242  std::cout << "str data dims: " << fStrDataDims[0] << " " << fStrDataDims[1] << std::endl;
243  std::cout << "str max data dims: " << fStrMaxDataDims[0] << " " << fStrMaxDataDims[1] << std::endl;
244  std::cout << "str data chunk dims: " << fStrDataChunkDims[0] << " " << fStrDataChunkDims[1] << std::endl;
245  std::cout << "str data dims 1 rec: " << fDataDims1Rec[0] << " " << fDataDims1Rec[1] << std::endl;
246  std::cout << "str data offset: " << fDataOffset[0] << " " << fDataOffset[1] << std::endl;
247  std::cout << "str data stride: " << fDataStride[0] << " " << fDataStride[1] << std::endl;
248  std::cout << "str data block: " << fDataBlock[0] << " " << fDataBlock[1] << std::endl;
249  */
250 
251  // HDF5 object initialization
252  delete fH5DataSpaceUser;
253  fH5DataSpaceUser = new H5::DataSpace( N_DATA_DIMS, fDataDims1Rec, NULL );
254 
255  fIsInitialized = true;
256  return;
257  }
258 
259  // allocate stream record memory
261 
262  // channel records point to portions of the stream record and do not own their own data
263  byte_type* tChanDataPtr = fStreamRecord.GetData();
264  for( unsigned iChan = 0; iChan < fNChannels; ++iChan )
265  {
267  }
268 
269  // set the read/write functions to the general versions
272 
273  // Arrays for HDF5 file reading/writing
275  fStrMaxDataDims[ 0 ] = H5S_UNLIMITED; fStrMaxDataDims[ 1 ] = fStrRecSize * fSampleSize;
278  fDataOffset[ 0 ] = 0; fDataOffset[ 1 ] = 0;
279  fDataStride[ 0 ] = 1; fDataStride[ 1 ] = fSampleSize;
280  fDataBlock[ 0 ] = 1; fDataBlock[ 1 ] = fStrRecSize * fSampleSize;
281  /*
282  std::cout << "str data dims: " << fStrDataDims[0] << " " << fStrDataDims[1] << std::endl;
283  std::cout << "str max data dims: " << fStrMaxDataDims[0] << " " << fStrMaxDataDims[1] << std::endl;
284  std::cout << "str data chunk dims: " << fStrDataChunkDims[0] << " " << fStrDataChunkDims[1] << std::endl;
285  std::cout << "str data dims 1 rec: " << fDataDims1Rec[0] << " " << fDataDims1Rec[1] << std::endl;
286  std::cout << "str data offset: " << fDataOffset[0] << " " << fDataOffset[1] << std::endl;
287  std::cout << "str data stride: " << fDataStride[0] << " " << fDataStride[1] << std::endl;
288  std::cout << "str data block: " << fDataBlock[0] << " " << fDataBlock[1] << std::endl;
289  */
290 
291  // HDF5 object initialization
292  delete fH5DataSpaceUser;
293  fH5DataSpaceUser = new H5::DataSpace( N_DATA_DIMS, fDataDims1Rec, NULL );
294 
295  fIsInitialized = true;
296  return;
297  }
298 
300  {
301  return &fStreamRecord;
302  }
303 
304  const M3Record* M3Stream::GetChannelRecord( unsigned aChannel ) const
305  {
306  if( aChannel < fNChannels )
307  {
308  return &(fChannelRecords[ aChannel ]);
309  }
310  throw M3Exception() << "Channel <" << aChannel << "> requested; only " << fNChannels << " in this stream.";
311  }
312 
313  bool M3Stream::ReadRecord( int anOffset, bool aIfNewAcqStartAtFirstRec ) const
314  {
315  if( ! fIsInitialized ) Initialize();
316 
317  std::unique_lock< std::mutex >( *fMutexPtr.get() );
318 
319  // anOffset should not move us forward if this is the very first record read in the file (fRecordsAccessed == false)
320  // Otherwise anOffset should be incremented to 1 to move us forward appropriately (fRecordsAccessed == true)
321  anOffset += (int)fRecordsAccessed;
322 
323  LDEBUG( mlog, "Before moving: Record count in file = " << fRecordCountInFile << "; Record ID (in acquisition) = " << fRecordCountInAcq );
324 
325  if( ( anOffset < 0 && (unsigned)abs( anOffset ) > fRecordCountInFile ) ||
326  ( anOffset > 0 && fRecordCountInFile + anOffset >= fNRecordsInFile ) ||
327  ( anOffset == 0 && fNRecordsInFile == 0 ))
328  {
329  // either requested to go back before the beginning of the file, or past the end
330  LDEBUG( mlog, "Requested offset would move is out of range for the file" );
331  return false;
332  }
333 
335  unsigned nextAcq = fRecordIndex.at( fRecordCountInFile ).first;
337  LDEBUG( mlog, "After offset calculation: Record count in file = " << fRecordCountInFile << "; Record ID (in acquisition) = " << fRecordCountInAcq );
338 
339  try
340  {
341  bool tIsNewAcq = false;
342  if( nextAcq != fAcquisitionId || ! fRecordsAccessed )
343  {
344  // we are going to a new acquisition
345 
346  // check if we need to correct our position in the new acquisition back to the beginning of the acquisition
347  if( aIfNewAcqStartAtFirstRec && fRecordCountInAcq != 0 )
348  {
350  // make sure the record correction ended up in the same new acquisition
351  if( fRecordIndex.at( fRecordCountInFile ).first != nextAcq )
352  {
353  throw M3Exception() << "Tried to start at the beginning of the new acquisition, but ended up in a different acquisition: " << fRecordIndex.at( fRecordCountInFile ).first << " != " << nextAcq;
354  }
355  fRecordCountInAcq = 0;
356  LDEBUG( mlog, "After offset calc + new acq correction: Record count in file = " << fRecordCountInFile << "; Record ID (in acquisition) = " << fRecordCountInAcq );
357  }
358 
359  tIsNewAcq = true;
360  fAcquisitionId = nextAcq;
361  delete fH5CurrentAcqDataSet;
363  fH5CurrentAcqDataSet = new H5::DataSet( fH5AcqLoc->openDataSet( fAcqNameBuffer ) );
364  H5::Attribute tAttrNRIA( fH5CurrentAcqDataSet->openAttribute( "n_records" ) );
365  tAttrNRIA.read( tAttrNRIA.getDataType(), &fNRecordsInAcq );
366  }
367 
368  LDEBUG( mlog, "Going to record: record count in file: " << fRecordCountInFile << " -- acquisition: " << nextAcq << " -- record in acquisition: " << fRecordCountInAcq );
369 
371 
372  (this->*fDoReadRecord)( tIsNewAcq );
373 
374  // can now update the first record in the file
375  if( ! fRecordsAccessed )
376  {
377  fRecordsAccessed = true;
379  LDEBUG( mlog, "First record in file: " << fFirstRecordInFile );
380  }
381  }
382  catch( H5::Exception& e )
383  {
384  throw M3Exception() << "HDF5 error while reading a record:\n\t" << e.getCDetailMsg() << " (function: " << e.getFuncName() << ")";
385  }
386 
387  return true;
388  }
389 
390  void M3Stream::Close() const
391  {
392  //LDEBUG( mlog, "const M3Stream::Close()" );
393 
394  delete fH5DataSpaceUser; fH5DataSpaceUser = NULL;
396  delete fH5AcqLoc; fH5AcqLoc = NULL;
397  delete fH5StreamParentLoc; fH5StreamParentLoc = NULL;
398 
399  return;
400  }
401 
402 
403  M3Record* M3Stream::GetChannelRecord( unsigned aChannel )
404  {
405  if( aChannel < fNChannels )
406  {
407  return &(fChannelRecords[ aChannel ]);
408  }
409  throw M3Exception() << "Channel <" << aChannel << "> requested; only " << fNChannels << " in this stream.";
410  }
411 
412  bool M3Stream::WriteRecord( bool aIsNewAcquisition )
413  {
414  // note: fRecordCountInAcq is used to keep track of the number of records written in each acquisition;
415  // fNRecordsInAcq is only valid for the last completed acquisition.
416 
417  if( ! fIsInitialized ) Initialize();
418  if( ! fRecordsAccessed ) aIsNewAcquisition = true;
419 
420  try
421  {
422  std::unique_lock< std::mutex >( *fMutexPtr.get() );
423 
424  if( aIsNewAcquisition )
425  {
427 
429  else fRecordsAccessed = true;
430 
431  // Setup the new dataset
432  fStrDataDims[ 0 ] = 1;
433  H5::DSetCreatPropList tPropList;
434  tPropList.setChunk( N_DATA_DIMS, fStrDataChunkDims );
435 
437  fH5CurrentAcqDataSet = new H5::DataSet( fH5AcqLoc->createDataSet( fAcqNameBuffer, fDataTypeInFile, H5::DataSpace( N_DATA_DIMS, fStrDataDims, fStrMaxDataDims ), tPropList ) );
438  }
439  else
440  {
441  // Extend the current dataset
442  fStrDataDims[ 0 ] = fStrDataDims[ 0 ] + 1;
444  }
445 
446  LTRACE( mlog, "Writing acq. " << fAcquisitionId << ", record " << fRecordCountInAcq );
447 
449 
450  (this->*fDoWriteRecord)( aIsNewAcquisition );
451 
454  return true;
455  }
456  catch( H5::Exception& e )
457  {
458  LWARN( mlog, "DIAGNOSTIC: id of fH5CurrentAcqDataSet: " << fH5CurrentAcqDataSet->getId() );
459  LWARN( mlog, "DIAGNOSTIC: class name: " << fH5CurrentAcqDataSet->fromClass() );
460  H5D_space_status_t t_status;
461  fH5CurrentAcqDataSet->getSpaceStatus( t_status );
462  LWARN( mlog, "DIAGNOSTIC: offset: " << fH5CurrentAcqDataSet->getOffset() << " space status: " << t_status << " storage size: " << fH5CurrentAcqDataSet->getStorageSize() << " in mem data size: " << fH5CurrentAcqDataSet->getInMemDataSize() );
463  throw M3Exception() << "HDF5 error while writing a record:\n\t" << e.getCDetailMsg() << " (function: " << e.getFuncName() << ")";
464  }
465  catch( std::exception& e )
466  {
467  throw M3Exception() << e.what();
468  }
469 
470  return false;
471  }
472 
474  {
475  //LDEBUG( mlog, "non-const M3Stream::Close()" );
476  FinalizeStream();
477 
478  delete fH5DataSpaceUser; fH5DataSpaceUser = NULL;
480  delete fH5AcqLoc; fH5AcqLoc = NULL;
481  delete fH5StreamParentLoc; fH5StreamParentLoc = NULL;
482 
483  return;
484  }
485 
486  void M3Stream::SetAccessFormat( uint32_t aFormat ) const
487  {
488  fAccessFormat = aFormat;
489  fIsInitialized = false;
490  return;
491  }
492 
493  void M3Stream::ReadRecordInterleavedToSeparate( bool aIsNewAcquisition ) const
494  {
495  if( aIsNewAcquisition )
496  {
497  try
498  {
499  delete [] fAcqFirstRecTimes;
501  H5::Attribute tAttrAFRT( fH5CurrentAcqDataSet->openAttribute( "first_record_time" ) );
502  tAttrAFRT.read( tAttrAFRT.getDataType(), fAcqFirstRecTimes );
503 
504  delete [] fAcqFirstRecIds;
506  H5::Attribute tAttrAFRI( fH5CurrentAcqDataSet->openAttribute( "first_record_id" ) );
507  tAttrAFRI.read( tAttrAFRI.getDataType(), fAcqFirstRecIds );
508  }
509  catch( H5::Exception& )
510  {
511  // Backwards compatibility with older files that don't have first_record_time and first_record_id
512  // Times increment by the record length starting at 0, but ID increments starting at 0
513  for( unsigned iChan = 0; iChan < fNChannels; ++iChan )
514  {
515  fAcqFirstRecTimes[ iChan ] = 0;
516  fAcqFirstRecIds[ iChan ] = 0;
517  }
518  }
521  }
522 
523  H5::DataSpace tDataSpaceInFile = fH5CurrentAcqDataSet->getSpace();
524  for( unsigned iChan = 0; iChan < fNChannels; ++iChan )
525  {
526  fDataOffset[ 1 ] = iChan;
527  tDataSpaceInFile.selectHyperslab( H5S_SELECT_SET, fDataDims1Rec, fDataOffset, fDataStride, fDataBlock );
528  fH5CurrentAcqDataSet->read( fChannelRecords[ iChan ].GetData(), fDataTypeUser, *fH5DataSpaceUser, tDataSpaceInFile );
531  }
532  return;
533  }
534 
535  void M3Stream::ReadRecordAsIs( bool aIsNewAcquisition ) const
536  {
537  if( aIsNewAcquisition )
538  {
539  try
540  {
541  H5::Attribute tAttrAFRT( fH5CurrentAcqDataSet->openAttribute( "first_record_time" ) );
542  tAttrAFRT.read( tAttrAFRT.getDataType(), &fAcqFirstRecTime );
543  H5::Attribute tAttrAFRI( fH5CurrentAcqDataSet->openAttribute( "first_record_id" ) );
544  tAttrAFRI.read( tAttrAFRI.getDataType(), &fAcqFirstRecId );
545  }
546  catch( H5::Exception& )
547  {
548  // Backwards compatibility with older files that don't have first_record_time and first_record_id
549  // Times increment by the record length starting at 0, but ID increments starting at 0
550  fAcqFirstRecTime = 0;
551  fAcqFirstRecId = 0;
552  for( unsigned iChan = 0; iChan < fNChannels; ++iChan )
553  {
554  fAcqFirstRecTimes[ iChan ] = 0;
555  fAcqFirstRecIds[ iChan ] = 0;
556  }
557  }
558  }
559 
560  H5::DataSpace tDataSpaceInFile = fH5CurrentAcqDataSet->getSpace();
561  tDataSpaceInFile.selectHyperslab( H5S_SELECT_SET, fDataDims1Rec, fDataOffset );
565  for( unsigned iChan = 0; iChan < fNChannels; ++iChan )
566  {
569  }
570  return;
571  }
572 
573  void M3Stream::WriteRecordSeparateToInterleaved( bool aIsNewAcquisition )
574  {
575  H5::DataSpace tDataSpaceInFile = fH5CurrentAcqDataSet->getSpace();
576  for( unsigned iChan = 0; iChan < fNChannels; ++iChan )
577  {
578  fDataOffset[ 1 ] = iChan;
579  tDataSpaceInFile.selectHyperslab( H5S_SELECT_SET, fDataDims1Rec, fDataOffset, fDataStride, fDataBlock );
580  //std::cout << "about to write separate to interleaved " << fDataTypeUser.fromClass() << std::endl;
581  fH5CurrentAcqDataSet->write( fChannelRecords[ iChan ].GetData(), fDataTypeUser, *fH5DataSpaceUser, tDataSpaceInFile );
582  }
583  if( aIsNewAcquisition )
584  {
585  TimeType* tTimes = new TimeType[ fNChannels ];
586  RecordIdType* tIds = new RecordIdType[ fNChannels ];
587  for( unsigned iChan = 0; iChan < fNChannels; ++iChan )
588  {
589  tTimes[ iChan ] = fChannelRecords[ iChan ].GetTime();
590  tIds[ iChan ] = fChannelRecords[ iChan ].GetRecordId();
591  }
592  fH5CurrentAcqDataSet->createAttribute( "first_record_time", MH5Type< TimeType >::H5(), H5::DataSpace( H5S_SCALAR ) ).write( MH5Type< TimeType >::Native(), tTimes );
593  fH5CurrentAcqDataSet->createAttribute( "first_record_id", MH5Type< RecordIdType >::H5(), H5::DataSpace( H5S_SCALAR ) ).write( MH5Type< RecordIdType >::Native(), tIds );
594  delete [] tTimes;
595  delete [] tIds;
596  }
597  return;
598  }
599 
600  void M3Stream::WriteRecordAsIs( bool aIsNewAcquisition )
601  {
602  H5::DataSpace tDataSpaceInFile = fH5CurrentAcqDataSet->getSpace();
603  tDataSpaceInFile.selectHyperslab( H5S_SELECT_SET, fDataDims1Rec, fDataOffset );
605  if( aIsNewAcquisition )
606  {
607  TimeType tTime = fStreamRecord.GetTime();
609  fH5CurrentAcqDataSet->createAttribute( "first_record_time", MH5Type< TimeType >::H5(), H5::DataSpace( H5S_SCALAR ) ).write( MH5Type< TimeType >::Native(), &tTime );
610  fH5CurrentAcqDataSet->createAttribute( "first_record_id", MH5Type< RecordIdType >::H5(), H5::DataSpace( H5S_SCALAR ) ).write( MH5Type< RecordIdType >::Native(), &tId );
611  }
612  return;
613  }
614 
615  void M3Stream::BuildIndex() const
616  {
617  fRecordIndex.resize( fNRecordsInFile );
618  unsigned tNRecInAcq;
619  unsigned iRecInFile = 0;
620  for( unsigned iAcq = 0; iAcq < fNAcquisitions; ++iAcq )
621  {
622  u32toa( iAcq, fAcqNameBuffer );
623  H5::Attribute tAttr( fH5AcqLoc->openDataSet( fAcqNameBuffer ).openAttribute( "n_records" ) );
624  tAttr.read( tAttr.getDataType(), &tNRecInAcq );
625  LDEBUG( mlog, "Acquisition <" << fAcqNameBuffer << "> has " << tNRecInAcq << " records" );
626  for( unsigned iRecInAcq = 0; iRecInAcq < tNRecInAcq; ++iRecInAcq )
627  {
628  fRecordIndex.at( iRecInFile ).first = iAcq;
629  fRecordIndex.at( iRecInFile ).second = iRecInAcq;
630  LTRACE( mlog, "Record index: " << iRecInFile << " -- " << iAcq << " -- " << iRecInAcq );
631  ++iRecInFile;
632  }
633  }
634  return;
635  }
636 
638  {
639  if( fH5CurrentAcqDataSet == NULL ) return;
640 
642 
643  fH5CurrentAcqDataSet->createAttribute( "n_records", MH5Type< unsigned >::H5(), H5::DataSpace( H5S_SCALAR ) ).write( MH5Type< unsigned >::Native(), &fNRecordsInAcq );
644  LDEBUG( mlog, "Finalizing acq. " << fAcquisitionId << " with " << fNRecordsInAcq << " records" );
645 
646  fRecordCountInAcq = 0;
647  delete fH5CurrentAcqDataSet;
648  fH5CurrentAcqDataSet = NULL;
649 
650  return;
651  }
652 
654  {
656 
657  if( fH5AcqLoc == NULL ) return;
658 
659  fNAcquisitions = ( fAcquisitionId + 1 ) * (unsigned)fRecordsAccessed;
660  fH5StreamParentLoc->openAttribute( "n_acquisitions" ).write( MH5Type< unsigned >::Native(), &fNAcquisitions );
661  fH5StreamParentLoc->openAttribute( "n_records" ).write( MH5Type< unsigned >::Native(), &fRecordCountInFile );
662  LDEBUG( mlog, "Finalizing stream with " << fNAcquisitions << " acquisitions and " << fRecordCountInFile << " records" );
663 
664  return;
665  }
666 
667 } /* namespace monarch */
RecordIdType fAcqFirstRecId
Definition: M3Stream.hh:175
unsigned fDataTypeSize
Definition: M3Stream.hh:154
const M3Record * GetStreamRecord() const
Get the pointer to the stream record.
Definition: M3Stream.cc:299
void Close() const
Close the file.
Definition: M3Stream.cc:390
DoWriteRecordFunc fDoWriteRecord
Definition: M3Stream.hh:149
static const uint32_t sSeparate
Definition: M3Constants.hh:51
RecordIdType * fAcqFirstRecIds
Definition: M3Stream.hh:177
unsigned fNChannels
Definition: M3Stream.hh:166
hsize_t fDataOffset[N_DATA_DIMS]
Definition: M3Stream.hh:209
TimeType * fAcqFirstRecTimes
Definition: M3Stream.hh:176
static const uint32_t sDigitizedUS
Definition: M3Constants.hh:38
unsigned fNRecordsInFile
Definition: M3Stream.hh:184
H5::Group * fH5StreamParentLoc
Definition: M3Stream.hh:195
hsize_t fDataStride[N_DATA_DIMS]
Definition: M3Stream.hh:210
Contains the information that makes up a record.
Definition: M3Record.hh:35
uint32_t fAccessFormat
Definition: M3Stream.hh:180
M3Stream(const M3StreamHeader &aHeader, HAS_GRP_IFC *aH5StreamParentLoc, uint32_t aAccessFormat=sSeparate)
Definition: M3Stream.cc:40
DoReadRecordFunc fDoReadRecord
Definition: M3Stream.hh:145
AcquisitionIdType fAcquisitionId
Definition: M3Stream.hh:170
void BuildIndex() const
Definition: M3Stream.cc:615
void FinalizeStream()
Definition: M3Stream.cc:653
STL namespace.
uint64_t fChanRecLength
Definition: M3Stream.hh:162
virtual const char * what() const
Definition: M3Exception.cc:34
hsize_t fDataBlock[N_DATA_DIMS]
Definition: M3Stream.hh:211
hsize_t fStrMaxDataDims[N_DATA_DIMS]
Definition: M3Stream.hh:206
unsigned fNRecordsInAcq
Definition: M3Stream.hh:173
void SetAccessFormat(uint32_t aFormat) const
Access format can be changed during read or write; must call Initialize() after this.
Definition: M3Stream.cc:486
void Initialize()
Allocate no memory for the record; data pointer is to NULL.
Definition: M3Record.cc:40
uint8_t byte_type
Definition: M3Types.hh:22
void ReadRecordInterleavedToSeparate(bool aIsNewAcquisition) const
Definition: M3Stream.cc:493
void ReadRecordAsIs(bool aIsNewAcquisition) const
Definition: M3Stream.cc:535
TimeType * GetTimePtr() const
Definition: M3Record.hh:100
unsigned fChanRecSize
Definition: M3Stream.hh:161
M3Record * fChannelRecords
Definition: M3Stream.hh:167
bool ReadRecord(int anOffset=0, bool aIfNewAcqStartAtFirstRec=true) const
Read the next record from the file.
Definition: M3Stream.cc:313
void SetRecordId(RecordIdType aId)
Definition: M3Record.hh:89
unsigned fSampleSize
Definition: M3Stream.hh:155
RecordIdType GetRecordId() const
Definition: M3Record.hh:79
static const uint32_t sDigitizedS
Definition: M3Constants.hh:39
unsigned fChanRecNBytes
Definition: M3Stream.hh:160
TimeType fAcqFirstRecTime
Definition: M3Stream.hh:174
void SetTime(TimeType aTime)
Definition: M3Record.hh:105
static const uint32_t sInterleaved
Specifies whether the data channels are interleaved or separate in a stream.
Definition: M3Constants.hh:50
uint64_t RecordIdType
Definition: M3Types.hh:25
M3Record fStreamRecord
Definition: M3Stream.hh:164
unsigned fStrRecNBytes
Definition: M3Stream.hh:157
const byte_type * GetData() const
Definition: M3Record.hh:111
void u32toa(uint32_t value, char *buffer)
Quickly convert a 32-bit unsigned integer to a char array (buffer should already be allocated) ...
Definition: M3IToA.cc:42
uint64_t TimeType
Definition: M3Types.hh:26
Specialized exception class for Monarch3.
Definition: M3Exception.hh:28
Single-stream header information.
Definition: M3Header.hh:33
static scarab::logger mlog("M3Header")
RecordIdType * GetRecordIdPtr() const
Definition: M3Record.hh:84
const M3Record * GetChannelRecord(unsigned aChannel) const
Get the pointer to a particular channel record.
Definition: M3Stream.cc:304
H5::Group * fH5AcqLoc
Definition: M3Stream.hh:196
unsigned fRecordCountInFile
Definition: M3Stream.hh:183
unsigned fNAcquisitions
Definition: M3Stream.hh:169
virtual ~M3Stream()
Definition: M3Stream.cc:200
unsigned fRecordCountInAcq
Definition: M3Stream.hh:172
H5::DataType fDataTypeUser
Definition: M3Stream.hh:202
void WriteRecordSeparateToInterleaved(bool aIsNewAcquisition)
Definition: M3Stream.cc:573
unsigned fFirstRecordInFile
Definition: M3Stream.hh:185
H5::DataSpace * fH5DataSpaceUser
Definition: M3Stream.hh:199
hsize_t fDataDims1Rec[N_DATA_DIMS]
Definition: M3Stream.hh:208
std::vector< std::pair< unsigned, unsigned > > fRecordIndex
Definition: M3Stream.hh:182
void WriteRecordAsIs(bool aIsNewAcquisition)
Definition: M3Stream.cc:600
unsigned fStrRecSize
Definition: M3Stream.hh:158
H5::DataType fDataTypeInFile
Definition: M3Stream.hh:201
bool WriteRecord(bool aIsNewAcquisition)
Write the record contents to the file.
Definition: M3Stream.cc:412
hsize_t fStrDataChunkDims[N_DATA_DIMS]
Definition: M3Stream.hh:207
void Initialize() const
Setup to read/write data (called in constructor; only call this if read/write parameters change durin...
Definition: M3Stream.cc:210
TimeType GetTime() const
Definition: M3Record.hh:95
char fAcqNameBuffer[10]
Definition: M3Stream.hh:193
mutex_ptr fMutexPtr
Definition: M3Stream.hh:213
hsize_t fStrDataDims[N_DATA_DIMS]
Definition: M3Stream.hh:205
H5::DataSet * fH5CurrentAcqDataSet
Definition: M3Stream.hh:197
void FinalizeCurrentAcq()
Definition: M3Stream.cc:637