/* $Revision: 20 $ $Date: 9/10/03 15:44 $ Copyright © 1999-2006, FSL Technologies Limited. Contact "http://fost.3.felspar.com". */ #include "stdafx.h" #include #include #include #include using namespace std; using namespace FSLib; using namespace FSLib::Exceptions; namespace { Revision c_revision( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", __DATE__, L"$Revision: 20 $", L"$Date: 9/10/03 15:44 $" ); // Time outs const Setting c_connect( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"ConnectTimeOut", L"30", true ); const Setting c_accept( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"AcceptTimeOut", L"300", true ); const Setting c_read( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"ReadTimeOut", L"30", true ); const Setting c_showmanyc( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"showmanycTimeOut", L"30", true ); // Buffer sizes const Setting c_readBuffer( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"ReadBuffer", L"2048", true ); const Setting c_writeBuffer( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"WriteBuffer", L"2048", true ); // Logging options const Setting c_logStream( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"LogConnect", L"0", true ); const Setting c_logBuffer( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"LogTransfer", L"0", true ); const Setting c_logInbound( L"$Archive: /FOST.3/F3Util/tcpstream.cpp $", L"IP", L"LogInbound", L"0", true ); } /* TCPStream */ // This is the constructor of a class that has been exported. // see tcpstream.h for the class definition inline TCPStream::TCPStream( const Host &a_host, t_port a_port ) : std::basic_iostream< char >( new TCPBuf() ), m_remoteHost( a_host ), m_port( a_port ), m_socket( 0 ), m_server( NULL ) { open( a_host, a_port ); } inline TCPStream::TCPStream( TCPServer *server, t_socket socket ) : std::basic_iostream< char >( new TCPBuf() ), m_remoteHost( server->host() ), m_port( server->port() ), m_socket( socket ), m_server( server ) { dynamic_cast< TCPBuf * >( rdbuf() )->attach(m_socket); if ( _wtoi( Setting::value( L"IP", L"LogConnect" ).c_str() ) ) { YAML::Record log; log.add( L"Internet Protocol", L"TCP/IP" ); log.add( L"Connect", socket ); log.log(); } } inline TCPStream::~TCPStream() { try { close(); delete rdbuf(); } catch ( exception & ) { absorbException(); } } inline void TCPStream::open( const Host &a_host, t_port a_port) { bool logging( _wtoi( Setting::value( L"IP", L"LogConnect" ).c_str() ) ? true : false ); initWSA(); m_remoteHost = a_host; m_port = a_port; #pragma warning ( push ) #pragma warning ( disable : 4244 ) m_socket = socket( AF_INET, SOCK_STREAM, 0 /*PF_INET*/ ); #pragma warning ( pop ) if ( m_socket == INVALID_SOCKET ) { throw SocketError( L"TCPStream::open - socket", WSAGetLastError() ); } { long unsigned int tmp=1; ioctlsocket(m_socket, FIONBIO, &tmp); // Set non-blocking mode. } sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_addr.s_addr = htonl( m_remoteHost.address() ); sin.sin_port=htons( m_port ); if ( SOCKET_ERROR == connect( m_socket, reinterpret_cast< struct sockaddr * >( &sin ), sizeof( sin ) ) ) { if ( WSAEWOULDBLOCK != WSAGetLastError() ) { // Normal with non-blocking sockets. throw SocketError( L"TCPStream::open - connect", WSAGetLastError() ); } } { fd_set fds[1]; FD_ZERO(fds); #pragma warning ( push ) #pragma warning ( disable : 4127 ) FD_SET(m_socket, fds); #pragma warning ( pop ) TIMEVAL tv[1]; tv->tv_sec= _wtoi( Setting::value( L"IP", L"ConnectTimeOut" ).c_str() ); tv->tv_usec=0; t_socket select_ret = select( /*m_socket + 1*/ 0 , NULL, fds, NULL, tv); if( select_ret == SOCKET_ERROR ) { throw SocketError( L"connect-select", WSAGetLastError() ); } else if(select_ret==0) { throw SocketError( L"connect-select", WSAETIMEDOUT ); } } { int new_error; int new_error_size=sizeof(new_error); getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char *)&new_error, &new_error_size); if(new_error!=0) { throw SocketError( L"getsockopt-error", new_error ); } } dynamic_cast< TCPBuf * >( rdbuf() )->attach(m_socket); m_connected = TimeStamp::now(); if ( logging ) { YAML::Record log; log.add( L"Internet Protocol", L"TCP/IP" ); log.add( L"Connect", a_port ); log.add( L"Host", a_host ); log.log(); } } inline void TCPStream::close() { if ( rdbuf() == NULL ) return; flush(); if ( ::shutdown( m_socket, SD_BOTH ) == SOCKET_ERROR ) { throw SocketError( L"TCPServer::close() - shutdown()", WSAGetLastError() ); } do { WSASetLastError( 0 ); if ( ::closesocket( m_socket ) == SOCKET_ERROR ) { if ( WSAGetLastError() == WSAEWOULDBLOCK ) { Sleep( 10 ); } else { throw SocketError( L"TCPServer::close() - closesocket()", WSAGetLastError() ); } } } while ( WSAGetLastError() == WSAEWOULDBLOCK ); streambuf * sb = rdbuf(); rdbuf( NULL ); delete sb; m_closed = TimeStamp::now(); if ( _wtoi( Setting::value( L"IP", L"LogConnect" ).c_str() ) ) { YAML::Record log; log.add( L"Internet Protocol", L"TCP/IP" ); log.add( L"Close", m_socket ); log.add( L"Host", m_remoteHost ); log.log(); } } /* TCPBuf */ inline TCPBuf::TCPBuf(void) : m_socket( ~ t_socket( 0 ) ), m_outBuffer( new char[ _wtoi( Setting::value( L"IP", L"WriteBuffer" ).c_str() ) ] ), m_transferredOut( 0 ), m_transferredIn( 0 ) { initWSA(); } inline TCPBuf::~TCPBuf() { try { if(eback()) delete[] eback(); delete[] m_outBuffer; if ( _wtoi( Setting::value( L"IP", L"LogTransfer" ).c_str() ) ) { YAML::Record log; log.add( L"Internet Protocol", L"TCP/IP" ); log.add( L"Bytes out", m_transferredOut ); log.add( L"Bytes in", m_transferredIn ); log.log(); } } catch ( exception & ) { absorbException(); } } inline int TCPBuf::overflow(int a_ch) { // Dump characters to output. __int64 dump_len=pptr()-pbase(); if(a_ch!=EOF) ++dump_len; if( dump_len==0 ) { return 0; } boost::shared_array< char > mymem = boost::shared_array< char >( new char [ _wtoi( Setting::value( L"IP", L"WriteBuffer" ).c_str() ) ] ); char * dump_data=mymem.get(); memcpy( dump_data, pbase(), pptr()-pbase() ); if( a_ch != EOF ) dump_data[ dump_len - 1 ] = char( a_ch & 0xff ); __int64 ret; while( ( ret = send( m_socket, dump_data, static_cast< int >( dump_len ), 0 ) ) != dump_len ) { if( ret < 0 ) { throw SocketError( L"overflow - send", WSAGetLastError() ); } m_transferredOut += ret; dump_data += ret; dump_len -= ret; if( dump_len <= 0 ) { break; } } m_transferredOut += ret; setp( m_outBuffer, m_outBuffer + _wtoi( Setting::value( L"IP", L"WriteBuffer" ).c_str() )- 1 ); return 0; } inline int TCPBuf::sync() { overflow( EOF ); return 0; } inline int TCPBuf::showmanyc() { sync(); boost::shared_array< char > dump_data = boost::shared_array< char >( new char [ _wtoi( Setting::value( L"IP", L"ReadBuffer" ).c_str() ) ] ); int ret; fd_set fds; TIMEVAL tv; FD_ZERO(&fds); #pragma warning ( push ) #pragma warning ( disable : 4127 ) FD_SET(m_socket, &fds); #pragma warning ( pop ) tv.tv_sec= _wtoi( Setting::value( L"IP", L"showmanycTimeOut" ).c_str() ); tv.tv_usec=0; t_socket select_ret = select( /*m_socket+1*/ 0, &fds, NULL, NULL, &tv ); if( select_ret == SOCKET_ERROR ) { if ( WSAGetLastError() == WSAETIMEDOUT ) { // A timeout for showmanyc just means that there isn't anything available yet return 0; } else { throw SocketError( L"showmanyc - select", WSAGetLastError() ); } } else if(select_ret==0) { // We aren't ready yet return 0; } while( ( ret = recv( m_socket, dump_data.get(), _wtoi( Setting::value( L"IP", L"WriteBuffer" ).c_str() ), 0 ) ) < 0 ) { throw SocketError( L"showmanyc - recv", WSAGetLastError() ); } if( ret == 0 ) { return 0; } m_transferredIn += ret; char * tmp=new char[ ret ]; // Allocate enough for buffer. memcpy(tmp, dump_data.get(), ret); // Copy across. if( eback() ) delete[] eback(); setg( tmp, tmp, tmp + ret ); return dump_data[0]; } inline int TCPBuf::underflow() { sync(); // Fill Get buffer. Well, fill it with whatever we can get. boost::shared_array< char > dump_data = boost::shared_array< char >( new char [ _wtoi( Setting::value( L"IP", L"WriteBuffer" ).c_str() ) ] ); int ret; fd_set fds; TIMEVAL tv; FD_ZERO(&fds); #pragma warning ( push ) #pragma warning ( disable : 4127 ) FD_SET(m_socket, &fds); #pragma warning ( pop ) tv.tv_sec= _wtoi( Setting::value( L"IP", L"ReadTimeOut" ).c_str() ); tv.tv_usec=0; int select_ret = select( /*m_socket+1*/ 0, &fds, NULL, NULL, &tv ); if(select_ret==SOCKET_ERROR) { throw SocketError( L"underflow - select", WSAGetLastError() ); } else if(select_ret==0) { throw SocketError( WSAETIMEDOUT ); // If we haven't got our socket readable, then we simulate a timeout. } while((ret=recv(m_socket, dump_data.get(), _wtoi( Setting::value( L"IP", L"WriteBuffer" ).c_str() ), 0))<0) { throw SocketError( L"underflow - recv", WSAGetLastError() ); } if(ret==0) { return(EOF); } m_transferredIn += ret; if(eback()) delete[] eback(); char * tmp=new char[ret]; // Allocate enough for buffer. memcpy( tmp, dump_data.get(), ret ); // Copy across. setg(tmp,tmp,tmp+ret); return(dump_data[0]); } inline void TCPBuf::attach( t_socket a_socket ) { m_socket=a_socket; #ifdef _WIN32 long unsigned int tmp=1; ioctlsocket(m_socket, FIONBIO, &tmp); // Set non-blocking mode. #else #error Need UNIX non-blocking code. fnctl(m_socket, ...); #endif } inline int TCPBuf::close() { #ifdef _WIN32 return closesocket(m_socket); #else return close(m_socket); #endif } /* TCPServer */ inline TCPServer::TCPServer( t_port port ) : m_host( Host( L"localhost" ) ), m_port( port ) { initWSA(); listen(); } inline void TCPServer::listen() { #pragma warning ( push ) #pragma warning ( disable : 4244 ) m_socket=socket(AF_INET, SOCK_STREAM, 0 /*PF_INET*/); #pragma warning ( pop ) if ( m_socket == INVALID_SOCKET ) { throw SocketError( L"TCPServer::listen() - socket()", WSAGetLastError() ); } { // Set non-blocking mode. long unsigned int tmp=1; ioctlsocket(m_socket, FIONBIO, &tmp); } sockaddr_in sin; sin.sin_family=AF_INET; sin.sin_addr.s_addr=htonl(m_host.address()); sin.sin_port=htons(m_port); if ( ::bind( m_socket, reinterpret_cast< struct sockaddr * >( &sin ), sizeof( sin ) ) == SOCKET_ERROR ) { throw SocketError( L"TCPServer::listen() - bind()", WSAGetLastError() ); } if ( ::listen( m_socket, 4 ) == SOCKET_ERROR ) { throw SocketError( L"TCPServer::listen() - listen()", WSAGetLastError() ); } if ( _wtoi( Setting::value( L"IP", L"LogInbound" ).c_str() ) ) { YAML::Record log; log.add( L"Internet Protocol", L"TCP/IP" ); log.add( L"Server listening on port", m_port ); log.log(); } } inline TCPStream &TCPServer::stream() { t_socket socket; do { sockaddr_in sin; int bytes( sizeof( sin ) ); WSASetLastError( 0 ); #pragma warning ( push ) #pragma warning ( disable : 4244 ) if ( ( socket = ::accept( m_socket, reinterpret_cast< struct sockaddr * >( &sin ), &bytes ) ) == SOCKET_ERROR ) { #pragma warning ( pop ) if ( WSAGetLastError() == WSAEWOULDBLOCK ) { Sleep( 10 ); } else { throw SocketError( L"TCPServer::stream() - accept()", WSAGetLastError() ); } } } while ( WSAGetLastError() == WSAEWOULDBLOCK ); m_stream.reset( new TCPStream( this, socket ) ); if ( _wtoi( Setting::value( L"IP", L"LogInbound" ).c_str() ) ) { YAML::Record log; log.add( L"Internet Protocol", L"TCP/IP" ); log.add( L"Accepted connection", TimeStamp::now() ); log.log(); } return *m_stream; } inline const Host &TCPServer::host() const { return m_host; } inline t_port TCPServer::port() const { return m_port; }