diff --git a/src/Allwmake b/src/Allwmake index 8d80ff5eb..d77033182 100755 --- a/src/Allwmake +++ b/src/Allwmake @@ -17,9 +17,13 @@ wmakePrintBuild -check || /bin/rm -f foam/Make/$WM_OPTIONS/global.? 2>/dev/null wmakeLnInclude foam wmakeLnInclude meshTools wmakeLnInclude OSspecific/$WM_OSTYPE -Pstream/Allwmake wmake libo OSspecific/$WM_OSTYPE + +set +x +echo +echo "Note: ignore spurious warnings about missing mpicxx.h headers" +set -x wmake libso foam # Decomposition methods needed by meshTools diff --git a/src/Pstream/Allwmake b/src/Pstream/Allwmake deleted file mode 100755 index 6c4131068..000000000 --- a/src/Pstream/Allwmake +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/sh -cd ${0%/*} || exit 1 # run from this directory -set -x - -wmake libso dummy - -case "$WM_MPLIB" in -*MPI* | MVAPICH*) - set +x - echo - echo "Note: ignore spurious warnings about missing mpicxx.h headers" - set -x - (WM_OPTIONS=${WM_OPTIONS}$WM_MPLIB; wmake libso mpi) - ;; - -#GAMMA) -# wmake libso gamma -# ;; -esac - - -# ----------------------------------------------------------------- end-of-file diff --git a/src/Pstream/dummy/IPread.C b/src/Pstream/dummy/IPread.C deleted file mode 100644 index 52d94a9e1..000000000 --- a/src/Pstream/dummy/IPread.C +++ /dev/null @@ -1,101 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -Description - Read token and binary block from IPstream - -\*---------------------------------------------------------------------------*/ - -#include "error.H" -#include "IPstream.H" - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // - -Foam::IPstream::IPstream -( - const commsTypes commsType, - const int fromProcNo, - const label bufSize, - streamFormat format, - versionNumber version -) -: - Pstream(commsType, bufSize), - Istream(format, version), - fromProcNo_(fromProcNo), - messageSize_(0) -{ - notImplemented - ( - "IPsream::IPstream" - "(" - "const commsTypes," - "const int fromProcNo," - "const label bufSize," - "streamFormat, versionNumber" - ")" - ); -} - - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -int Foam::IPstream::read -( - const commsTypes commsType, - const int fromProcNo, - char* buf, - const std::streamsize bufSize -) -{ - notImplemented - ( - "IPstream::read" - "(" - "const commsTypes," - "const int fromProcNo," - "char* buf," - "const label bufSize" - ")" - ); - - return 0; -} - - -void Foam::IPstream::waitRequests() -{} - - -bool Foam::IPstream::finishedRequest(const label) -{ - notImplemented("IPstream::finishedRequest()"); - return false; -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -// ************************************************************************* // diff --git a/src/Pstream/dummy/Make/files b/src/Pstream/dummy/Make/files deleted file mode 100644 index 073e090a0..000000000 --- a/src/Pstream/dummy/Make/files +++ /dev/null @@ -1,5 +0,0 @@ -Pstream.C -IPread.C -OPwrite.C - -LIB = $(FOAM_LIBBIN)/dummy/libPstream diff --git a/src/Pstream/dummy/Make/options b/src/Pstream/dummy/Make/options deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/Pstream/dummy/OPwrite.C b/src/Pstream/dummy/OPwrite.C deleted file mode 100644 index 770864943..000000000 --- a/src/Pstream/dummy/OPwrite.C +++ /dev/null @@ -1,80 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -Description - Write primitive and binary block from OPstream - -\*---------------------------------------------------------------------------*/ - -#include "error.H" -#include "OPstream.H" - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // - -Foam::OPstream::~OPstream() -{ - notImplemented("OPstream::~OPstream()"); -} - - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -bool Foam::OPstream::write -( - const commsTypes commsType, - const int toProcNo, - const char* buf, - const std::streamsize bufSize -) -{ - notImplemented - ( - "IPstream::write" - "(" - "const commsTypes commsType," - "const int fromProcNo," - "char* buf," - "const label bufSize" - ")" - ); - - return false; -} - - -void Foam::OPstream::waitRequests() -{} - - -bool Foam::OPstream::finishedRequest(const label) -{ - notImplemented("OPstream::finishedRequest()"); - return false; -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -// ************************************************************************* // diff --git a/src/Pstream/dummy/Pstream.C b/src/Pstream/dummy/Pstream.C deleted file mode 100644 index e397fb212..000000000 --- a/src/Pstream/dummy/Pstream.C +++ /dev/null @@ -1,65 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -\*---------------------------------------------------------------------------*/ - -#include "Pstream.H" -#include "PstreamReduceOps.H" - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -void Foam::Pstream::addValidParOptions(HashTable& validParOptions) -{} - - -bool Foam::Pstream::init(int& argc, char**& argv) -{ - FatalErrorIn("Pstream::init(int& argc, char**& argv)") - << "Trying to use the dummy Pstream library." << nl - << "This dummy library cannot be used in parallel mode" - << Foam::exit(FatalError); - - return false; -} - - -void Foam::Pstream::exit(int errnum) -{ - notImplemented("Pstream::exit(int errnum)"); -} - - -void Foam::Pstream::abort() -{ - notImplemented("Pstream::abort()"); -} - - -void Foam::reduce(scalar&, const sumOp&) -{} - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -// ************************************************************************* // diff --git a/src/Pstream/gamma/IPread.C b/src/Pstream/gamma/IPread.C deleted file mode 100644 index 8e83be0ff..000000000 --- a/src/Pstream/gamma/IPread.C +++ /dev/null @@ -1,136 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -Description - Read token and binary block from IPstream - -\*---------------------------------------------------------------------------*/ - -#include "IPstream.H" -#include "long.H" -#include "PstreamGlobals.H" - -extern "C" -{ -# include -} - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -namespace Foam -{ - -// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // - -IPstream::IPstream -( - const commsTypes commsType, - const int fromProcNo, - const label bufSize, - streamFormat format, - versionNumber version -) -: - Pstream(commsType, bufSize), - Istream(format, version), - fromProcNo_(fromProcNo), - messageSize_(0) -{ - // Blocking read. - - setOpened(); - setGood(); - - if (Pstream::debug) - { - Pout<< "IPstream::IPstream : Starting receive from " << fromProcNo_ - << " recvIndex:" << PstreamGlobals::recvIndex[fromProcNo_] - << Foam::endl; - } - - PstreamGlobals::gammaWait(fromProcNo_); - - label ready = PstreamGlobals::consumeIndex[fromProcNo_]; - messageSize_ = PstreamGlobals::recvBufLen[ready][fromProcNo_]; - - if (!bufSize) - { - if (Pstream::debug) - { - Pout<< "IPstream::IPstream : sizing buffer to " << messageSize_ - << endl; - } - - buf_.setSize(messageSize_); - } - - PstreamGlobals::copyReceive(fromProcNo_, buf_.begin(), buf_.size()); - - if (Pstream::debug) - { - Pout<< "IPstream::IPstream : Received " << messageSize_ - << " from " << fromProcNo_ - << Foam::endl; - } -} - - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -label IPstream::read -( - const commsTypes commsType, - const int fromProcNo, - char* buf, - const std::streamsize bufSize -) -{ - // Blocking read. - label messageSize; - - if (Pstream::debug) - { - Pout<< "IPstream::read : Starting receive from " << fromProcNo - << " recvIndex:" << PstreamGlobals::recvIndex[fromProcNo] - << Foam::endl; - } - - PstreamGlobals::gammaWait(fromProcNo); - messageSize = PstreamGlobals::copyReceive(fromProcNo, buf, bufSize); - - if (Pstream::debug) - { - Pout<< "IPstream::read : Received " << messageSize - << " from " << fromProcNo - << Foam::endl; - } - - return messageSize; -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -} // End namespace Foam - -// ************************************************************************* // diff --git a/src/Pstream/gamma/Make/files b/src/Pstream/gamma/Make/files deleted file mode 100644 index a0374cb9c..000000000 --- a/src/Pstream/gamma/Make/files +++ /dev/null @@ -1,6 +0,0 @@ -PstreamGlobals.C -Pstream.C -OPwrite.C -IPread.C - -LIB = $(FOAM_MPI_LIBBIN)/libPstream diff --git a/src/Pstream/gamma/Make/options b/src/Pstream/gamma/Make/options deleted file mode 100644 index 90632e801..000000000 --- a/src/Pstream/gamma/Make/options +++ /dev/null @@ -1,4 +0,0 @@ -include $(RULES)/mplib$(WM_MPLIB) - -EXE_INC = $(PFLAGS) $(PINC) -LIB_LIBS = $(PLIBS) diff --git a/src/Pstream/gamma/OPwrite.C b/src/Pstream/gamma/OPwrite.C deleted file mode 100644 index 88ebf53ee..000000000 --- a/src/Pstream/gamma/OPwrite.C +++ /dev/null @@ -1,185 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -Description - Write primitive and binary block from OPstream gamma-mpi - -\*---------------------------------------------------------------------------*/ - -#include "OPstream.H" -#include "long.H" -#include "PstreamGlobals.H" - -extern "C" { - -#include - -} - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -namespace Foam -{ - -// * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // - -// Largest message sent so far. This tracks the size of the receive -// buffer on the receiving end. Done so we only send out resize messages -// if necessary -//! @cond fileScope -labelList maxSendSize; -//! @endcond fileScope - - -// * * * * * * * * * * * * * * * * Destructor * * * * * * * * * * * * * * * // - -OPstream::~OPstream() -{ - if (Pstream::debug) - { - Pout<< "OPstream::~OPstream() to processor " << toProcNo_ - << Foam::endl; - } - - if - ( - !write - ( - commsType_, - toProcNo_, - buf_.begin(), - bufPosition_ - ) - ) - { - FatalErrorIn("OPstream::~OPstream()") - << "GAMMA cannot send outgoing message" - << Foam::abort(FatalError); - } -} - - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -bool OPstream::write -( - const commsTypes commsType, - const int toProcNo, - const char* buf, - const std::streamsize bufSize -) -{ - if (PstreamGlobals::getSizeFromHeader(buf, bufSize) != -1) - { - FatalErrorIn("OPstream::write") - << "Problem: Trying to send message of size " << bufSize - << " that corresponds to the special resizeMessage." - << Foam::abort(FatalError); - } - - if (maxSendSize.empty()) - { - // Intialize maxSendSize to the initial size of the receive buffers. - maxSendSize.setSize(Pstream::nProcs()); - maxSendSize = PstreamGlobals::initialBufferLen; - maxSendSize[Pstream::myProcNo()] = 0; - - if (Pstream::debug) - { - forAll(maxSendSize, procNo) - { - Pout<< "OPstream::write() : for toProcNo:" << procNo - << " set maxSendSize to " << maxSendSize[procNo] - << Foam::endl; - } - } - } - - if (Pstream::debug) - { - Pout<< "OPstream::write() : proc:" << toProcNo - << " maxSendSize:" << maxSendSize[toProcNo] - << Foam::endl; - } - - if (bufSize > maxSendSize[toProcNo]) - { - // Send resize message. - if (Pstream::debug) - { - Pout<< "OPstream::write() : Sending resize message to proc " - << toProcNo - << " for size:" << bufSize - << Foam::endl; - } - - PstreamGlobals::setResizeMessage(bufSize); - gamma_send_flowctl - ( - toProcNo, - reinterpret_cast(PstreamGlobals::resizeMessage), - PstreamGlobals::resizeMessageLen*sizeof(uint64_t) - ); - - maxSendSize[toProcNo] = bufSize; - } - - - // Do normal send - // ~~~~~~~~~~~~~~ - - // Note: could be put into allocation of buf. - //gamma_mlock(const_cast(buf), bufSize); - - if (Pstream::debug) - { - Pout<< "OPstream::write() : Sending to proc " << toProcNo - << " bytes:" << bufSize << Foam::endl; - } - - gamma_send_flowctl - ( - toProcNo, - const_cast(buf), - bufSize - ); - - //gamma_munlock(const_cast(buf), bufSize); - - if (Pstream::debug) - { - Pout<< "OPstream::write() : Sent " << bufSize - << " to proc " << toProcNo - << Foam::endl; - } - - - return true; -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -} // End namespace Foam - -// ************************************************************************* // diff --git a/src/Pstream/gamma/Pstream.C b/src/Pstream/gamma/Pstream.C deleted file mode 100644 index d9afbbda4..000000000 --- a/src/Pstream/gamma/Pstream.C +++ /dev/null @@ -1,474 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -Description - Pstream for GAMMA - - GAMMA has a (polling) receive handler which gets called every time a - received message is complete. Ours stores the length of the currently - received message and sets up the next buffer to store the next message - in. - Note that the pattern between two processors can be - - send - - receive - - receive - - send - since the first swap might belong to a local exchange and the second to - a reduce. Since gamma has to have the receive buffers already set up we - have to allocate them big enough. To prevent excessive amounts needed we - dynamically resize them (never shrink) by sending special 'resize' messages - before sending a largish message. - - Because of this we actually need four receive buffers: - - send - - receive resize message - - receive normal message - - receive resize message - - receive normal message - - send - - The special resize message is a message with a special header which - (hopefully) should never appear in normal exchanges (it actually checks - for this in the OPstream::send) - -\*---------------------------------------------------------------------------*/ - -#include "Pstream.H" -#include "PstreamReduceOps.H" -#include "OSspecific.H" -#include "PstreamGlobals.H" - -#include -#include -#include - -extern "C" -{ -# include -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -namespace Foam -{ - -// Receive handler to copy out received message length and switch buffers. -static void handler(void) -{ - label current = PstreamGlobals::recvIndex[gamma_active_port]; - - List& buf = PstreamGlobals::recvBuf[current][gamma_active_port]; - label bufLen = PstreamGlobals::recvBufLen[current][gamma_active_port]; - - if (bufLen != -1) - { - FatalErrorIn("Pstream::handler(void)") - << "Buffer length not reset : " - << bufLen - << " when receiving message of size " << gamma_msglen - << " from processor " << gamma_active_port << endl - << "This means that the existing data has not been consumed yet" - << " (by IPstream::read) and means your communication pattern" - << " is probably not balanced (a receive for every send)" - << endl - << "This can happen if you have e.g. gather without scatter." - << endl - << "A workaround is to increase the depth of the circular" - << " receive buffers in PstreamGlobals.H" - << abort(FatalError); - } - - - // Some checks - if - ( - gamma_msglen < 0 - || gamma_msglen > buf.size() - ) - { - FatalErrorIn("Pstream::handler(void)") - << "Received message of size " << gamma_msglen - << " from processor " << gamma_active_port - << Foam::endl - << "but global receive buffer is only of size " - << buf.size() - << abort(FatalError); - } - - // Check for resize message - label resizeLen = PstreamGlobals::getSizeFromHeader - ( - buf.begin(), - gamma_msglen - ); - - if (resizeLen != -1) - { - if (Pstream::debug) - { - Pout<< "Pstream::handler : Resize message:" << resizeLen - << " from proc " << gamma_active_port - << " current size:" - << PstreamGlobals::getMaxBufSize(gamma_active_port) - << Foam::endl; - } - - // Saved current buffer. - List savedBuf; - - if (resizeLen > PstreamGlobals::getMaxBufSize(gamma_active_port)) - { - if (Pstream::debug) - { - Pout<< "Pstream::handler :" - << " resizing receive buffer for processor " - << gamma_active_port - << " from " - << PstreamGlobals::getMaxBufSize(gamma_active_port) - << " to " << resizeLen << Foam::endl; - } - - // Save the pointer (that gamma knows about) so we can safely - // gamma_switch_to_buffer with a valid pointer. - // Not sure if nessecary but do anyway. - savedBuf.transfer(buf); - - // Resize all the buffers - forAll(PstreamGlobals::recvBuf, i) - { - List& chars = - PstreamGlobals::recvBuf[i][gamma_active_port]; - -// gamma_munlock(chars.begin(), chars.size()); - chars.setSize(resizeLen); -// gamma_mlock(chars.begin(), chars.size()); - } - } - - // Update length with special value to denote resize was done. - PstreamGlobals::recvBufLen[current][gamma_active_port] = -2; - } - else - { - // Update length with actual message length - PstreamGlobals::recvBufLen[current][gamma_active_port] = gamma_msglen; - } - - // Go to next buffer. - label next = PstreamGlobals::recvBuf.fcIndex(current); - PstreamGlobals::recvIndex[gamma_active_port] = next; - -// gamma_switch_to_buffer - gamma_post_recv - ( - gamma_active_port, - PstreamGlobals::recvBuf[next][gamma_active_port].begin(), - PstreamGlobals::recvBuf[next][gamma_active_port].size() - ); -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -void Pstream::addValidParOptions(HashTable& validParOptions) -{ - validParOptions.insert("np", ""); - validParOptions.insert("p4pg", "PI file"); - validParOptions.insert("p4wd", "directory"); - validParOptions.insert("p4amslave", ""); - validParOptions.insert("p4yourname", "hostname"); - - validParOptions.insert("machinefile", "machine file"); - validParOptions.insert("GAMMANP", "numProcs"); - validParOptions.insert("GAMMAHOME", "gamma cwd"); - validParOptions.insert("GAMMA", "1(enable) or 0(disable)"); -} - - -bool Pstream::init(int& argc, char**& argv) -{ - int numprocs = 0; - - string npString("-GAMMANP"); - - for (label i = 0; i < argc; i++) - { - if (argv[i] == npString) - { - if (i+1 < argc) - { - numprocs = atoi(argv[i+1]); - break; - } - } - } - - // Initialize GAMMA - unsigned char smallNumprocs = numprocs; - - gamma_init(smallNumprocs, argc, argv); - - myProcNo_ = gamma_my_node(); - - // Make sure printing with prefix. - setParRun(); - - procIDs_.setSize(numprocs); - - forAll(procIDs_, procNo) - { - procIDs_[procNo] = procNo; - } - - - // Allocate receive buffers. - // ~~~~~~~~~~~~~~~~~~~~~~~~~ - - // Make sure each receive buffer is at least large enough to receive - // the resize message. - - // Current active buffer - PstreamGlobals::recvIndex.setSize(numprocs); - PstreamGlobals::recvIndex = 0; - PstreamGlobals::consumeIndex.setSize(numprocs); - PstreamGlobals::consumeIndex = 0; - - forAll(PstreamGlobals::recvBuf, i) - { - PstreamGlobals::recvBufLen[i].setSize(numprocs); - PstreamGlobals::recvBufLen[i] = -1; - - List >& buffers = PstreamGlobals::recvBuf[i]; - - buffers.setSize(numprocs); - forAll(buffers, procNo) - { - if (procNo != myProcNo_) - { - buffers[procNo].setSize(PstreamGlobals::initialBufferLen); - - // Acc. to gamma sources all buffers need to be in memory. - // Either locked or "write touched". -// gamma_mlock - // ( - // buffers[procNo].begin(), - // buffers[procNo].size() - // ); - } - } - } - - - // Lock the special resize message -// gamma_mlock -// ( - // reinterpret_cast(PstreamGlobals::resizeMessage), - // PstreamGlobals::resizeMessageLen*sizeof(uint64_t) - // ); - - - // Attach current receive buffers - forAll(procIDs_, procNo) - { - if (procNo != myProcNo_) - { - // Buffer index (always 0 at this point) - label current = PstreamGlobals::recvIndex[procNo]; - - // Current buffer for this processor. - List& buf = PstreamGlobals::recvBuf[current][procNo]; - - gamma_set_active_port - ( - procNo, //unsigned short port, - procNo, //unsigned short dest_node, - gamma_my_par_pid(), //unsigned char dest_par_pid, - myProcNo_, //unsigned short dest_port, - handler, //callback - procNo, //unsigned short semaphore, - GO_BACK, //unsigned char buffer_kind, - buf.begin(), - buf.size() - ); - } - } - - - // Make sure all have allocated the ports (so set the receive buffers) - gamma_sync(); - - Info<< "GAMMA Pstream initialized with:" << nl - << " floatTransfer : " << floatTransfer << nl - << " nProcsSimpleSum : " << nProcsSimpleSum() << nl - << " scheduledTransfer : " << Pstream::scheduledTransfer << nl - << Foam::endl; - - // Now that nprocs is known construct communication tables. - initCommunicationSchedule(); - - return true; -} - - -void Pstream::exit(int errnum) -{ -// gamma_munlockall(); - gamma_exit(); - //gamma_abort(); -} - - -void Pstream::abort() -{ -Pout<< "**Pstream::abort()**" << endl; -// gamma_munlockall(); - gamma_abort(); -} - - -void reduce(scalar& Value, const sumOp& bop) -{ - if (!Pstream::parRun()) - { - return; - } - - if (Pstream::debug) - { - Pout<< "**entering Pstream::reduce for " << Value << Foam::endl; - } - - - if (Pstream::master()) - { - for - ( - int slave=Pstream::firstSlave(); - slave<=Pstream::lastSlave(); - slave++ - ) - { - scalar value; - - if - ( - !IPstream::read - ( - slave, - reinterpret_cast(&value), // buf - sizeof(Value) // bufSize - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "IPstream::read failed" - << Foam::abort(FatalError); - } - - Value = bop(Value, value); - } - } - else - { - if - ( - !OPstream::write - ( - Pstream::masterNo(), - reinterpret_cast(&Value), // buf - sizeof(Value), // bufSize - false // non-buffered - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "OPstream::write failed" - << Foam::abort(FatalError); - } - } - - if (Pstream::master()) - { - for - ( - int slave=Pstream::firstSlave(); - slave<=Pstream::lastSlave(); - slave++ - ) - { - if - ( - !OPstream::write - ( - slave, - reinterpret_cast(&Value), // buf - sizeof(Value), // bufSize, - false // non-buffered - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "OPstream::write failed" - << Foam::abort(FatalError); - } - } - } - else - { - if - ( - !IPstream::read - ( - Pstream::masterNo(), - reinterpret_cast(&Value), // buf - sizeof(Value) // bufSize - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "IPstream::read failed" - << Foam::abort(FatalError); - } - } - - if (Pstream::debug) - { - Pout<< "**exiting Pstream::reduce with " << Value << Foam::endl; - } -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -} // End namespace Foam - -// ************************************************************************* // diff --git a/src/Pstream/gamma/PstreamGlobals.C b/src/Pstream/gamma/PstreamGlobals.C deleted file mode 100644 index 15b161d67..000000000 --- a/src/Pstream/gamma/PstreamGlobals.C +++ /dev/null @@ -1,206 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -\*---------------------------------------------------------------------------*/ - -#include "PstreamGlobals.H" -#include "IOstreams.H" -#include "Pstream.H" - -extern "C" { - -#include - -} - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -namespace Foam -{ - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -// Receive buffers -FixedList >, 4> PstreamGlobals::recvBuf; - -// Length of receive buffers -FixedList PstreamGlobals::recvBufLen; - -labelList PstreamGlobals::recvIndex; -labelList PstreamGlobals::consumeIndex; - -// These are all signalling nans and probably different from the ones that -// the fpu might ever generate. -uint64_t PstreamGlobals::resizeMessage[PstreamGlobals::resizeMessageLen] = -{ - 0x7ff7ffffffffffABllu, - 0x7ff7ffffffffffCDllu, - 0x7ff7ffffffffff12llu, - 0x7ff7ffffffffff30llu, - 0x7ff7ffffffffff19llu, - 0x0000000000000000llu // this word gets overwritten with the length. -}; - - -// Wrapper around gamma_wait -void PstreamGlobals::gammaWait(const label procNo) -{ - // Last request. Block. - gamma_wait(procNo, 1); - - // Currently unconsumed received message - label ready = PstreamGlobals::consumeIndex[procNo]; - - // Check received length - if (PstreamGlobals::recvBufLen[ready][procNo] == -2) - { - // Was resize message. Consume and rewait (is always followed by - // real message) - - if (Pstream::debug) - { - Pout<< "PstreamGlobals::gammaWait : " - << "Resize event. consumeIndex:" << ready - << " Restarting receive from " << procNo << endl; - } - // Consume resize message - PstreamGlobals::recvBufLen[ready][procNo] = -1; - PstreamGlobals::consumeIndex[procNo] = - PstreamGlobals::recvBuf.fcIndex(ready); - // And rewait - gamma_wait(procNo, 1); - } -} - - -// Copies data from global receive buffer into buf. -label PstreamGlobals::copyReceive -( - const label procNo, - char* buf, - const label bufSize -) -{ - // Get the ready buffer - label ready = consumeIndex[procNo]; - - // Actually received - label receivedLen = recvBufLen[ready][procNo]; - - if (Pstream::debug) - { - Pout<< "copyReceive : for proc " << procNo - << " copying " << receivedLen << " bytes out of buffer " << ready - << endl; - } - - if (receivedLen < 0) - { - FatalErrorIn - ( - "Pstream::copyReceive(const label, char*, const label)" - ) << "Illegal message length " - << receivedLen - << " received from proc " << procNo << " into buffer " << ready - << endl - << "This is probably caused by receiving more than is actually" - << " sent (e.g. gather without scatter)." << endl - << abort(FatalError); - } - - if (receivedLen > bufSize) - { - FatalErrorIn - ( - "Pstream::copyReceive(const label, char*, const label)" - ) << "buffer (" - << bufSize - << ") not large enough for incomming message (" - << receivedLen << ')' - << " received from proc " << procNo << " into buffer " << ready - << abort(FatalError); - } - - // Copy out of receive buffer - memcpy - ( - buf, - recvBuf[ready][procNo].begin(), - receivedLen - ); - // Release receive buffer - recvBufLen[ready][procNo] = -1; - // Go to next buffer to consume - consumeIndex[procNo] = recvBuf.fcIndex(ready); - - return receivedLen; -} - - -// Checks whether an incoming message is a resize message. If not returns -1, -// otherwise returns size read from header. -label PstreamGlobals::getSizeFromHeader(const char* buf, const label len) -{ - if (len != resizeMessageLen*sizeof(uint64_t)) - { - return -1; - } - - const uint64_t* dPtr = reinterpret_cast(buf); - - // Check all but the last word - for (label i = 0; i < resizeMessageLen-1; i++) - { - if (*dPtr++ != resizeMessage[i]) - { - return -1; - } - } - - return *reinterpret_cast(dPtr); -} - - -void PstreamGlobals::setResizeMessage(const label len) -{ - reinterpret_cast(resizeMessage[resizeMessageLen-1]) = len; -} - - -label PstreamGlobals::getMaxBufSize(const int procNo) -{ - label maxSz = 0; - - forAll(recvBuf, i) - { - maxSz = max(maxSz, recvBuf[i][procNo].size()); - } - return maxSz; -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -} // End namespace Foam - -// ************************************************************************* // diff --git a/src/Pstream/gamma/PstreamGlobals.H b/src/Pstream/gamma/PstreamGlobals.H deleted file mode 100644 index bc97a1544..000000000 --- a/src/Pstream/gamma/PstreamGlobals.H +++ /dev/null @@ -1,105 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -Namespace - Foam::PstreamGlobals - -Description - Global functions and variables for working with parallel streams, - but principally for gamma/mpi - -SourceFiles - PstreamGlobals.C - -\*---------------------------------------------------------------------------*/ - -#ifndef PstreamGlobals_H -#define PstreamGlobals_H - -#include "FixedList.H" -#include "labelList.H" -#include "DynamicList.H" - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -namespace Foam -{ - -/*---------------------------------------------------------------------------*\ - Class PstreamGlobals Declaration -\*---------------------------------------------------------------------------*/ - -namespace PstreamGlobals -{ - -//- Block wait for message on port procNo -void gammaWait(const label procNo); - -//- Helper routine to copy out newly received data -label copyReceive -( - const label procNo, - char* buf, - const label bufSize -); - - -//- Receive buffers -extern FixedList >, 4> recvBuf; - -//- Length of receive buffers -extern FixedList recvBufLen; - -//- Currently active buffer in receiving -extern labelList recvIndex; -//- Receive buffer that has to be consumed -extern labelList consumeIndex; - - -//- Special message to signal resizing -const int resizeMessageLen = 6; -extern uint64_t resizeMessage[]; -//- Initial buffer length. Should be able to contain the message comfortably. -const int initialBufferLen = 2*resizeMessageLen*sizeof(uint64_t); - -//- Helper routine to check if a message is a resize message. -// Returns -1 if not or the new size. -label getSizeFromHeader(const char* buf, const label len); -//- Change the resize message to contain the new length -void setResizeMessage(const label len); - -//- Get max size of all receive buffers to procNo -label getMaxBufSize(const int procNo); - -}; - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -} // End namespace Foam - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -#endif - -// ************************************************************************* // diff --git a/src/Pstream/mpi/IPread.C b/src/Pstream/mpi/IPread.C deleted file mode 100644 index 8f533bb53..000000000 --- a/src/Pstream/mpi/IPread.C +++ /dev/null @@ -1,235 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -Description - Read token and binary block from IPstream - -\*---------------------------------------------------------------------------*/ - -#include "mpi.h" - -#include "IPstream.H" -#include "PstreamGlobals.H" - -// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // - -Foam::IPstream::IPstream -( - const commsTypes commsType, - const int fromProcNo, - const label bufSize, - streamFormat format, - versionNumber version -) -: - Pstream(commsType, bufSize), - Istream(format, version), - fromProcNo_(fromProcNo), - messageSize_(0) -{ - setOpened(); - setGood(); - - MPI_Status status; - - // If the buffer size is not specified, probe the incoming message - // and set it - if (!bufSize) - { - MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status); - MPI_Get_count(&status, MPI_BYTE, &messageSize_); - - buf_.setSize(messageSize_); - } - - messageSize_ = read(commsType, fromProcNo_, buf_.begin(), buf_.size()); - - if (!messageSize_) - { - FatalErrorIn - ( - "IPstream::IPstream(const int fromProcNo, " - "const label bufSize, streamFormat format, versionNumber version)" - ) << "read failed" - << Foam::abort(FatalError); - } -} - - -// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // - -Foam::label Foam::IPstream::read -( - const commsTypes commsType, - const int fromProcNo, - char* buf, - const std::streamsize bufSize -) -{ - if (commsType == blocking || commsType == scheduled) - { - MPI_Status status; - - if - ( - MPI_Recv - ( - buf, - bufSize, - MPI_PACKED, - procID(fromProcNo), - msgType(), - MPI_COMM_WORLD, - &status - ) - ) - { - FatalErrorIn - ( - "IPstream::read" - "(const int fromProcNo, char* buf, std::streamsize bufSize)" - ) << "MPI_Recv cannot receive incoming message" - << Foam::abort(FatalError); - - return 0; - } - - - // Check size of message read - - label messageSize; - MPI_Get_count(&status, MPI_BYTE, &messageSize); - - if (messageSize > bufSize) - { - FatalErrorIn - ( - "IPstream::read" - "(const int fromProcNo, char* buf, std::streamsize bufSize)" - ) << "buffer (" << label(bufSize) - << ") not large enough for incoming message (" - << messageSize << ')' - << Foam::abort(FatalError); - } - - return messageSize; - } - else if (commsType == nonBlocking) - { - MPI_Request request; - - if - ( - MPI_Irecv - ( - buf, - bufSize, - MPI_PACKED, - procID(fromProcNo), - msgType(), - MPI_COMM_WORLD, - &request - ) - ) - { - FatalErrorIn - ( - "IPstream::read" - "(const int fromProcNo, char* buf, std::streamsize bufSize)" - ) << "MPI_Recv cannot start non-blocking receive" - << Foam::abort(FatalError); - - return 0; - } - - PstreamGlobals::IPstream_outstandingRequests_.append(request); - - return 1; - } - else - { - FatalErrorIn - ( - "IPstream::read" - "(const int fromProcNo, char* buf, std::streamsize bufSize)" - ) << "Unsupported communications type " << commsType - << Foam::abort(FatalError); - - return 0; - } -} - - -void Foam::IPstream::waitRequests() -{ - if (PstreamGlobals::IPstream_outstandingRequests_.size()) - { - if - ( - MPI_Waitall - ( - PstreamGlobals::IPstream_outstandingRequests_.size(), - PstreamGlobals::IPstream_outstandingRequests_.begin(), - MPI_STATUSES_IGNORE - ) - ) - { - FatalErrorIn - ( - "IPstream::waitRequests()" - ) << "MPI_Waitall returned with error" << endl; - } - - PstreamGlobals::IPstream_outstandingRequests_.clear(); - } -} - - -bool Foam::IPstream::finishedRequest(const label i) -{ - if (i >= PstreamGlobals::IPstream_outstandingRequests_.size()) - { - FatalErrorIn - ( - "IPstream::finishedRequest(const label)" - ) << "There are " - << PstreamGlobals::IPstream_outstandingRequests_.size() - << " outstanding send requests and you are asking for i=" << i - << nl - << "Maybe you are mixing blocking/non-blocking comms?" - << Foam::abort(FatalError); - } - - int flag; - MPI_Test - ( - &PstreamGlobals::IPstream_outstandingRequests_[i], - &flag, - MPI_STATUS_IGNORE - ); - - return flag != 0; -} - - -// ************************************************************************* // diff --git a/src/Pstream/mpi/Make/files b/src/Pstream/mpi/Make/files deleted file mode 100644 index 6b1d3f4c5..000000000 --- a/src/Pstream/mpi/Make/files +++ /dev/null @@ -1,6 +0,0 @@ -OPwrite.C -IPread.C -Pstream.C -PstreamGlobals.C - -LIB = $(FOAM_MPI_LIBBIN)/libPstream diff --git a/src/Pstream/mpi/Make/options b/src/Pstream/mpi/Make/options deleted file mode 100644 index 90632e801..000000000 --- a/src/Pstream/mpi/Make/options +++ /dev/null @@ -1,4 +0,0 @@ -include $(RULES)/mplib$(WM_MPLIB) - -EXE_INC = $(PFLAGS) $(PINC) -LIB_LIBS = $(PLIBS) diff --git a/src/Pstream/mpi/Pstream.C b/src/Pstream/mpi/Pstream.C deleted file mode 100644 index 47b76100f..000000000 --- a/src/Pstream/mpi/Pstream.C +++ /dev/null @@ -1,428 +0,0 @@ -/*---------------------------------------------------------------------------*\ - ========= | - \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright -------------------------------------------------------------------------------- -License - This file is part of foam-extend. - - foam-extend is free software: you can redistribute it and/or modify it - under the terms of the GNU General Public License as published by the - Free Software Foundation, either version 3 of the License, or (at your - option) any later version. - - foam-extend is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU General Public License - along with foam-extend. If not, see . - -\*---------------------------------------------------------------------------*/ - -#include "mpi.h" - -#include "Pstream.H" -#include "PstreamReduceOps.H" -#include "OSspecific.H" - -#include -#include -#include - -#if defined(WM_SP) -# define MPI_SCALAR MPI_FLOAT -#elif defined(WM_DP) -# define MPI_SCALAR MPI_DOUBLE -#elif defined(WM_LDP) -# define MPI_SCALAR MPI_LONG_DOUBLE -#endif - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -// NOTE: -// valid parallel options vary between implementations, but flag common ones. -// if they are not removed by MPI_Init(), the subsequent argument processing -// will notice that they are wrong -void Foam::Pstream::addValidParOptions(HashTable& validParOptions) -{ - validParOptions.insert("np", ""); - validParOptions.insert("p4pg", "PI file"); - validParOptions.insert("p4wd", "directory"); - validParOptions.insert("p4amslave", ""); - validParOptions.insert("p4yourname", "hostname"); - validParOptions.insert("GAMMANP", "number of instances"); - validParOptions.insert("machinefile", "machine file"); -} - - -bool Foam::Pstream::init(int& argc, char**& argv) -{ - MPI_Init(&argc, &argv); - - int numprocs; - MPI_Comm_size(MPI_COMM_WORLD, &numprocs); - MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_); - - if (numprocs <= 1) - { - FatalErrorIn("Pstream::init(int& argc, char**& argv)") - << "bool Pstream::init(int& argc, char**& argv) : " - "attempt to run parallel on 1 processor" - << Foam::abort(FatalError); - } - - procIDs_.setSize(numprocs); - - forAll(procIDs_, procNo) - { - procIDs_[procNo] = procNo; - } - - setParRun(); - -# ifndef SGIMPI - string bufferSizeName = getEnv("MPI_BUFFER_SIZE"); - - if (bufferSizeName.size()) - { - int bufferSize = atoi(bufferSizeName.c_str()); - - if (bufferSize) - { - MPI_Buffer_attach(new char[bufferSize], bufferSize); - } - } - else - { - FatalErrorIn("Pstream::init(int& argc, char**& argv)") - << "Pstream::init(int& argc, char**& argv) : " - << "environment variable MPI_BUFFER_SIZE not defined" - << Foam::abort(FatalError); - } -# endif - - int processorNameLen; - char processorName[MPI_MAX_PROCESSOR_NAME]; - - MPI_Get_processor_name(processorName, &processorNameLen); - - //signal(SIGABRT, stop); - - // Now that nprocs is known construct communication tables. - initCommunicationSchedule(); - - return true; -} - - -void Foam::Pstream::exit(int errnum) -{ -# ifndef SGIMPI - int size; - char* buff; - MPI_Buffer_detach(&buff, &size); - delete[] buff; -# endif - - if (errnum == 0) - { - MPI_Finalize(); - ::exit(errnum); - } - else - { - MPI_Abort(MPI_COMM_WORLD, errnum); - } -} - - -void Foam::Pstream::abort() -{ - MPI_Abort(MPI_COMM_WORLD, 1); -} - - -void Foam::reduce(scalar& Value, const sumOp& bop) -{ - if (!Pstream::parRun()) - { - return; - } - - if (Pstream::nProcs() <= Pstream::nProcsSimpleSum()) - { - if (Pstream::master()) - { - for - ( - int slave=Pstream::firstSlave(); - slave<=Pstream::lastSlave(); - slave++ - ) - { - scalar value; - - if - ( - MPI_Recv - ( - &value, - 1, - MPI_SCALAR, - Pstream::procID(slave), - Pstream::msgType(), - MPI_COMM_WORLD, - MPI_STATUS_IGNORE - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "MPI_Recv failed" - << Foam::abort(FatalError); - } - - Value = bop(Value, value); - } - } - else - { - if - ( - MPI_Send - ( - &Value, - 1, - MPI_SCALAR, - Pstream::procID(Pstream::masterNo()), - Pstream::msgType(), - MPI_COMM_WORLD - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "MPI_Send failed" - << Foam::abort(FatalError); - } - } - - - if (Pstream::master()) - { - for - ( - int slave=Pstream::firstSlave(); - slave<=Pstream::lastSlave(); - slave++ - ) - { - if - ( - MPI_Send - ( - &Value, - 1, - MPI_SCALAR, - Pstream::procID(slave), - Pstream::msgType(), - MPI_COMM_WORLD - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "MPI_Send failed" - << Foam::abort(FatalError); - } - } - } - else - { - if - ( - MPI_Recv - ( - &Value, - 1, - MPI_SCALAR, - Pstream::procID(Pstream::masterNo()), - Pstream::msgType(), - MPI_COMM_WORLD, - MPI_STATUS_IGNORE - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "MPI_Recv failed" - << Foam::abort(FatalError); - } - } - } - else - { - scalar sum; - MPI_Allreduce(&Value, &sum, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD); - Value = sum; - - /* - int myProcNo = Pstream::myProcNo(); - int nProcs = Pstream::nProcs(); - - // - // receive from children - // - int level = 1; - int thisLevelOffset = 2; - int childLevelOffset = thisLevelOffset/2; - int childProcId = 0; - - while - ( - (childLevelOffset < nProcs) - && (myProcNo % thisLevelOffset) == 0 - ) - { - childProcId = myProcNo + childLevelOffset; - - scalar value; - - if (childProcId < nProcs) - { - if - ( - MPI_Recv - ( - &value, - 1, - MPI_SCALAR, - Pstream::procID(childProcId), - Pstream::msgType(), - MPI_COMM_WORLD, - MPI_STATUS_IGNORE - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "MPI_Recv failed" - << Foam::abort(FatalError); - } - - Value = bop(Value, value); - } - - level++; - thisLevelOffset <<= 1; - childLevelOffset = thisLevelOffset/2; - } - - // - // send and receive from parent - // - if (!Pstream::master()) - { - int parentId = myProcNo - (myProcNo % thisLevelOffset); - - if - ( - MPI_Send - ( - &Value, - 1, - MPI_SCALAR, - Pstream::procID(parentId), - Pstream::msgType(), - MPI_COMM_WORLD - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "MPI_Send failed" - << Foam::abort(FatalError); - } - - if - ( - MPI_Recv - ( - &Value, - 1, - MPI_SCALAR, - Pstream::procID(parentId), - Pstream::msgType(), - MPI_COMM_WORLD, - MPI_STATUS_IGNORE - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "MPI_Recv failed" - << Foam::abort(FatalError); - } - } - - - // - // distribute to my children - // - level--; - thisLevelOffset >>= 1; - childLevelOffset = thisLevelOffset/2; - - while (level > 0) - { - childProcId = myProcNo + childLevelOffset; - - if (childProcId < nProcs) - { - if - ( - MPI_Send - ( - &Value, - 1, - MPI_SCALAR, - Pstream::procID(childProcId), - Pstream::msgType(), - MPI_COMM_WORLD - ) - ) - { - FatalErrorIn - ( - "reduce(scalar& Value, const sumOp& sumOp)" - ) << "MPI_Send failed" - << Foam::abort(FatalError); - } - } - - level--; - thisLevelOffset >>= 1; - childLevelOffset = thisLevelOffset/2; - } - */ - } -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -// ************************************************************************* // diff --git a/src/foam/Make/files b/src/foam/Make/files index 945a7dccc..f5b131619 100644 --- a/src/foam/Make/files +++ b/src/foam/Make/files @@ -141,8 +141,11 @@ $(StringStreams)/StringStreamsPrint.C Pstreams = $(Streams)/Pstreams $(Pstreams)/Pstream.C $(Pstreams)/PstreamCommsStruct.C +$(Pstreams)/PstreamGlobals.C $(Pstreams)/IPstream.C $(Pstreams)/OPstream.C +$(Pstreams)/IPread.C +$(Pstreams)/OPwrite.C $(Pstreams)/PstreamsPrint.C dictionary = db/dictionary diff --git a/src/foam/Make/options b/src/foam/Make/options index 5d55cb1c8..b3ff0c4f3 100644 --- a/src/foam/Make/options +++ b/src/foam/Make/options @@ -1,9 +1,10 @@ -EXE_INC = \ +include $(RULES)/mplib$(WM_MPLIB) + +EXE_INC = $(PFLAGS) $(PINC)\ -I$(WM_THIRD_PARTY_DIR)/zlib-1.2.3 -LIB_LIBS = \ +LIB_LIBS = $(PLIBS)\ $(FOAM_LIBBIN)/libOSspecific.o \ - -L$(FOAM_LIBBIN)/dummy -lPstream \ -lz $(OBJECTS_DIR)/global.o: FORCE diff --git a/src/foam/db/IOstreams/Pstreams/IPread.C b/src/foam/db/IOstreams/Pstreams/IPread.C index 93d3941fe..1efd67d49 100644 --- a/src/foam/db/IOstreams/Pstreams/IPread.C +++ b/src/foam/db/IOstreams/Pstreams/IPread.C @@ -1,9 +1,9 @@ /*---------------------------------------------------------------------------*\ ========= | \\ / F ield | foam-extend: Open Source CFD - \\ / O peration | Version: 3.2 - \\ / A nd | Web: http://www.foam-extend.org - \\/ M anipulation | For copyright notice see file Copyright + \\ / O peration | + \\ / A nd | For copyright notice see file Copyright + \\/ M anipulation | ------------------------------------------------------------------------------- License This file is part of foam-extend. @@ -26,136 +26,212 @@ Description \*---------------------------------------------------------------------------*/ +#include "mpi.h" + #include "IPstream.H" -#include "error.H" -#include "int.H" +#include "PstreamGlobals.H" -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // +// * * * * * * * * * * * * * * * * Constructor * * * * * * * * * * * * * * * // -namespace Foam +Foam::IPstream::IPstream +( + const commsTypes commsType, + const int fromProcNo, + const label bufSize, + streamFormat format, + versionNumber version +) +: + Pstream(commsType, bufSize), + Istream(format, version), + fromProcNo_(fromProcNo), + messageSize_(0) { + setOpened(); + setGood(); -// * * * * * * * * * * * * * Private member functions * * * * * * * * * * * // + MPI_Status status; -inline void IPstream::checkEof() -{ - if (bufPosition_ == messageSize_) + // If the buffer size is not specified, probe the incomming message + // and set it + if (!bufSize) { - setEof(); + MPI_Probe(procID(fromProcNo_), msgType(), MPI_COMM_WORLD, &status); + MPI_Get_count(&status, MPI_BYTE, &messageSize_); + + buf_.setSize(messageSize_); + } + + messageSize_ = read(commsType, fromProcNo_, buf_.begin(), buf_.size()); + + if (!messageSize_) + { + FatalErrorIn + ( + "IPstream::IPstream(const int fromProcNo, " + "const label bufSize, streamFormat format, versionNumber version)" + ) << "read failed" + << Foam::abort(FatalError); } } -template -inline void IPstream::readFromBuffer(T& t) +// * * * * * * * * * * * * * * * Member Functions * * * * * * * * * * * * * // + +Foam::label Foam::IPstream::read +( + const commsTypes commsType, + const int fromProcNo, + char* buf, + const std::streamsize bufSize +) { - const size_t align = sizeof(T); - bufPosition_ = align + ((bufPosition_ - 1) & ~(align - 1)); - - t = reinterpret_cast(buf_[bufPosition_]); - bufPosition_ += sizeof(T); - checkEof(); -} - - -inline void IPstream::readFromBuffer(void* data, size_t count, size_t align) -{ - if (align > 1) + if (commsType == blocking || commsType == scheduled) { - bufPosition_ = align + ((bufPosition_ - 1) & ~(align - 1)); + MPI_Status status; + + if + ( + MPI_Recv + ( + buf, + bufSize, + MPI_PACKED, + procID(fromProcNo), + msgType(), + MPI_COMM_WORLD, + &status + ) + ) + { + FatalErrorIn + ( + "IPstream::read" + "(const int fromProcNo, char* buf, std::streamsize bufSize)" + ) << "MPI_Recv cannot receive incomming message" + << Foam::abort(FatalError); + + return 0; + } + + + // Check size of message read + + label messageSize; + MPI_Get_count(&status, MPI_BYTE, &messageSize); + + if (messageSize > bufSize) + { + FatalErrorIn + ( + "IPstream::read" + "(const int fromProcNo, char* buf, std::streamsize bufSize)" + ) << "buffer (" << label(bufSize) + << ") not large enough for incomming message (" + << messageSize << ')' + << Foam::abort(FatalError); + } + + return messageSize; } - - register const char* bufPtr = &buf_[bufPosition_]; - - register char* dataPtr = reinterpret_cast(data); - register size_t i = count; - while (i--) *dataPtr++ = *bufPtr++; - bufPosition_ += count; - checkEof(); -} - - -// * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // - -IPstream::~IPstream() -{} - - -Istream& IPstream::read(char& c) -{ - c = buf_[bufPosition_]; - bufPosition_++; - checkEof(); - return *this; -} - - -Istream& IPstream::read(word& w) -{ - size_t ws; - readFromBuffer(ws); - w = &buf_[bufPosition_]; - bufPosition_ += ws + 1; - checkEof(); - return *this; -} - - -Istream& IPstream::read(string& s) -{ - size_t ss; - readFromBuffer(ss); - s = &buf_[bufPosition_]; - bufPosition_ += ss + 1; - checkEof(); - return *this; -} - - -Istream& IPstream::read(label& l) -{ - readFromBuffer(l); - return *this; -} - - -Istream& IPstream::read(floatScalar& s) -{ - readFromBuffer(s); - return *this; -} - - -Istream& IPstream::read(doubleScalar& s) -{ - readFromBuffer(s); - return *this; -} - - -Istream& IPstream::read(char* data, std::streamsize count) -{ - if (format() != BINARY) + else if (commsType == nonBlocking) { - FatalErrorIn("IPstream::read(char*, std::streamsize)") - << "stream format not binary" + MPI_Request request; + + if + ( + MPI_Irecv + ( + buf, + bufSize, + MPI_PACKED, + procID(fromProcNo), + msgType(), + MPI_COMM_WORLD, + &request + ) + ) + { + FatalErrorIn + ( + "IPstream::read" + "(const int fromProcNo, char* buf, std::streamsize bufSize)" + ) << "MPI_Recv cannot start non-blocking receive" + << Foam::abort(FatalError); + + return 0; + } + + PstreamGlobals::IPstream_outstandingRequests_.append(request); + + return 1; + } + else + { + FatalErrorIn + ( + "IPstream::read" + "(const int fromProcNo, char* buf, std::streamsize bufSize)" + ) << "Unsupported communications type " << commsType + << Foam::abort(FatalError); + + return 0; + } +} + + +void Foam::IPstream::waitRequests() +{ + if (PstreamGlobals::IPstream_outstandingRequests_.size()) + { + if + ( + MPI_Waitall + ( + PstreamGlobals::IPstream_outstandingRequests_.size(), + PstreamGlobals::IPstream_outstandingRequests_.begin(), + MPI_STATUSES_IGNORE + ) + ) + { + FatalErrorIn + ( + "IPstream::waitRequests()" + ) << "MPI_Waitall returned with error" << endl; + } + + PstreamGlobals::IPstream_outstandingRequests_.clear(); + } +} + + +bool Foam::IPstream::finishedRequest(const label i) +{ + if (i >= PstreamGlobals::IPstream_outstandingRequests_.size()) + { + FatalErrorIn + ( + "IPstream::finishedRequest(const label)" + ) << "There are " + << PstreamGlobals::IPstream_outstandingRequests_.size() + << " outstanding send requests and you are asking for i=" << i + << nl + << "Maybe you are mixing blocking/non-blocking comms?" << Foam::abort(FatalError); } - readFromBuffer(data, count, 8); - return *this; -} + int flag; + MPI_Test + ( + &PstreamGlobals::IPstream_outstandingRequests_[i], + &flag, + MPI_STATUS_IGNORE + ); - -Istream& IPstream::rewind() -{ - bufPosition_ = 0; - return *this; + return flag != 0; } // * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * // -} // End namespace Foam - // ************************************************************************* // diff --git a/src/Pstream/mpi/OPwrite.C b/src/foam/db/IOstreams/Pstreams/OPwrite.C similarity index 100% rename from src/Pstream/mpi/OPwrite.C rename to src/foam/db/IOstreams/Pstreams/OPwrite.C diff --git a/src/foam/db/IOstreams/Pstreams/Pstream.C b/src/foam/db/IOstreams/Pstreams/Pstream.C index bdab2f32c..bb5e9a97e 100644 --- a/src/foam/db/IOstreams/Pstreams/Pstream.C +++ b/src/foam/db/IOstreams/Pstreams/Pstream.C @@ -23,9 +23,27 @@ License \*---------------------------------------------------------------------------*/ + +#include +#include +#include + +#include "mpi.h" + #include "Pstream.H" +#include "PstreamReduceOps.H" #include "debug.H" #include "dictionary.H" +#include "OSspecific.H" + +#if defined(WM_SP) +# define MPI_SCALAR MPI_FLOAT +#elif defined(WM_DP) +# define MPI_SCALAR MPI_DOUBLE +#elif defined(WM_LDP) +# define MPI_SCALAR MPI_LONG_DOUBLE +#endif + // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // @@ -204,6 +222,383 @@ void Foam::Pstream::initCommunicationSchedule() } +// NOTE: +// valid parallel options vary between implementations, but flag common ones. +// if they are not removed by MPI_Init(), the subsequent argument processing +// will notice that they are wrong +void Foam::Pstream::addValidParOptions(HashTable& validParOptions) +{ + validParOptions.insert("np", ""); + validParOptions.insert("p4pg", "PI file"); + validParOptions.insert("p4wd", "directory"); + validParOptions.insert("p4amslave", ""); + validParOptions.insert("p4yourname", "hostname"); + validParOptions.insert("GAMMANP", "number of instances"); + validParOptions.insert("machinefile", "machine file"); +} + + +bool Foam::Pstream::init(int& argc, char**& argv) +{ + MPI_Init(&argc, &argv); + + int numprocs; + MPI_Comm_size(MPI_COMM_WORLD, &numprocs); + MPI_Comm_rank(MPI_COMM_WORLD, &myProcNo_); + + if (numprocs <= 1) + { + FatalErrorIn("Pstream::init(int& argc, char**& argv)") + << "bool Pstream::init(int& argc, char**& argv) : " + "attempt to run parallel on 1 processor" + << Foam::abort(FatalError); + } + + procIDs_.setSize(numprocs); + + forAll(procIDs_, procNo) + { + procIDs_[procNo] = procNo; + } + + setParRun(); + +# ifndef SGIMPI + string bufferSizeName = getEnv("MPI_BUFFER_SIZE"); + + if (bufferSizeName.size()) + { + int bufferSize = atoi(bufferSizeName.c_str()); + + if (bufferSize) + { + MPI_Buffer_attach(new char[bufferSize], bufferSize); + } + } + else + { + FatalErrorIn("Pstream::init(int& argc, char**& argv)") + << "Pstream::init(int& argc, char**& argv) : " + << "environment variable MPI_BUFFER_SIZE not defined" + << Foam::abort(FatalError); + } +# endif + + int processorNameLen; + char processorName[MPI_MAX_PROCESSOR_NAME]; + + MPI_Get_processor_name(processorName, &processorNameLen); + + //signal(SIGABRT, stop); + + // Now that nprocs is known construct communication tables. + initCommunicationSchedule(); + + return true; +} + + +void Foam::Pstream::exit(int errnum) +{ +# ifndef SGIMPI + int size; + char* buff; + MPI_Buffer_detach(&buff, &size); + delete[] buff; +# endif + + if (errnum == 0) + { + MPI_Finalize(); + ::exit(errnum); + } + else + { + MPI_Abort(MPI_COMM_WORLD, errnum); + } +} + + +void Foam::Pstream::abort() +{ + MPI_Abort(MPI_COMM_WORLD, 1); +} + + +void Foam::reduce(scalar& Value, const sumOp& bop) +{ + if (!Pstream::parRun()) + { + return; + } + + if (Pstream::nProcs() <= Pstream::nProcsSimpleSum()) + { + if (Pstream::master()) + { + for + ( + int slave=Pstream::firstSlave(); + slave<=Pstream::lastSlave(); + slave++ + ) + { + scalar value; + + if + ( + MPI_Recv + ( + &value, + 1, + MPI_SCALAR, + Pstream::procID(slave), + Pstream::msgType(), + MPI_COMM_WORLD, + MPI_STATUS_IGNORE + ) + ) + { + FatalErrorIn + ( + "reduce(scalar& Value, const sumOp& sumOp)" + ) << "MPI_Recv failed" + << Foam::abort(FatalError); + } + + Value = bop(Value, value); + } + } + else + { + if + ( + MPI_Send + ( + &Value, + 1, + MPI_SCALAR, + Pstream::procID(Pstream::masterNo()), + Pstream::msgType(), + MPI_COMM_WORLD + ) + ) + { + FatalErrorIn + ( + "reduce(scalar& Value, const sumOp& sumOp)" + ) << "MPI_Send failed" + << Foam::abort(FatalError); + } + } + + + if (Pstream::master()) + { + for + ( + int slave=Pstream::firstSlave(); + slave<=Pstream::lastSlave(); + slave++ + ) + { + if + ( + MPI_Send + ( + &Value, + 1, + MPI_SCALAR, + Pstream::procID(slave), + Pstream::msgType(), + MPI_COMM_WORLD + ) + ) + { + FatalErrorIn + ( + "reduce(scalar& Value, const sumOp& sumOp)" + ) << "MPI_Send failed" + << Foam::abort(FatalError); + } + } + } + else + { + if + ( + MPI_Recv + ( + &Value, + 1, + MPI_SCALAR, + Pstream::procID(Pstream::masterNo()), + Pstream::msgType(), + MPI_COMM_WORLD, + MPI_STATUS_IGNORE + ) + ) + { + FatalErrorIn + ( + "reduce(scalar& Value, const sumOp& sumOp)" + ) << "MPI_Recv failed" + << Foam::abort(FatalError); + } + } + } + else + { + scalar sum; + MPI_Allreduce(&Value, &sum, 1, MPI_SCALAR, MPI_SUM, MPI_COMM_WORLD); + Value = sum; + + /* + int myProcNo = Pstream::myProcNo(); + int nProcs = Pstream::nProcs(); + + // + // receive from children + // + int level = 1; + int thisLevelOffset = 2; + int childLevelOffset = thisLevelOffset/2; + int childProcId = 0; + + while + ( + (childLevelOffset < nProcs) + && (myProcNo % thisLevelOffset) == 0 + ) + { + childProcId = myProcNo + childLevelOffset; + + scalar value; + + if (childProcId < nProcs) + { + if + ( + MPI_Recv + ( + &value, + 1, + MPI_SCALAR, + Pstream::procID(childProcId), + Pstream::msgType(), + MPI_COMM_WORLD, + MPI_STATUS_IGNORE + ) + ) + { + FatalErrorIn + ( + "reduce(scalar& Value, const sumOp& sumOp)" + ) << "MPI_Recv failed" + << Foam::abort(FatalError); + } + + Value = bop(Value, value); + } + + level++; + thisLevelOffset <<= 1; + childLevelOffset = thisLevelOffset/2; + } + + // + // send and receive from parent + // + if (!Pstream::master()) + { + int parentId = myProcNo - (myProcNo % thisLevelOffset); + + if + ( + MPI_Send + ( + &Value, + 1, + MPI_SCALAR, + Pstream::procID(parentId), + Pstream::msgType(), + MPI_COMM_WORLD + ) + ) + { + FatalErrorIn + ( + "reduce(scalar& Value, const sumOp& sumOp)" + ) << "MPI_Send failed" + << Foam::abort(FatalError); + } + + if + ( + MPI_Recv + ( + &Value, + 1, + MPI_SCALAR, + Pstream::procID(parentId), + Pstream::msgType(), + MPI_COMM_WORLD, + MPI_STATUS_IGNORE + ) + ) + { + FatalErrorIn + ( + "reduce(scalar& Value, const sumOp& sumOp)" + ) << "MPI_Recv failed" + << Foam::abort(FatalError); + } + } + + + // + // distribute to my children + // + level--; + thisLevelOffset >>= 1; + childLevelOffset = thisLevelOffset/2; + + while (level > 0) + { + childProcId = myProcNo + childLevelOffset; + + if (childProcId < nProcs) + { + if + ( + MPI_Send + ( + &Value, + 1, + MPI_SCALAR, + Pstream::procID(childProcId), + Pstream::msgType(), + MPI_COMM_WORLD + ) + ) + { + FatalErrorIn + ( + "reduce(scalar& Value, const sumOp& sumOp)" + ) << "MPI_Send failed" + << Foam::abort(FatalError); + } + } + + level--; + thisLevelOffset >>= 1; + childLevelOffset = thisLevelOffset/2; + } + */ + } +} + + // * * * * * * * * * * * * * * Static Data Members * * * * * * * * * * * * * // // Initialise my process number to 0 (the master) diff --git a/src/Pstream/mpi/PstreamGlobals.C b/src/foam/db/IOstreams/Pstreams/PstreamGlobals.C similarity index 100% rename from src/Pstream/mpi/PstreamGlobals.C rename to src/foam/db/IOstreams/Pstreams/PstreamGlobals.C diff --git a/src/Pstream/mpi/PstreamGlobals.H b/src/foam/db/IOstreams/Pstreams/PstreamGlobals.H similarity index 100% rename from src/Pstream/mpi/PstreamGlobals.H rename to src/foam/db/IOstreams/Pstreams/PstreamGlobals.H