[Fawkes Git] branch/thofmann/syncpoint: 2 revs updated. (0.5.0-3141-gdd21195)

Till Hofmann hofmann at kbsg.rwth-aachen.de
Fri May 13 17:15:46 CEST 2016


Changes have been pushed for the project "Fawkes Robotics Software Framework".

Gitweb: http://git.fawkesrobotics.org/fawkes.git
Trac:   http://trac.fawkesrobotics.org

The branch, thofmann/syncpoint has been updated
        to  dd21195687d5e6f6d12cbd388b7ea04ce73b23b4 (commit)
       via  937f0ed6c45ad1ac81cb6762e0d8c96052ae10bd (commit)
      from  32bad7cca8e94fed513f40412ef9ec7586d686e2 (commit)

http://git.fawkesrobotics.org/fawkes.git/thofmann/syncpoint

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.

- *Log* ---------------------------------------------------------------
commit 937f0ed6c45ad1ac81cb6762e0d8c96052ae10bd
Author:     Till Hofmann <hofmann at kbsg.rwth-aachen.de>
AuthorDate: Fri May 13 17:10:37 2016 +0200
Commit:     Till Hofmann <hofmann at kbsg.rwth-aachen.de>
CommitDate: Fri May 13 17:15:15 2016 +0200

    syncpoint: release all waiters simultaneously if a timeout occurs
    
    Don't let all components run a wait with an individual timeout, but
    instead have one timeout for the syncpoint, and if a timeout occurs,
    release all current waiters. The timeout is set per syncpoint, either
    while creating the syncpoint or by calling wait with a specified
    timeout.

http://git.fawkesrobotics.org/fawkes.git/commit/937f0ed
http://trac.fawkesrobotics.org/changeset/937f0ed

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
commit dd21195687d5e6f6d12cbd388b7ea04ce73b23b4
Author:     Till Hofmann <hofmann at kbsg.rwth-aachen.de>
AuthorDate: Fri May 13 17:15:43 2016 +0200
Commit:     Till Hofmann <hofmann at kbsg.rwth-aachen.de>
CommitDate: Fri May 13 17:15:43 2016 +0200

    syncpoint: test that all waiters are released when a timeout occurs

http://git.fawkesrobotics.org/fawkes.git/commit/dd21195
http://trac.fawkesrobotics.org/changeset/dd21195

- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -


- *Summary* -----------------------------------------------------------
 src/libs/syncpoint/syncpoint.cpp            |   56 +++++++++++++++++++++------
 src/libs/syncpoint/syncpoint.h              |   14 +++++-
 src/libs/syncpoint/tests/test_syncpoint.cpp |   44 ++++++++++++++++++++-
 3 files changed, 97 insertions(+), 17 deletions(-)


- *Diffs* -------------------------------------------------------------

- *commit* 937f0ed6c45ad1ac81cb6762e0d8c96052ae10bd - - - - - - - - - -
Author:  Till Hofmann <hofmann at kbsg.rwth-aachen.de>
Date:    Fri May 13 17:10:37 2016 +0200
Subject: syncpoint: release all waiters simultaneously if a timeout occurs

 src/libs/syncpoint/syncpoint.cpp |   56 +++++++++++++++++++++++++++++--------
 src/libs/syncpoint/syncpoint.h   |   14 +++++++--
 2 files changed, 55 insertions(+), 15 deletions(-)

