uThreads  0.3.0
IOHandler.h
1 /*******************************************************************************
2  * Copyright © 2015, 2016 Saman Barghi
3  *
4  * This program is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation, either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program. If not, see <http://www.gnu.org/licenses/>.
16  *******************************************************************************/
17 
18 #ifndef UTHREADS_IOHANDLER_H_
19 #define UTHREADS_IOHANDLER_H_
20 
21 #include <unordered_map>
22 #include <vector>
23 #include <sys/socket.h>
24 #include <mutex>
25 #include "../runtime/uThread.h"
26 #include "../runtime/kThread.h"
27 #include "../generic/Semaphore.h"
28 
/*
 * Sentinel uThread pointer values. They are small integers cast to
 * uThread* and are never meant to be dereferenced.
 * NOTE(review): presumably stored in the PollData reader/writer slots to
 * mean "fd is ready" / "a waiter is about to park" -- confirm against
 * IOHandler.cpp.
 */
#define POLL_READY (reinterpret_cast<uThread*>(1))
#define POLL_WAIT  (reinterpret_cast<uThread*>(2))
31 
32 class Connection;
39 class PollData : public Link<PollData>{
40  friend IOHandler;
41  friend Connection;
42 private:
43 
44  /*
45  * Potentially there is only the poller thread and maximum two
46  * other uThreads (one for read, one for write) can contend for
47  * this mutex, the overhead of acquiring and releasing this
48  * mutex under linux is not that high (due to futex operating in
49  * user-level under no contention). However, it might be necessary
50  * to change it to a compare-and-swap or reader/writer lock if this
51  * ever needed to be portable.
52  * Another reason for lack of contention is that epoll is used in
53  * edge-triggered and it is usually the case that either the poller
54  * or the requesting uThread is updating the semaphore.
55  */
56 
57  // First 64 bytes (CACHELINE_SIZE)
69  uThread* rut = nullptr;
70  uThread* wut = nullptr;
71 
73  bool closing;
74 
76  bool opened;
77 
79  bool isBlockingOnRead;
80 
81  //file descriptor
82  int fd = -1;
83 
84  // First 64 bytes (CACHELINE_SIZE)
85 
86  //Mutex that protects this PollData
87  //std::mutex mtx;
88 
94  void reset(){
95  fd = -1;
96  rut = nullptr;
97  wut = nullptr;
98  closing = false;
99  opened = false;
100  };
101 
102 public:
107  PollData( int fd) : fd(fd), closing(false), opened(false), isBlockingOnRead(false){};
108 
114  PollData(): fd(-1), closing(false), opened(false), isBlockingOnRead(false){};
115  PollData(const PollData&) = delete;
116  const PollData& operator=(const PollData&) = delete;
117  ~PollData(){};
118 } __packed;
119 
120 /*
121  * PollCache is used to cache PollData objects and avoid
122  * nullptr exceptions after the file descriptor is closed.
123  * e.g., after a connection is closed, epoll might generate
124  * a notification returning the pointer to PollData back to
125  * the IOHandler. At this point if the PollData object is
126  * destroyed, the pointer is not valid anymore and segfault
127  * can happen. By allocating space in PollCache and returning
128  * the pointer to it after the connection is closed, the pointer
129  * is always valid. TODO: it might be necessary to deal with stale
130  * notifications in the future, but for now the assumption is that
131  * a stale notification can only cause an unnecessary unblock of the
132  * new uThread(if the PollData is assigned to a new Connection).
133  */
134 class PollCache{
135  friend class IOHandler;
136  friend class Connection;
137 protected:
138  IntrusiveQueue<PollData> cache;
139  std::mutex mtx;
140 
141  PollData* getPollData(){
142  std::unique_lock<std::mutex> mlock(mtx);
143  if(cache.empty()){
144  for(int i=0 ; i < 128; i++){
145  PollData* pd = new PollData();
146  cache.push(*pd);
147  }
148  }
149  PollData* pd = cache.front();
150  cache.pop();
151  return pd;
152  }
153 
154  void pushPollData(PollData* pd){
155  std::unique_lock<std::mutex> mlock(mtx);
156  cache.push(*pd);
157  }
158 };
159 #if defined (__linux__)
160 #include "EpollIOHandler.h"
161 #else
162 #error unsupported system: only __linux__ supported at this moment
163 #endif
164 
165 /*
166  * This class is a virtual class to provide nonblocking I/O
167  * to uThreads. select/poll/epoll or other type of nonblocking
168  * I/O can have their own class that inherits from IOHandler.
169  * The purpose of this class is to be used as a thread local
170  * I/O handler per kThread.
171  */
172 class IOHandler{
173  friend class Connection;
174  friend class Cluster;
175  friend class ReadyQueue;
176  friend class IOPoller;
177  friend class Scheduler;
178 
179 protected:
180 
181  static IOHandler iohandler;
182 
183  //Variables for bulk push to readyQueue
184  size_t unblockCounter;
185 
186  std::atomic_flag isPolling;
187 
188  semaphore sem;
189 
190  kThread ioKT; //IO kThread
191 
192  PollCache pollCache;
193 
194  IOPoller poller;
195 
196  /* polling flags */
197  enum Flag {
198  UT_IOREAD = 1 << 0, //READ
199  UT_IOWRITE = 1 << 1 //WRITE
200  };
201 
202  void block(PollData &pd, bool isRead);
203  bool inline unblock(PollData &pd, bool isRead);
204 
205  static void postSwitchFunc(void* ut, void* args);
206 
207  //Polling IO Function
208  static void pollerFunc(void*) __noreturn;
209 
210  IOHandler();
211  void PollReady(PollData &pd, int flag); //When there is notification update pollData and unblock the related ut
212  ~IOHandler(){}; //should be protected
213 
214 public:
215  /* public interfaces */
216  void open(PollData &pd);
217  int close(PollData &pd);
218  void wait(PollData& pd, int flag);
219  ssize_t poll(int timeout, int flag);
220  ssize_t nonblockingPoll();
221  void reset(PollData &pd);
222  //dealing with uThreads
223 };
224 
225 
227 #endif /* UTHREADS_IOHANDLER_H_ */
Definition: EpollIOHandler.h:13
Object to represent kernel threads.
Definition: kThread.h:54
user-level threads (fiber)
Definition: uThread.h:63
Scheduler and Cluster of kThreads.
Definition: Cluster.h:61
Represents a network connection.
Definition: Network.h:33