_Diff for modified files_:
diff --git a/src/libs/syncpoint/syncpoint.cpp b/src/libs/syncpoint/syncpoint.cpp
index dc16c07..f3d5d76 100644
--- a/src/libs/syncpoint/syncpoint.cpp
+++ b/src/libs/syncpoint/syncpoint.cpp
@@ -53,8 +53,13 @@ namespace fawkes {
  * @param identifier The identifier of the SyncPoint. This must be in absolute
  * path style, e.g. '/some/syncpoint'.
  * @param logger The logger to use for error messages.
+ * @param max_waittime_sec the maximum number of seconds to wait until a timeout
+ * is triggered
+ * @param max_waittime_nsec the maximum number of nanoseconds to wait until a
+ * timeout is triggered
  */
-SyncPoint::SyncPoint(string identifier, MultiLogger *logger)
+SyncPoint::SyncPoint(string identifier, MultiLogger *logger,
+  uint max_waittime_sec /* = 0 */, uint max_waittime_nsec /* = 0 */)
     : identifier_(identifier),
       emit_calls_(CircularBuffer<SyncPointCall>(1000)),
       wait_for_one_calls_(CircularBuffer<SyncPointCall>(1000)),
@@ -66,6 +71,10 @@ SyncPoint::SyncPoint(string identifier, MultiLogger *logger)
       cond_wait_for_one_(new WaitCondition(mutex_wait_for_one_)),
       mutex_wait_for_all_(new Mutex()),
       cond_wait_for_all_(new WaitCondition(mutex_wait_for_all_)),
+      wait_for_all_timer_running_(false),
+      wait_for_one_timer_running_(false),
+      max_waittime_sec_(max_waittime_sec),
+      max_waittime_nsec_(max_waittime_nsec),
       logger_(logger),
       last_emitter_reset_(Time(0l))
 {
@@ -207,7 +216,9 @@ SyncPoint::emit(const std::string & component, bool remove_from_pending)
  * Either wait until a single emitter has emitted the SyncPoint, or wait
  * until all registered emitters have emitted the SyncPoint.
  * If wait_sec != 0 or wait_nsec !=0, then only wait for
- * wait_sec + wait_nsec*10^-9 seconds.
+ * wait_sec + wait_nsec*10^-9 seconds and set the SyncPoint's maximum waiting
+ * time to the specified time (i.e., on any subsequent wait calls, wait for
+ * the specified time until a timeout is triggered).
  * If the maximal wait time has been exceeded, a warning is shown and the
  * SyncPoint is released.
  * @param component The identifier of the component waiting for the SyncPoint
@@ -230,16 +241,19 @@ SyncPoint::wait(const std::string & component,
   WaitCondition *cond;
   CircularBuffer<SyncPointCall> *calls;
   Mutex *mutex_cond;
+  bool *timer_running;
   // set watchers, cond and calls depending of the Wakeup type
   if (type == WAIT_FOR_ONE) {
     watchers = &watchers_wait_for_one_;
     cond = cond_wait_for_one_;
     mutex_cond = mutex_wait_for_one_;
+    timer_running = &wait_for_one_timer_running_;
     calls = &wait_for_one_calls_;
   } else if (type == WAIT_FOR_ALL) {
     watchers = &watchers_wait_for_all_;
     cond = cond_wait_for_all_;
     mutex_cond = mutex_wait_for_all_;
+    timer_running = &wait_for_all_timer_running_;
     calls = &wait_for_all_calls_;
   } else {
     throw SyncPointInvalidTypeException();
@@ -270,22 +284,41 @@ SyncPoint::wait(const std::string & component,
    */
   Time start;
   mutex_cond->lock();
-  pthread_cleanup_push(cleanup_mutex, mutex_cond);
   if (emit_locker_ == component) {
     mutex_next_wait_->unlock();
     emit_locker_ = "";
   }
-  ml.unlock();
   if (need_to_wait) {
-    if (!cond->reltimed_wait(wait_sec, wait_nsec)) {
+    if (*timer_running) {
+      ml.unlock();
+      pthread_cleanup_push(cleanup_mutex, mutex_cond);
+      cond->wait();
+      pthread_cleanup_pop(1);
+    } else {
+      *timer_running = true;
+      if (wait_sec != 0 || wait_nsec != 0) {
+        max_waittime_sec_ = wait_sec;
+        max_waittime_nsec_ = wait_nsec;
+      }
+      ml.unlock();
+      bool timeout;
+      pthread_cleanup_push(cleanup_mutex, mutex_cond);
+      timeout = !cond->reltimed_wait(max_waittime_sec_, max_waittime_nsec_);
+      pthread_cleanup_pop(1);
       ml.relock();
-      // wait failed, default
-      handle_default(component, type, wait_sec, wait_nsec);
+      *timer_running = false;
+      if (timeout) {
+        // wait failed, handle default
+        handle_default(component, type);
+        mutex_cond->lock();
+        cond->wake_all();
+        mutex_cond->unlock();
+      }
       ml.unlock();
     }
+  } else {
+    ml.unlock();
   }
-  mutex_cond->unlock();
-  pthread_cleanup_pop(0);
   Time wait_time = Time() - start;
   ml.relock();
   calls->push_back(SyncPointCall(component, start, wait_time));
@@ -462,14 +495,13 @@ SyncPoint::is_pending(string component) {
 }
 
 void
-SyncPoint::handle_default(string component, WakeupType type,
-  uint max_time_sec, uint max_time_nsec)
+SyncPoint::handle_default(string component, WakeupType type)
 {
   logger_->log_warn(component.c_str(),
       "Thread time limit exceeded while waiting for syncpoint '%s'. "
       "Time limit: %f sec.",
       get_identifier().c_str(),
-      max_time_sec + static_cast<float>(max_time_nsec) / 1000000000.f);
+      max_waittime_sec_ + static_cast<float>(max_waittime_nsec_)/1000000000.f);
   bad_components_.insert(pending_emitters_.begin(), pending_emitters_.end());
   if (bad_components_.size() > 1) {
     string bad_components_string = "";
diff --git a/src/libs/syncpoint/syncpoint.h b/src/libs/syncpoint/syncpoint.h
index ab8b2ab..4a1e641 100644
--- a/src/libs/syncpoint/syncpoint.h
+++ b/src/libs/syncpoint/syncpoint.h
@@ -64,7 +64,8 @@ class SyncPoint
       NONE
     } WakeupType;
 
-    SyncPoint(std::string identifier, MultiLogger *logger);
+    SyncPoint(std::string identifier, MultiLogger *logger,
+      uint max_waittime_sec = 0, uint max_waittime_nsec = 0);
     virtual ~SyncPoint();
 
     /** send a signal to all waiting threads */
@@ -141,6 +142,14 @@ class SyncPoint
     Mutex *mutex_wait_for_all_;
     /** WaitCondition which is used for wait_for_all() */
     WaitCondition *cond_wait_for_all_;
+    /** true if the wait for all timer is running */
+    bool wait_for_all_timer_running_;
+    /** true if the wait for one timer is running */
+    bool wait_for_one_timer_running_;
+    /** maximum waiting time in secs */
+    uint max_waittime_sec_;
+    /** maximum waiting time in nsecs */
+    uint max_waittime_nsec_;
 
     /** Logger */
     MultiLogger *logger_;
@@ -148,8 +157,7 @@ class SyncPoint
   private:
     void reset_emitters();
     bool is_pending(std::string component);
-    void handle_default(std::string component, WakeupType type,
-      uint max_time_sec, uint max_time_nsec);
+    void handle_default(std::string component, WakeupType type);
     void cleanup();
 
   private:

- *commit* dd21195687d5e6f6d12cbd388b7ea04ce73b23b4 - - - - - - - - - -
Author:  Till Hofmann <hofmann at kbsg.rwth-aachen.de>
Date:    Fri May 13 17:15:43 2016 +0200
Subject: syncpoint: test that all waiters are released when a timeout occurs

 src/libs/syncpoint/tests/test_syncpoint.cpp |   44 +++++++++++++++++++++++++-
 1 files changed, 42 insertions(+), 2 deletions(-)

_Diff for modified files_:
diff --git a/src/libs/syncpoint/tests/test_syncpoint.cpp b/src/libs/syncpoint/tests/test_syncpoint.cpp
index 5da0e14..590e602 100644
--- a/src/libs/syncpoint/tests/test_syncpoint.cpp
+++ b/src/libs/syncpoint/tests/test_syncpoint.cpp
@@ -375,6 +375,10 @@ struct waiter_thread_params {
     string sp_identifier;
     /** Name of the component */
     string component = "";
+    /** timeout in sec */
+    uint timeout_sec = 0;
+    /** timeout in nsec */
+    uint timeout_nsec = 0;
 };
 
 
@@ -390,7 +394,8 @@ void * start_waiter_thread(void * data) {
   }
   RefPtr<SyncPoint> sp = params->manager->get_syncpoint(component, params->sp_identifier);
   for (uint i = 0; i < params->num_wait_calls; i++) {
-    sp->wait(component);
+    sp->wait(component, SyncPoint::WAIT_FOR_ONE, params->timeout_sec,
+        params->timeout_nsec);
   }
   pthread_exit(NULL);
 }
@@ -583,7 +588,8 @@ void * start_barrier_waiter_thread(void * data) {
   RefPtr<SyncPoint> sp;
   sp = params->manager->get_syncpoint(component, params->sp_identifier);
   for (uint i = 0; i < params->num_wait_calls; i++) {
-    sp->wait(component, SyncPoint::WAIT_FOR_ALL);
+    sp->wait(component, SyncPoint::WAIT_FOR_ALL, params->timeout_sec,
+        params->timeout_nsec);
   }
   pthread_exit(NULL);
 }
@@ -1151,3 +1157,37 @@ TEST_F(SyncPointManagerTest, WaitersAreAlwaysReleasedSimultaneouslyTest)
     EXPECT_EQ(0, pthread_tryjoin_np(threads[i], NULL));
   }
 }
+
+/** Test whether all waiters are released simultaneously if a timeout occurs;
+ *  i.e. make sure that only the first waiter's timeout matters and all
+ *  subsequent waiters are released when the first waiter times out.
+ */
+TEST_F(SyncPointManagerTest, WaitersTimeoutSimultaneousReleaseTest)
+{
+  RefPtr<SyncPoint> sp = manager->get_syncpoint("emitter1", "/test");
+  sp->register_emitter("emitter1");
+  uint num_threads = 2;
+  pthread_t threads[num_threads];
+  string sp_identifier = "/test";
+  waiter_thread_params params[num_threads];
+  for (uint i = 0; i < num_threads; i++) {
+    params[i].manager = manager;
+    params[i].thread_nr = i;
+    params[i].num_wait_calls = 1;
+    params[i].timeout_sec = 1;
+    params[i].sp_identifier = sp_identifier;
+  }
+  pthread_create(&threads[0], &attrs, start_barrier_waiter_thread, &params[0]);
+  pthread_yield();
+  EXPECT_EQ(EBUSY, pthread_tryjoin_np(threads[0], NULL));
+  params[1].timeout_sec = 5;
+  pthread_create(&threads[1], &attrs, start_barrier_waiter_thread, &params[1]);
+  usleep(10000);
+  for (uint i = 0; i < num_threads; i++) {
+    EXPECT_EQ(EBUSY, pthread_tryjoin_np(threads[i], NULL));
+  }
+  sleep(2);
+  for (uint i = 0; i < num_threads; i++) {
+    EXPECT_EQ(0, pthread_tryjoin_np(threads[i], NULL));
+  }
+}




-- 
Fawkes Robotics Framework                 http://www.fawkesrobotics.org


More information about the fawkes-commits mailing list